Skip to main content

gcloud_spanner/
client.rs

1use std::env::var;
2use std::fmt::Debug;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::time::Duration;
7
8use google_cloud_gax::conn::{ConnectionOptions, Environment};
9use google_cloud_gax::grpc::{Code, Status};
10use google_cloud_gax::retry::{invoke_fn, TryAs};
11use google_cloud_googleapis::spanner::v1::{commit_request, transaction_options, Mutation, TransactionOptions};
12use token_source::NoopTokenSourceProvider;
13
14use crate::apiv1::conn_pool::{ConnectionManager, SPANNER};
15use crate::metrics::{MetricsConfig, MetricsRecorder};
16use crate::retry::TransactionRetrySetting;
17use crate::session::{ManagedSession, SessionConfig, SessionError, SessionManager};
18use crate::statement::Statement;
19use crate::transaction::{CallOptions, QueryOptions};
20use crate::transaction_ro::{BatchReadOnlyTransaction, ReadOnlyTransaction};
21use crate::transaction_rw::{commit, CommitOptions, CommitResult, ReadWriteTransaction};
22use crate::value::TimestampBound;
23
24#[derive(Clone, Default)]
25pub struct PartitionedUpdateOption {
26    pub begin_options: CallOptions,
27    pub query_options: Option<QueryOptions>,
28    /// The transaction tag to use for partitioned update operations.
29    pub transaction_tag: Option<String>,
30}
31
32#[derive(Clone)]
33pub struct ReadOnlyTransactionOption {
34    pub timestamp_bound: TimestampBound,
35    pub call_options: CallOptions,
36}
37
38impl Default for ReadOnlyTransactionOption {
39    fn default() -> Self {
40        ReadOnlyTransactionOption {
41            timestamp_bound: TimestampBound::strong_read(),
42            call_options: CallOptions::default(),
43        }
44    }
45}
46
47#[derive(Clone, Default)]
48pub struct ReadWriteTransactionOption {
49    pub begin_options: CallOptions,
50    pub commit_options: CommitOptions,
51    /// The transaction tag to use for this read/write transaction.
52    pub transaction_tag: Option<String>,
53}
54
55#[derive(Clone, Debug)]
56pub struct ChannelConfig {
57    /// num_channels is the number of gRPC channels.
58    pub num_channels: usize,
59    pub connect_timeout: Duration,
60    pub timeout: Duration,
61}
62
63impl Default for ChannelConfig {
64    fn default() -> Self {
65        ChannelConfig {
66            num_channels: 4,
67            connect_timeout: Duration::from_secs(30),
68            timeout: Duration::from_secs(30),
69        }
70    }
71}
72
73/// ClientConfig has configurations for the client.
74#[derive(Debug)]
75pub struct ClientConfig {
76    /// SessionPoolConfig is the configuration for session pool.
77    pub session_config: SessionConfig,
78    /// ChannelConfig is the configuration for gRPC connection.
79    pub channel_config: ChannelConfig,
80    /// Overriding service endpoint
81    pub endpoint: String,
82    /// Runtime project
83    pub environment: Environment,
84    /// DisableRouteToLeader specifies if all the requests of type read-write and PDML need to be routed to the leader region.
85    pub disable_route_to_leader: bool,
86    /// Metrics configuration for emitting OpenTelemetry signals.
87    pub metrics: MetricsConfig,
88}
89
90impl Default for ClientConfig {
91    fn default() -> Self {
92        let mut config = ClientConfig {
93            channel_config: Default::default(),
94            session_config: Default::default(),
95            endpoint: SPANNER.to_string(),
96            environment: match var("SPANNER_EMULATOR_HOST").ok() {
97                Some(v) => Environment::Emulator(v),
98                None => Environment::GoogleCloud(Box::new(NoopTokenSourceProvider {})),
99            },
100            disable_route_to_leader: false,
101            metrics: MetricsConfig::default(),
102        };
103        config.session_config.min_opened = config.channel_config.num_channels * 4;
104        config.session_config.max_opened = config.channel_config.num_channels * 100;
105        config
106    }
107}
108
109#[cfg(feature = "auth")]
110pub use google_cloud_auth;
111use google_cloud_googleapis::spanner::v1::transaction_options::IsolationLevel;
112
113#[cfg(feature = "auth")]
114impl ClientConfig {
115    pub async fn with_auth(mut self) -> Result<Self, google_cloud_auth::error::Error> {
116        if let Environment::GoogleCloud(_) = self.environment {
117            let ts = google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::auth_config()).await?;
118            self.environment = Environment::GoogleCloud(Box::new(ts))
119        }
120        Ok(self)
121    }
122
123    pub async fn with_credentials(
124        mut self,
125        credentials: google_cloud_auth::credentials::CredentialsFile,
126    ) -> Result<Self, google_cloud_auth::error::Error> {
127        if let Environment::GoogleCloud(_) = self.environment {
128            let ts = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
129                Self::auth_config(),
130                Box::new(credentials),
131            )
132            .await?;
133            self.environment = Environment::GoogleCloud(Box::new(ts))
134        }
135        Ok(self)
136    }
137
138    fn auth_config() -> google_cloud_auth::project::Config<'static> {
139        google_cloud_auth::project::Config::default()
140            .with_audience(crate::apiv1::conn_pool::AUDIENCE)
141            .with_scopes(&crate::apiv1::conn_pool::SCOPES)
142    }
143}
144
145#[derive(thiserror::Error, Debug)]
146pub enum Error {
147    #[error(transparent)]
148    GRPC(#[from] Status),
149
150    #[error(transparent)]
151    InvalidSession(#[from] SessionError),
152
153    #[error(transparent)]
154    ParseError(#[from] crate::row::Error),
155
156    #[error(transparent)]
157    Connection(#[from] google_cloud_gax::conn::Error),
158
159    #[error("invalid config: {0}")]
160    InvalidConfig(String),
161}
162
163impl TryAs<Status> for Error {
164    fn try_as(&self) -> Option<&Status> {
165        match self {
166            Error::GRPC(e) => Some(e),
167            _ => None,
168        }
169    }
170}
171
172/// Client is a client for reading and writing data to a Cloud Spanner database.
173/// A client is safe to use concurrently, except for its Close method.
174#[derive(Clone)]
175pub struct Client {
176    sessions: Arc<SessionManager>,
177    disable_route_to_leader: bool,
178}
179
180impl Client {
181    /// new creates a client to a database. A valid database name has
182    /// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
183    pub async fn new(database: impl Into<String>, config: ClientConfig) -> Result<Self, Error> {
184        if config.session_config.max_opened > config.channel_config.num_channels * 100 {
185            return Err(Error::InvalidConfig(format!(
186                "max session size is {} because max session size is 100 per gRPC connection",
187                config.channel_config.num_channels * 100
188            )));
189        }
190
191        let database: String = database.into();
192        let metrics = Arc::new(
193            MetricsRecorder::try_new(&database, &config.metrics).map_err(|e| Error::InvalidConfig(e.to_string()))?,
194        );
195
196        let pool_size = config.channel_config.num_channels;
197        let options = ConnectionOptions {
198            timeout: Some(config.channel_config.timeout),
199            connect_timeout: Some(config.channel_config.connect_timeout),
200        };
201        let conn_pool =
202            ConnectionManager::new(pool_size, &config.environment, config.endpoint.as_str(), &options).await?;
203        let session_manager = SessionManager::new(
204            database,
205            conn_pool,
206            config.session_config,
207            config.disable_route_to_leader,
208            metrics.clone(),
209        )
210        .await?;
211
212        Ok(Client {
213            sessions: session_manager,
214            disable_route_to_leader: config.disable_route_to_leader,
215        })
216    }
217
218    /// Close closes all the sessions gracefully.
219    /// This method can be called only once.
220    pub async fn close(self) {
221        self.sessions.close().await;
222    }
223
224    /// single provides a read-only snapshot transaction optimized for the case
225    /// where only a single read or query is needed.  This is more efficient than
226    /// using read_only_transaction for a single read or query.
227    /// ```
228    /// use google_cloud_spanner::key::Key;
229    /// use google_cloud_spanner::statement::ToKind;
230    /// use google_cloud_spanner::client::Client;
231    ///
232    /// #[tokio::main]
233    /// async fn run(client: Client) {
234    ///     let mut tx = client.single().await.unwrap();
235    ///     let iter1 = tx.read("Guild",&["GuildID", "OwnerUserID"], vec![
236    ///         Key::new(&"pk1"),
237    ///         Key::new(&"pk2")
238    ///     ]).await.unwrap();
239    /// }
240    /// ```
241    pub async fn single(&self) -> Result<ReadOnlyTransaction, Error> {
242        self.single_with_timestamp_bound(TimestampBound::strong_read()).await
243    }
244
245    /// single provides a read-only snapshot transaction optimized for the case
246    /// where only a single read or query is needed.  This is more efficient than
247    /// using read_only_transaction for a single read or query.
248    pub async fn single_with_timestamp_bound(&self, tb: TimestampBound) -> Result<ReadOnlyTransaction, Error> {
249        let session = self.get_session().await?;
250        let result = ReadOnlyTransaction::single(session, tb).await?;
251        Ok(result)
252    }
253
254    /// read_only_transaction returns a ReadOnlyTransaction that can be used for
255    /// multiple reads from the database.
256    ///
257    /// ```ignore
258    /// use google_cloud_spanner::client::{Client, Error};
259    /// use google_cloud_spanner::statement::Statement;
260    /// use google_cloud_spanner::key::Key;
261    ///
262    /// async fn run(client: Client) -> Result<(), Error>{
263    ///     let mut tx = client.read_only_transaction().await?;
264    ///
265    ///     let mut stmt = Statement::new("SELECT * , \
266    ///             ARRAY (SELECT AS STRUCT * FROM UserItem WHERE UserId = @Param1 ) AS UserItem, \
267    ///             ARRAY (SELECT AS STRUCT * FROM UserCharacter WHERE UserId = @Param1 ) AS UserCharacter  \
268    ///             FROM User \
269    ///             WHERE UserId = @Param1");
270    ///
271    ///     stmt.add_param("Param1", user_id);
272    ///     let mut reader = tx.query(stmt).await?;
273    ///     let mut data = vec![];
274    ///     while let Some(row) = reader.next().await? {
275    ///         let user_id= row.column_by_name::<String>("UserId")?;
276    ///         let user_items= row.column_by_name::<Vec<model::UserItem>>("UserItem")?;
277    ///         let user_characters = row.column_by_name::<Vec<model::UserCharacter>>("UserCharacter")?;
278    ///         data.push(user_id);
279    ///     }
280    ///
281    ///     let mut reader2 = tx.read("User", &["UserId"], vec![
282    ///         Key::new(&"user-1"),
283    ///         Key::new(&"user-2")
284    ///     ]).await?;
285    ///
286    ///     Ok(())
287    /// }
288    pub async fn read_only_transaction(&self) -> Result<ReadOnlyTransaction, Error> {
289        self.read_only_transaction_with_option(ReadOnlyTransactionOption::default())
290            .await
291    }
292
293    /// read_only_transaction returns a ReadOnlyTransaction that can be used for
294    /// multiple reads from the database.
295    pub async fn read_only_transaction_with_option(
296        &self,
297        options: ReadOnlyTransactionOption,
298    ) -> Result<ReadOnlyTransaction, Error> {
299        let session = self.get_session().await?;
300        let result = ReadOnlyTransaction::begin(session, options.timestamp_bound, options.call_options).await?;
301        Ok(result)
302    }
303
304    /// batch_read_only_transaction returns a BatchReadOnlyTransaction that can be used
305    /// for partitioned reads or queries from a snapshot of the database. This is
306    /// useful in batch processing pipelines where one wants to divide the work of
307    /// reading from the database across multiple machines.
308    pub async fn batch_read_only_transaction(&self) -> Result<BatchReadOnlyTransaction, Error> {
309        self.batch_read_only_transaction_with_option(ReadOnlyTransactionOption::default())
310            .await
311    }
312
313    /// batch_read_only_transaction returns a BatchReadOnlyTransaction that can be used
314    /// for partitioned reads or queries from a snapshot of the database. This is
315    /// useful in batch processing pipelines where one wants to divide the work of
316    /// reading from the database across multiple machines.
317    pub async fn batch_read_only_transaction_with_option(
318        &self,
319        options: ReadOnlyTransactionOption,
320    ) -> Result<BatchReadOnlyTransaction, Error> {
321        let session = self.get_session().await?;
322        let result = BatchReadOnlyTransaction::begin(session, options.timestamp_bound, options.call_options).await?;
323        Ok(result)
324    }
325
326    /// partitioned_update executes a DML statement in parallel across the database,
327    /// using separate, internal transactions that commit independently. The DML
328    /// statement must be fully partitionable: it must be expressible as the union
329    /// of many statements each of which accesses only a single row of the table. The
330    /// statement should also be idempotent, because it may be applied more than once.
331    ///
332    /// PartitionedUpdate returns an estimated count of the number of rows affected.
333    /// The actual number of affected rows may be greater than the estimate.
334    pub async fn partitioned_update(&self, stmt: Statement) -> Result<i64, Error> {
335        self.partitioned_update_with_option(stmt, PartitionedUpdateOption::default())
336            .await
337    }
338
339    /// partitioned_update executes a DML statement in parallel across the database,
340    /// using separate, internal transactions that commit independently. The DML
341    /// statement must be fully partitionable: it must be expressible as the union
342    /// of many statements each of which accesses only a single row of the table. The
343    /// statement should also be idempotent, because it may be applied more than once.
344    ///
345    /// PartitionedUpdate returns an estimated count of the number of rows affected.
346    /// The actual number of affected rows may be greater than the estimate.
347    pub async fn partitioned_update_with_option(
348        &self,
349        stmt: Statement,
350        options: PartitionedUpdateOption,
351    ) -> Result<i64, Error> {
352        let ro = TransactionRetrySetting::new(vec![Code::Aborted, Code::Internal]);
353        let session = Some(self.get_session().await?);
354
355        // reuse session
356        invoke_fn(
357            Some(ro),
358            |session| async {
359                let mut tx = match ReadWriteTransaction::begin_partitioned_dml(
360                    session.unwrap(),
361                    options.begin_options.clone(),
362                    options.transaction_tag.clone(),
363                    self.disable_route_to_leader,
364                )
365                .await
366                {
367                    Ok(tx) => tx,
368                    Err(e) => return Err((Error::GRPC(e.status), Some(e.session))),
369                };
370                tx.update_with_option(stmt.clone(), options.query_options.clone().unwrap_or_default())
371                    .await
372                    .map_err(|e| (Error::GRPC(e), tx.take_session()))
373            },
374            session,
375        )
376        .await
377    }
378
379    /// apply_at_least_once may attempt to apply mutations more than once; if
380    /// the mutations are not idempotent, this may lead to a failure being reported
381    /// when the mutation was applied more than once. For example, an insert may
382    /// fail with ALREADY_EXISTS even though the row did not exist before Apply was
383    /// called. For this reason, most users of the library will prefer not to use
384    /// this option.  However, apply_at_least_once requires only a single RPC, whereas
385    /// apply's default replay protection may require an additional RPC.  So this
386    /// method may be appropriate for latency sensitive and/or high throughput blind
387    /// writing.
388    pub async fn apply_at_least_once(&self, ms: Vec<Mutation>) -> Result<Option<CommitResult>, Error> {
389        self.apply_at_least_once_with_option(ms, CommitOptions::default()).await
390    }
391
392    /// apply_at_least_once may attempt to apply mutations more than once; if
393    /// the mutations are not idempotent, this may lead to a failure being reported
394    /// when the mutation was applied more than once. For example, an insert may
395    /// fail with ALREADY_EXISTS even though the row did not exist before Apply was
396    /// called. For this reason, most users of the library will prefer not to use
397    /// this option.  However, apply_at_least_once requires only a single RPC, whereas
398    /// apply's default replay protection may require an additional RPC.  So this
399    /// method may be appropriate for latency sensitive and/or high throughput blind
400    /// writing.
401    pub async fn apply_at_least_once_with_option(
402        &self,
403        ms: Vec<Mutation>,
404        options: CommitOptions,
405    ) -> Result<Option<CommitResult>, Error> {
406        let ro = TransactionRetrySetting::default();
407        let mut session = self.get_session().await?;
408
409        invoke_fn(
410            Some(ro),
411            |session| async {
412                let tx = commit_request::Transaction::SingleUseTransaction(TransactionOptions {
413                    exclude_txn_from_change_streams: false,
414                    mode: Some(transaction_options::Mode::ReadWrite(transaction_options::ReadWrite::default())),
415                    isolation_level: IsolationLevel::Unspecified as i32,
416                });
417                match commit(session, ms.clone(), tx, options.clone(), self.disable_route_to_leader).await {
418                    Ok(s) => Ok(Some(s.into())),
419                    Err(e) => Err((Error::GRPC(e), session)),
420                }
421            },
422            &mut session,
423        )
424        .await
425    }
426
427    /// Apply applies a list of mutations atomically to the database.
428    /// ```
429    /// use google_cloud_spanner::mutation::insert;
430    /// use google_cloud_spanner::mutation::delete;
431    /// use google_cloud_spanner::key::all_keys;
432    /// use google_cloud_spanner::statement::ToKind;
433    /// use google_cloud_spanner::client::{Client, Error};
434    /// use google_cloud_spanner::value::CommitTimestamp;
435    ///
436    /// async fn run(client: Client) -> Result<(), Error>{
437    ///     let m1 = delete("Guild", all_keys());
438    ///     let m2 = insert("Guild", &["GuildID", "OwnerUserID", "UpdatedAt"], &[&"1", &"2", &CommitTimestamp::new()]);
439    ///     let commit_timestamp = client.apply(vec![m1,m2]).await?;
440    ///     Ok(())
441    /// }
442    /// ```
443    pub async fn apply(&self, ms: Vec<Mutation>) -> Result<CommitResult, Error> {
444        self.apply_with_option(ms, ReadWriteTransactionOption::default()).await
445    }
446
447    /// Apply applies a list of mutations atomically to the database.
448    pub async fn apply_with_option(
449        &self,
450        ms: Vec<Mutation>,
451        options: ReadWriteTransactionOption,
452    ) -> Result<CommitResult, Error> {
453        let result: Result<(CommitResult, ()), Error> = self
454            .read_write_transaction_sync_with_option(
455                |tx| {
456                    tx.buffer_write(ms.to_vec());
457                    Ok(())
458                },
459                options,
460            )
461            .await;
462        Ok(result?.0)
463    }
464
465    /// ReadWriteTransaction executes a read-write transaction, with retries as
466    /// necessary.
467    ///
468    /// The function f will be called one or more times. It must not maintain
469    /// any state between calls.
470    ///
471    /// If the transaction cannot be committed or if f returns an ABORTED error,
472    /// ReadWriteTransaction will call f again. It will continue to call f until the
473    /// transaction can be committed or the Context times out or is cancelled.  If f
474    /// returns an error other than ABORTED, ReadWriteTransaction will abort the
475    /// transaction and return the error.
476    ///
477    /// To limit the number of retries, set a deadline on the Context rather than
478    /// using a fixed limit on the number of attempts. ReadWriteTransaction will
479    /// retry as needed until that deadline is met.
480    ///
481    /// See <https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction> for
482    /// more details.
483    /// ```
484    /// use google_cloud_spanner::mutation::update;
485    /// use google_cloud_spanner::key::{Key, all_keys};
486    /// use google_cloud_spanner::value::Timestamp;
487    /// use google_cloud_spanner::client::Error;
488    /// use google_cloud_spanner::client::Client;
489    ///
490    /// #[tokio::main]
491    /// async fn run(client: Client) ->  Result<(Option<Timestamp>,()), Error>{
492    ///     client.read_write_transaction(|tx| {
493    ///         Box::pin(async move {
494    ///             // The transaction function will be called again if the error code
495    ///             // of this error is Aborted. The backend may automatically abort
496    ///             // any read/write transaction if it detects a deadlock or other problems.
497    ///             let key = all_keys();
498    ///             let mut reader = tx.read("UserItem", &["UserId", "ItemId", "Quantity"], key).await?;
499    ///             let mut ms = vec![];
500    ///             while let Some(row) = reader.next().await? {
501    ///                 let user_id = row.column_by_name::<String>("UserId")?;
502    ///                 let item_id = row.column_by_name::<i64>("ItemId")?;
503    ///                 let quantity = row.column_by_name::<i64>("Quantity")? + 1;
504    ///                 let m = update("UserItem", &["Quantity"], &[&user_id, &item_id, &quantity]);
505    ///                 ms.push(m);
506    ///             }
507    ///             // The buffered mutation will be committed.  If the commit
508    ///             // fails with an Aborted error, this function will be called again
509    ///             tx.buffer_write(ms);
510    ///             Ok(())
511    ///         })
512    ///     }).await
513    /// }
514    pub async fn read_write_transaction<'a, T, E, F>(&self, f: F) -> Result<(CommitResult, T), E>
515    where
516        E: TryAs<Status> + From<SessionError> + From<Status>,
517        F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'tx>>,
518    {
519        self.read_write_transaction_with_option(f, ReadWriteTransactionOption::default())
520            .await
521    }
522
523    /// ReadWriteTransaction executes a read-write transaction, with retries as
524    /// necessary.
525    ///
526    /// The function f will be called one or more times. It must not maintain
527    /// any state between calls.
528    ///
529    /// If the transaction cannot be committed or if f returns an ABORTED error,
530    /// ReadWriteTransaction will call f again. It will continue to call f until the
531    /// transaction can be committed or the Context times out or is cancelled.  If f
532    /// returns an error other than ABORTED, ReadWriteTransaction will abort the
533    /// transaction and return the error.
534    ///
535    /// To limit the number of retries, set a deadline on the Context rather than
536    /// using a fixed limit on the number of attempts. ReadWriteTransaction will
537    /// retry as needed until that deadline is met.
538    ///
539    /// See <https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction> for
540    /// more details.
541    pub async fn read_write_transaction_with_option<'a, T, E, F>(
542        &'a self,
543        f: F,
544        options: ReadWriteTransactionOption,
545    ) -> Result<(CommitResult, T), E>
546    where
547        E: TryAs<Status> + From<SessionError> + From<Status>,
548        F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'tx>>,
549    {
550        let (bo, co, tag) = Client::split_read_write_transaction_option(options);
551
552        let ro = TransactionRetrySetting::default();
553        let session = Some(self.get_session().await?);
554        // must reuse session
555        invoke_fn(
556            Some(ro),
557            |session| async {
558                let mut tx = self
559                    .create_read_write_transaction::<E>(session, bo.clone(), tag.clone())
560                    .await?;
561                let result = f(&mut tx).await;
562                tx.finish(result, Some(co.clone())).await
563            },
564            session,
565        )
566        .await
567    }
568
569    /// begin_read_write_transaction creates new ReadWriteTransaction.
570    /// ```
571    /// use google_cloud_spanner::mutation::update;
572    /// use google_cloud_spanner::key::{Key, all_keys};
573    /// use google_cloud_spanner::value::Timestamp;
574    /// use google_cloud_spanner::client::Error;
575    /// use google_cloud_spanner::client::Client;
576    /// use google_cloud_spanner::transaction_rw::ReadWriteTransaction;
577    /// use google_cloud_googleapis::spanner::v1::execute_batch_dml_request::Statement;
578    /// use google_cloud_spanner::retry::TransactionRetry;
579    ///
580    /// async fn run(client: Client) -> Result<(), Error>{
581    ///     let retry = &mut TransactionRetry::new();
582    ///     loop {
583    ///         let tx = &mut client.begin_read_write_transaction().await?;
584    ///
585    ///         let result = run_in_transaction(tx).await;
586    ///
587    ///         // try to commit or rollback transaction.
588    ///         match tx.end(result, None).await {
589    ///             Ok((_commit_timestamp, success)) => return Ok(success),
590    ///             Err(err) => retry.next(err).await? // check retry
591    ///         }
592    ///     }
593    /// }
594    ///
595    /// async fn run_in_transaction(tx: &mut ReadWriteTransaction) -> Result<(), Error> {
596    ///     let key = all_keys();
597    ///     let mut reader = tx.read("UserItem", &["UserId", "ItemId", "Quantity"], key).await?;
598    ///     let mut ms = vec![];
599    ///     while let Some(row) = reader.next().await? {
600    ///         let user_id = row.column_by_name::<String>("UserId")?;
601    ///         let item_id = row.column_by_name::<i64>("ItemId")?;
602    ///         let quantity = row.column_by_name::<i64>("Quantity")? + 1;
603    ///         let m = update("UserItem", &["UserId", "ItemId", "Quantity"], &[&user_id, &item_id, &quantity]);
604    ///         ms.push(m);
605    ///     }
606    ///     tx.buffer_write(ms);
607    ///     Ok(())
608    /// }
609    /// ```
610    pub async fn begin_read_write_transaction(&self) -> Result<ReadWriteTransaction, Error> {
611        let session = self.get_session().await?;
612        ReadWriteTransaction::begin(
613            session,
614            ReadWriteTransactionOption::default().begin_options,
615            ReadWriteTransactionOption::default().transaction_tag,
616            self.disable_route_to_leader,
617        )
618        .await
619        .map_err(|e| e.status.into())
620    }
621
622    /// Get open session count.
623    pub fn session_count(&self) -> usize {
624        self.sessions.num_opened()
625    }
626
627    async fn read_write_transaction_sync_with_option<T, E>(
628        &self,
629        f: impl Fn(&mut ReadWriteTransaction) -> Result<T, E>,
630        options: ReadWriteTransactionOption,
631    ) -> Result<(CommitResult, T), E>
632    where
633        E: TryAs<Status> + From<SessionError> + From<Status>,
634    {
635        let (bo, co, tag) = Client::split_read_write_transaction_option(options);
636
637        let ro = TransactionRetrySetting::default();
638        let session = Some(self.get_session().await?);
639
640        // reuse session
641        invoke_fn(
642            Some(ro),
643            |session| async {
644                let mut tx = self
645                    .create_read_write_transaction::<E>(session, bo.clone(), tag.clone())
646                    .await?;
647                let result = f(&mut tx);
648                tx.finish(result, Some(co.clone())).await
649            },
650            session,
651        )
652        .await
653    }
654
655    async fn create_read_write_transaction<E>(
656        &self,
657        session: Option<ManagedSession>,
658        bo: CallOptions,
659        transaction_tag: Option<String>,
660    ) -> Result<ReadWriteTransaction, (E, Option<ManagedSession>)>
661    where
662        E: TryAs<Status> + From<SessionError> + From<Status>,
663    {
664        ReadWriteTransaction::begin(session.unwrap(), bo, transaction_tag, self.disable_route_to_leader)
665            .await
666            .map_err(|e| (E::from(e.status), Some(e.session)))
667    }
668
669    async fn get_session(&self) -> Result<ManagedSession, SessionError> {
670        self.sessions.get().await
671    }
672
673    fn split_read_write_transaction_option(
674        options: ReadWriteTransactionOption,
675    ) -> (CallOptions, CommitOptions, Option<String>) {
676        (options.begin_options, options.commit_options, options.transaction_tag)
677    }
678}