Skip to main content

gcloud_spanner/
client.rs

1use std::env::var;
2use std::fmt::Debug;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::time::Duration;
7
8use google_cloud_gax::conn::{ConnectionOptions, Environment};
9use google_cloud_gax::grpc::{Code, Status};
10use google_cloud_gax::retry::{invoke_fn, TryAs};
11use google_cloud_googleapis::spanner::v1::{commit_request, transaction_options, Mutation, TransactionOptions};
12use token_source::NoopTokenSourceProvider;
13
14use crate::apiv1::conn_pool::{ConnectionManager, SPANNER};
15use crate::metrics::{MetricsConfig, MetricsRecorder};
16use crate::retry::TransactionRetrySetting;
17use crate::session::{ManagedSession, SessionConfig, SessionError, SessionManager};
18use crate::statement::Statement;
19use crate::transaction::{CallOptions, QueryOptions};
20use crate::transaction_ro::{BatchReadOnlyTransaction, ReadOnlyTransaction};
21use crate::transaction_rw::{commit, CommitOptions, CommitResult, ReadWriteTransaction};
22use crate::value::TimestampBound;
23
24#[derive(Clone, Default)]
25pub struct PartitionedUpdateOption {
26    pub begin_options: CallOptions,
27    pub query_options: Option<QueryOptions>,
28    /// The transaction tag to use for partitioned update operations.
29    pub transaction_tag: Option<String>,
30}
31
32#[derive(Clone)]
33pub struct ReadOnlyTransactionOption {
34    pub timestamp_bound: TimestampBound,
35    pub call_options: CallOptions,
36}
37
38impl Default for ReadOnlyTransactionOption {
39    fn default() -> Self {
40        ReadOnlyTransactionOption {
41            timestamp_bound: TimestampBound::strong_read(),
42            call_options: CallOptions::default(),
43        }
44    }
45}
46
47#[derive(Clone, Default)]
48pub struct ReadWriteTransactionOption {
49    pub begin_options: CallOptions,
50    pub commit_options: CommitOptions,
51    /// The transaction tag to use for this read/write transaction.
52    pub transaction_tag: Option<String>,
53}
54
/// Configuration of the gRPC channels used by the client.
///
/// Every field except `num_channels` is forwarded unchanged into
/// `ConnectionOptions` when the client is created.
#[derive(Clone, Debug)]
pub struct ChannelConfig {
    /// num_channels is the number of gRPC channels.
    pub num_channels: usize,
    /// Forwarded as the connection options' `connect_timeout`.
    pub connect_timeout: Duration,
    /// Forwarded as the connection options' `timeout`.
    pub timeout: Duration,
    /// Forwarded as the connection options' `http2_keep_alive_interval`.
    pub http2_keep_alive_interval: Option<Duration>,
    /// Forwarded as the connection options' `keep_alive_timeout`.
    pub keep_alive_timeout: Option<Duration>,
    /// Forwarded as the connection options' `keep_alive_while_idle`.
    pub keep_alive_while_idle: Option<bool>,
}

impl Default for ChannelConfig {
    /// Four channels, 30 second connect/request timeouts, no keep-alive overrides.
    fn default() -> Self {
        let thirty_seconds = Duration::from_secs(30);
        Self {
            num_channels: 4,
            connect_timeout: thirty_seconds,
            timeout: thirty_seconds,
            http2_keep_alive_interval: None,
            keep_alive_timeout: None,
            keep_alive_while_idle: None,
        }
    }
}
78
79/// ClientConfig has configurations for the client.
80#[derive(Debug)]
81pub struct ClientConfig {
82    /// SessionPoolConfig is the configuration for session pool.
83    pub session_config: SessionConfig,
84    /// ChannelConfig is the configuration for gRPC connection.
85    pub channel_config: ChannelConfig,
86    /// Overriding service endpoint
87    pub endpoint: String,
88    /// Runtime project
89    pub environment: Environment,
90    /// DisableRouteToLeader specifies if all the requests of type read-write and PDML need to be routed to the leader region.
91    pub disable_route_to_leader: bool,
92    /// Metrics configuration for emitting OpenTelemetry signals.
93    pub metrics: MetricsConfig,
94}
95
96impl Default for ClientConfig {
97    fn default() -> Self {
98        let mut config = ClientConfig {
99            channel_config: Default::default(),
100            session_config: Default::default(),
101            endpoint: SPANNER.to_string(),
102            environment: match var("SPANNER_EMULATOR_HOST").ok() {
103                Some(v) => Environment::Emulator(v),
104                None => Environment::GoogleCloud(Box::new(NoopTokenSourceProvider {})),
105            },
106            disable_route_to_leader: false,
107            metrics: MetricsConfig::default(),
108        };
109        config.session_config.min_opened = config.channel_config.num_channels * 4;
110        config.session_config.max_opened = config.channel_config.num_channels * 100;
111        config
112    }
113}
114
115#[cfg(feature = "auth")]
116pub use google_cloud_auth;
117use google_cloud_googleapis::spanner::v1::transaction_options::IsolationLevel;
118
#[cfg(feature = "auth")]
impl ClientConfig {
    /// Installs a `DefaultTokenSourceProvider` as the token source.
    ///
    /// Only applies when the environment is `Environment::GoogleCloud`; any
    /// other environment (e.g. the emulator) is returned unchanged.
    ///
    /// # Errors
    /// Returns the auth error if the token source provider cannot be created.
    pub async fn with_auth(mut self) -> Result<Self, google_cloud_auth::error::Error> {
        if !matches!(self.environment, Environment::GoogleCloud(_)) {
            return Ok(self);
        }
        let provider = google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::auth_config()).await?;
        self.environment = Environment::GoogleCloud(Box::new(provider));
        Ok(self)
    }

    /// Installs a `DefaultTokenSourceProvider` built from the given credentials file.
    ///
    /// Only applies when the environment is `Environment::GoogleCloud`; any
    /// other environment (e.g. the emulator) is returned unchanged.
    ///
    /// # Errors
    /// Returns the auth error if the token source provider cannot be created.
    pub async fn with_credentials(
        mut self,
        credentials: google_cloud_auth::credentials::CredentialsFile,
    ) -> Result<Self, google_cloud_auth::error::Error> {
        if !matches!(self.environment, Environment::GoogleCloud(_)) {
            return Ok(self);
        }
        let provider = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
            Self::auth_config(),
            Box::new(credentials),
        )
        .await?;
        self.environment = Environment::GoogleCloud(Box::new(provider));
        Ok(self)
    }

    /// Auth configuration carrying the Spanner audience and scopes.
    fn auth_config() -> google_cloud_auth::project::Config<'static> {
        let base = google_cloud_auth::project::Config::default();
        base.with_audience(crate::apiv1::conn_pool::AUDIENCE)
            .with_scopes(&crate::apiv1::conn_pool::SCOPES)
    }
}
150
151#[derive(thiserror::Error, Debug)]
152pub enum Error {
153    #[error(transparent)]
154    GRPC(#[from] Status),
155
156    #[error(transparent)]
157    InvalidSession(#[from] SessionError),
158
159    #[error(transparent)]
160    ParseError(#[from] crate::row::Error),
161
162    #[error(transparent)]
163    Connection(#[from] google_cloud_gax::conn::Error),
164
165    #[error("invalid config: {0}")]
166    InvalidConfig(String),
167}
168
169impl TryAs<Status> for Error {
170    fn try_as(&self) -> Option<&Status> {
171        match self {
172            Error::GRPC(e) => Some(e),
173            _ => None,
174        }
175    }
176}
177
178/// Client is a client for reading and writing data to a Cloud Spanner database.
179/// A client is safe to use concurrently, except for its Close method.
180#[derive(Clone)]
181pub struct Client {
182    sessions: Arc<SessionManager>,
183    disable_route_to_leader: bool,
184}
185
186impl Client {
187    /// new creates a client to a database. A valid database name has
188    /// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
189    pub async fn new(database: impl Into<String>, config: ClientConfig) -> Result<Self, Error> {
190        if config.session_config.max_opened > config.channel_config.num_channels * 100 {
191            return Err(Error::InvalidConfig(format!(
192                "max session size is {} because max session size is 100 per gRPC connection",
193                config.channel_config.num_channels * 100
194            )));
195        }
196
197        let database: String = database.into();
198        let metrics = Arc::new(
199            MetricsRecorder::try_new(&database, &config.metrics).map_err(|e| Error::InvalidConfig(e.to_string()))?,
200        );
201
202        let pool_size = config.channel_config.num_channels;
203        let options = ConnectionOptions {
204            timeout: Some(config.channel_config.timeout),
205            connect_timeout: Some(config.channel_config.connect_timeout),
206            http2_keep_alive_interval: config.channel_config.http2_keep_alive_interval,
207            keep_alive_timeout: config.channel_config.keep_alive_timeout,
208            keep_alive_while_idle: config.channel_config.keep_alive_while_idle,
209        };
210        let conn_pool =
211            ConnectionManager::new(pool_size, &config.environment, config.endpoint.as_str(), &options).await?;
212        let session_manager = SessionManager::new(
213            database,
214            conn_pool,
215            config.session_config,
216            config.disable_route_to_leader,
217            metrics.clone(),
218        )
219        .await?;
220
221        Ok(Client {
222            sessions: session_manager,
223            disable_route_to_leader: config.disable_route_to_leader,
224        })
225    }
226
227    /// Close closes all the sessions gracefully.
228    /// This method can be called only once.
229    pub async fn close(self) {
230        self.sessions.close().await;
231    }
232
233    /// single provides a read-only snapshot transaction optimized for the case
234    /// where only a single read or query is needed.  This is more efficient than
235    /// using read_only_transaction for a single read or query.
236    /// ```
237    /// use google_cloud_spanner::key::Key;
238    /// use google_cloud_spanner::statement::ToKind;
239    /// use google_cloud_spanner::client::Client;
240    ///
241    /// #[tokio::main]
242    /// async fn run(client: Client) {
243    ///     let mut tx = client.single().await.unwrap();
244    ///     let iter1 = tx.read("Guild",&["GuildID", "OwnerUserID"], vec![
245    ///         Key::new(&"pk1"),
246    ///         Key::new(&"pk2")
247    ///     ]).await.unwrap();
248    /// }
249    /// ```
250    pub async fn single(&self) -> Result<ReadOnlyTransaction, Error> {
251        self.single_with_timestamp_bound(TimestampBound::strong_read()).await
252    }
253
254    /// single provides a read-only snapshot transaction optimized for the case
255    /// where only a single read or query is needed.  This is more efficient than
256    /// using read_only_transaction for a single read or query.
257    pub async fn single_with_timestamp_bound(&self, tb: TimestampBound) -> Result<ReadOnlyTransaction, Error> {
258        let session = self.get_session().await?;
259        let result = ReadOnlyTransaction::single(session, tb).await?;
260        Ok(result)
261    }
262
263    /// read_only_transaction returns a ReadOnlyTransaction that can be used for
264    /// multiple reads from the database.
265    ///
266    /// ```ignore
267    /// use google_cloud_spanner::client::{Client, Error};
268    /// use google_cloud_spanner::statement::Statement;
269    /// use google_cloud_spanner::key::Key;
270    ///
271    /// async fn run(client: Client) -> Result<(), Error>{
272    ///     let mut tx = client.read_only_transaction().await?;
273    ///
274    ///     let mut stmt = Statement::new("SELECT * , \
275    ///             ARRAY (SELECT AS STRUCT * FROM UserItem WHERE UserId = @Param1 ) AS UserItem, \
276    ///             ARRAY (SELECT AS STRUCT * FROM UserCharacter WHERE UserId = @Param1 ) AS UserCharacter  \
277    ///             FROM User \
278    ///             WHERE UserId = @Param1");
279    ///
280    ///     stmt.add_param("Param1", user_id);
281    ///     let mut reader = tx.query(stmt).await?;
282    ///     let mut data = vec![];
283    ///     while let Some(row) = reader.next().await? {
284    ///         let user_id= row.column_by_name::<String>("UserId")?;
285    ///         let user_items= row.column_by_name::<Vec<model::UserItem>>("UserItem")?;
286    ///         let user_characters = row.column_by_name::<Vec<model::UserCharacter>>("UserCharacter")?;
287    ///         data.push(user_id);
288    ///     }
289    ///
290    ///     let mut reader2 = tx.read("User", &["UserId"], vec![
291    ///         Key::new(&"user-1"),
292    ///         Key::new(&"user-2")
293    ///     ]).await?;
294    ///
295    ///     Ok(())
296    /// }
297    pub async fn read_only_transaction(&self) -> Result<ReadOnlyTransaction, Error> {
298        self.read_only_transaction_with_option(ReadOnlyTransactionOption::default())
299            .await
300    }
301
302    /// read_only_transaction returns a ReadOnlyTransaction that can be used for
303    /// multiple reads from the database.
304    pub async fn read_only_transaction_with_option(
305        &self,
306        options: ReadOnlyTransactionOption,
307    ) -> Result<ReadOnlyTransaction, Error> {
308        let session = self.get_session().await?;
309        let result = ReadOnlyTransaction::begin(session, options.timestamp_bound, options.call_options).await?;
310        Ok(result)
311    }
312
313    /// batch_read_only_transaction returns a BatchReadOnlyTransaction that can be used
314    /// for partitioned reads or queries from a snapshot of the database. This is
315    /// useful in batch processing pipelines where one wants to divide the work of
316    /// reading from the database across multiple machines.
317    pub async fn batch_read_only_transaction(&self) -> Result<BatchReadOnlyTransaction, Error> {
318        self.batch_read_only_transaction_with_option(ReadOnlyTransactionOption::default())
319            .await
320    }
321
322    /// batch_read_only_transaction returns a BatchReadOnlyTransaction that can be used
323    /// for partitioned reads or queries from a snapshot of the database. This is
324    /// useful in batch processing pipelines where one wants to divide the work of
325    /// reading from the database across multiple machines.
326    pub async fn batch_read_only_transaction_with_option(
327        &self,
328        options: ReadOnlyTransactionOption,
329    ) -> Result<BatchReadOnlyTransaction, Error> {
330        let session = self.get_session().await?;
331        let result = BatchReadOnlyTransaction::begin(session, options.timestamp_bound, options.call_options).await?;
332        Ok(result)
333    }
334
335    /// partitioned_update executes a DML statement in parallel across the database,
336    /// using separate, internal transactions that commit independently. The DML
337    /// statement must be fully partitionable: it must be expressible as the union
338    /// of many statements each of which accesses only a single row of the table. The
339    /// statement should also be idempotent, because it may be applied more than once.
340    ///
341    /// PartitionedUpdate returns an estimated count of the number of rows affected.
342    /// The actual number of affected rows may be greater than the estimate.
343    pub async fn partitioned_update(&self, stmt: Statement) -> Result<i64, Error> {
344        self.partitioned_update_with_option(stmt, PartitionedUpdateOption::default())
345            .await
346    }
347
348    /// partitioned_update executes a DML statement in parallel across the database,
349    /// using separate, internal transactions that commit independently. The DML
350    /// statement must be fully partitionable: it must be expressible as the union
351    /// of many statements each of which accesses only a single row of the table. The
352    /// statement should also be idempotent, because it may be applied more than once.
353    ///
354    /// PartitionedUpdate returns an estimated count of the number of rows affected.
355    /// The actual number of affected rows may be greater than the estimate.
356    pub async fn partitioned_update_with_option(
357        &self,
358        stmt: Statement,
359        options: PartitionedUpdateOption,
360    ) -> Result<i64, Error> {
361        let ro = TransactionRetrySetting::new(vec![Code::Aborted, Code::Internal]);
362        let session = Some(self.get_session().await?);
363
364        // reuse session
365        invoke_fn(
366            Some(ro),
367            |session| async {
368                let mut tx = match ReadWriteTransaction::begin_partitioned_dml(
369                    session.unwrap(),
370                    options.begin_options.clone(),
371                    options.transaction_tag.clone(),
372                    self.disable_route_to_leader,
373                )
374                .await
375                {
376                    Ok(tx) => tx,
377                    Err(e) => return Err((Error::GRPC(e.status), Some(e.session))),
378                };
379                tx.update_with_option(stmt.clone(), options.query_options.clone().unwrap_or_default())
380                    .await
381                    .map_err(|e| (Error::GRPC(e), tx.take_session()))
382            },
383            session,
384        )
385        .await
386    }
387
388    /// apply_at_least_once may attempt to apply mutations more than once; if
389    /// the mutations are not idempotent, this may lead to a failure being reported
390    /// when the mutation was applied more than once. For example, an insert may
391    /// fail with ALREADY_EXISTS even though the row did not exist before Apply was
392    /// called. For this reason, most users of the library will prefer not to use
393    /// this option.  However, apply_at_least_once requires only a single RPC, whereas
394    /// apply's default replay protection may require an additional RPC.  So this
395    /// method may be appropriate for latency sensitive and/or high throughput blind
396    /// writing.
397    pub async fn apply_at_least_once(&self, ms: Vec<Mutation>) -> Result<Option<CommitResult>, Error> {
398        self.apply_at_least_once_with_option(ms, CommitOptions::default()).await
399    }
400
401    /// apply_at_least_once may attempt to apply mutations more than once; if
402    /// the mutations are not idempotent, this may lead to a failure being reported
403    /// when the mutation was applied more than once. For example, an insert may
404    /// fail with ALREADY_EXISTS even though the row did not exist before Apply was
405    /// called. For this reason, most users of the library will prefer not to use
406    /// this option.  However, apply_at_least_once requires only a single RPC, whereas
407    /// apply's default replay protection may require an additional RPC.  So this
408    /// method may be appropriate for latency sensitive and/or high throughput blind
409    /// writing.
410    pub async fn apply_at_least_once_with_option(
411        &self,
412        ms: Vec<Mutation>,
413        options: CommitOptions,
414    ) -> Result<Option<CommitResult>, Error> {
415        let ro = TransactionRetrySetting::default();
416        let mut session = self.get_session().await?;
417
418        invoke_fn(
419            Some(ro),
420            |session| async {
421                let tx = commit_request::Transaction::SingleUseTransaction(TransactionOptions {
422                    exclude_txn_from_change_streams: false,
423                    mode: Some(transaction_options::Mode::ReadWrite(transaction_options::ReadWrite::default())),
424                    isolation_level: IsolationLevel::Unspecified as i32,
425                });
426                match commit(session, ms.clone(), tx, options.clone(), self.disable_route_to_leader).await {
427                    Ok(s) => Ok(Some(s.into())),
428                    Err(e) => Err((Error::GRPC(e), session)),
429                }
430            },
431            &mut session,
432        )
433        .await
434    }
435
436    /// Apply applies a list of mutations atomically to the database.
437    /// ```
438    /// use google_cloud_spanner::mutation::insert;
439    /// use google_cloud_spanner::mutation::delete;
440    /// use google_cloud_spanner::key::all_keys;
441    /// use google_cloud_spanner::statement::ToKind;
442    /// use google_cloud_spanner::client::{Client, Error};
443    /// use google_cloud_spanner::value::CommitTimestamp;
444    ///
445    /// async fn run(client: Client) -> Result<(), Error>{
446    ///     let m1 = delete("Guild", all_keys());
447    ///     let m2 = insert("Guild", &["GuildID", "OwnerUserID", "UpdatedAt"], &[&"1", &"2", &CommitTimestamp::new()]);
448    ///     let commit_timestamp = client.apply(vec![m1,m2]).await?;
449    ///     Ok(())
450    /// }
451    /// ```
452    pub async fn apply(&self, ms: Vec<Mutation>) -> Result<CommitResult, Error> {
453        self.apply_with_option(ms, ReadWriteTransactionOption::default()).await
454    }
455
456    /// Apply applies a list of mutations atomically to the database.
457    pub async fn apply_with_option(
458        &self,
459        ms: Vec<Mutation>,
460        options: ReadWriteTransactionOption,
461    ) -> Result<CommitResult, Error> {
462        let result: Result<(CommitResult, ()), Error> = self
463            .read_write_transaction_sync_with_option(
464                |tx| {
465                    tx.buffer_write(ms.to_vec());
466                    Ok(())
467                },
468                options,
469            )
470            .await;
471        Ok(result?.0)
472    }
473
474    /// ReadWriteTransaction executes a read-write transaction, with retries as
475    /// necessary.
476    ///
477    /// The function f will be called one or more times. It must not maintain
478    /// any state between calls.
479    ///
480    /// If the transaction cannot be committed or if f returns an ABORTED error,
481    /// ReadWriteTransaction will call f again. It will continue to call f until the
482    /// transaction can be committed or the Context times out or is cancelled.  If f
483    /// returns an error other than ABORTED, ReadWriteTransaction will abort the
484    /// transaction and return the error.
485    ///
486    /// To limit the number of retries, set a deadline on the Context rather than
487    /// using a fixed limit on the number of attempts. ReadWriteTransaction will
488    /// retry as needed until that deadline is met.
489    ///
490    /// See <https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction> for
491    /// more details.
492    /// ```
493    /// use google_cloud_spanner::mutation::update;
494    /// use google_cloud_spanner::key::{Key, all_keys};
495    /// use google_cloud_spanner::value::Timestamp;
496    /// use google_cloud_spanner::client::Error;
497    /// use google_cloud_spanner::client::Client;
498    ///
499    /// #[tokio::main]
500    /// async fn run(client: Client) ->  Result<(Option<Timestamp>,()), Error>{
501    ///     client.read_write_transaction(|tx| {
502    ///         Box::pin(async move {
503    ///             // The transaction function will be called again if the error code
504    ///             // of this error is Aborted. The backend may automatically abort
505    ///             // any read/write transaction if it detects a deadlock or other problems.
506    ///             let key = all_keys();
507    ///             let mut reader = tx.read("UserItem", &["UserId", "ItemId", "Quantity"], key).await?;
508    ///             let mut ms = vec![];
509    ///             while let Some(row) = reader.next().await? {
510    ///                 let user_id = row.column_by_name::<String>("UserId")?;
511    ///                 let item_id = row.column_by_name::<i64>("ItemId")?;
512    ///                 let quantity = row.column_by_name::<i64>("Quantity")? + 1;
513    ///                 let m = update("UserItem", &["Quantity"], &[&user_id, &item_id, &quantity]);
514    ///                 ms.push(m);
515    ///             }
516    ///             // The buffered mutation will be committed.  If the commit
517    ///             // fails with an Aborted error, this function will be called again
518    ///             tx.buffer_write(ms);
519    ///             Ok(())
520    ///         })
521    ///     }).await
522    /// }
523    pub async fn read_write_transaction<'a, T, E, F>(&self, f: F) -> Result<(CommitResult, T), E>
524    where
525        E: TryAs<Status> + From<SessionError> + From<Status>,
526        F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'tx>>,
527    {
528        self.read_write_transaction_with_option(f, ReadWriteTransactionOption::default())
529            .await
530    }
531
532    /// ReadWriteTransaction executes a read-write transaction, with retries as
533    /// necessary.
534    ///
535    /// The function f will be called one or more times. It must not maintain
536    /// any state between calls.
537    ///
538    /// If the transaction cannot be committed or if f returns an ABORTED error,
539    /// ReadWriteTransaction will call f again. It will continue to call f until the
540    /// transaction can be committed or the Context times out or is cancelled.  If f
541    /// returns an error other than ABORTED, ReadWriteTransaction will abort the
542    /// transaction and return the error.
543    ///
544    /// To limit the number of retries, set a deadline on the Context rather than
545    /// using a fixed limit on the number of attempts. ReadWriteTransaction will
546    /// retry as needed until that deadline is met.
547    ///
548    /// See <https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction> for
549    /// more details.
550    pub async fn read_write_transaction_with_option<'a, T, E, F>(
551        &'a self,
552        f: F,
553        options: ReadWriteTransactionOption,
554    ) -> Result<(CommitResult, T), E>
555    where
556        E: TryAs<Status> + From<SessionError> + From<Status>,
557        F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'tx>>,
558    {
559        let (bo, co, tag) = Client::split_read_write_transaction_option(options);
560
561        let ro = TransactionRetrySetting::default();
562        let session = Some(self.get_session().await?);
563        // must reuse session
564        invoke_fn(
565            Some(ro),
566            |session| async {
567                let mut tx = self
568                    .create_read_write_transaction::<E>(session, bo.clone(), tag.clone())
569                    .await?;
570                let result = f(&mut tx).await;
571                tx.finish(result, Some(co.clone())).await
572            },
573            session,
574        )
575        .await
576    }
577
578    /// begin_read_write_transaction creates new ReadWriteTransaction.
579    /// ```
580    /// use google_cloud_spanner::mutation::update;
581    /// use google_cloud_spanner::key::{Key, all_keys};
582    /// use google_cloud_spanner::value::Timestamp;
583    /// use google_cloud_spanner::client::Error;
584    /// use google_cloud_spanner::client::Client;
585    /// use google_cloud_spanner::transaction_rw::ReadWriteTransaction;
586    /// use google_cloud_googleapis::spanner::v1::execute_batch_dml_request::Statement;
587    /// use google_cloud_spanner::retry::TransactionRetry;
588    ///
589    /// async fn run(client: Client) -> Result<(), Error>{
590    ///     let retry = &mut TransactionRetry::new();
591    ///     loop {
592    ///         let tx = &mut client.begin_read_write_transaction().await?;
593    ///
594    ///         let result = run_in_transaction(tx).await;
595    ///
596    ///         // try to commit or rollback transaction.
597    ///         match tx.end(result, None).await {
598    ///             Ok((_commit_timestamp, success)) => return Ok(success),
599    ///             Err(err) => retry.next(err).await? // check retry
600    ///         }
601    ///     }
602    /// }
603    ///
604    /// async fn run_in_transaction(tx: &mut ReadWriteTransaction) -> Result<(), Error> {
605    ///     let key = all_keys();
606    ///     let mut reader = tx.read("UserItem", &["UserId", "ItemId", "Quantity"], key).await?;
607    ///     let mut ms = vec![];
608    ///     while let Some(row) = reader.next().await? {
609    ///         let user_id = row.column_by_name::<String>("UserId")?;
610    ///         let item_id = row.column_by_name::<i64>("ItemId")?;
611    ///         let quantity = row.column_by_name::<i64>("Quantity")? + 1;
612    ///         let m = update("UserItem", &["UserId", "ItemId", "Quantity"], &[&user_id, &item_id, &quantity]);
613    ///         ms.push(m);
614    ///     }
615    ///     tx.buffer_write(ms);
616    ///     Ok(())
617    /// }
618    /// ```
619    pub async fn begin_read_write_transaction(&self) -> Result<ReadWriteTransaction, Error> {
620        let session = self.get_session().await?;
621        ReadWriteTransaction::begin(
622            session,
623            ReadWriteTransactionOption::default().begin_options,
624            ReadWriteTransactionOption::default().transaction_tag,
625            self.disable_route_to_leader,
626        )
627        .await
628        .map_err(|e| e.status.into())
629    }
630
631    /// Get open session count.
632    pub fn session_count(&self) -> usize {
633        self.sessions.num_opened()
634    }
635
636    async fn read_write_transaction_sync_with_option<T, E>(
637        &self,
638        f: impl Fn(&mut ReadWriteTransaction) -> Result<T, E>,
639        options: ReadWriteTransactionOption,
640    ) -> Result<(CommitResult, T), E>
641    where
642        E: TryAs<Status> + From<SessionError> + From<Status>,
643    {
644        let (bo, co, tag) = Client::split_read_write_transaction_option(options);
645
646        let ro = TransactionRetrySetting::default();
647        let session = Some(self.get_session().await?);
648
649        // reuse session
650        invoke_fn(
651            Some(ro),
652            |session| async {
653                let mut tx = self
654                    .create_read_write_transaction::<E>(session, bo.clone(), tag.clone())
655                    .await?;
656                let result = f(&mut tx);
657                tx.finish(result, Some(co.clone())).await
658            },
659            session,
660        )
661        .await
662    }
663
664    async fn create_read_write_transaction<E>(
665        &self,
666        session: Option<ManagedSession>,
667        bo: CallOptions,
668        transaction_tag: Option<String>,
669    ) -> Result<ReadWriteTransaction, (E, Option<ManagedSession>)>
670    where
671        E: TryAs<Status> + From<SessionError> + From<Status>,
672    {
673        ReadWriteTransaction::begin(session.unwrap(), bo, transaction_tag, self.disable_route_to_leader)
674            .await
675            .map_err(|e| (E::from(e.status), Some(e.session)))
676    }
677
678    async fn get_session(&self) -> Result<ManagedSession, SessionError> {
679        self.sessions.get().await
680    }
681
682    fn split_read_write_transaction_option(
683        options: ReadWriteTransactionOption,
684    ) -> (CallOptions, CommitOptions, Option<String>) {
685        (options.begin_options, options.commit_options, options.transaction_tag)
686    }
687}