gcloud_spanner/
client.rs

1use std::env::var;
2use std::fmt::Debug;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::time::Duration;
7
8use google_cloud_gax::conn::{ConnectionOptions, Environment};
9use google_cloud_gax::grpc::{Code, Status};
10use google_cloud_gax::retry::{invoke_fn, TryAs};
11use google_cloud_googleapis::spanner::v1::{commit_request, transaction_options, Mutation, TransactionOptions};
12use token_source::NoopTokenSourceProvider;
13
14use crate::apiv1::conn_pool::{ConnectionManager, SPANNER};
15use crate::retry::TransactionRetrySetting;
16use crate::session::{ManagedSession, SessionConfig, SessionError, SessionManager};
17use crate::statement::Statement;
18use crate::transaction::{CallOptions, QueryOptions};
19use crate::transaction_ro::{BatchReadOnlyTransaction, ReadOnlyTransaction};
20use crate::transaction_rw::{commit, CommitOptions, CommitResult, ReadWriteTransaction};
21use crate::value::TimestampBound;
22
/// Options accepted by `Client::partitioned_update_with_option`.
#[derive(Clone, Default)]
pub struct PartitionedUpdateOption {
    /// Call options handed to `ReadWriteTransaction::begin_partitioned_dml`.
    pub begin_options: CallOptions,
    /// Query options for the DML statement; when `None`, the default `QueryOptions` are used.
    pub query_options: Option<QueryOptions>,
    /// The transaction tag to use for partitioned update operations.
    pub transaction_tag: Option<String>,
}
30
31#[derive(Clone)]
32pub struct ReadOnlyTransactionOption {
33    pub timestamp_bound: TimestampBound,
34    pub call_options: CallOptions,
35}
36
37impl Default for ReadOnlyTransactionOption {
38    fn default() -> Self {
39        ReadOnlyTransactionOption {
40            timestamp_bound: TimestampBound::strong_read(),
41            call_options: CallOptions::default(),
42        }
43    }
44}
45
/// Options accepted by the `*_with_option` read/write transaction methods on `Client`.
#[derive(Clone, Default)]
pub struct ReadWriteTransactionOption {
    /// Call options handed to `ReadWriteTransaction::begin`.
    pub begin_options: CallOptions,
    /// Commit options handed to the transaction when it is finished.
    pub commit_options: CommitOptions,
    /// The transaction tag to use for this read/write transaction.
    pub transaction_tag: Option<String>,
}
53
/// Settings for the gRPC channels opened by the client.
#[derive(Clone, Debug)]
pub struct ChannelConfig {
    /// Number of gRPC channels.
    pub num_channels: usize,
    /// Forwarded as the `connect_timeout` of the connection options.
    pub connect_timeout: Duration,
    /// Forwarded as the `timeout` of the connection options.
    pub timeout: Duration,
}

impl Default for ChannelConfig {
    /// Four channels, with a 30-second `connect_timeout` and a 30-second `timeout`.
    fn default() -> Self {
        const THIRTY_SECONDS: Duration = Duration::from_secs(30);
        Self {
            num_channels: 4,
            connect_timeout: THIRTY_SECONDS,
            timeout: THIRTY_SECONDS,
        }
    }
}
71
72/// ClientConfig has configurations for the client.
73#[derive(Debug)]
74pub struct ClientConfig {
75    /// SessionPoolConfig is the configuration for session pool.
76    pub session_config: SessionConfig,
77    /// ChannelConfig is the configuration for gRPC connection.
78    pub channel_config: ChannelConfig,
79    /// Overriding service endpoint
80    pub endpoint: String,
81    /// Runtime project
82    pub environment: Environment,
83}
84
85impl Default for ClientConfig {
86    fn default() -> Self {
87        let mut config = ClientConfig {
88            channel_config: Default::default(),
89            session_config: Default::default(),
90            endpoint: SPANNER.to_string(),
91            environment: match var("SPANNER_EMULATOR_HOST").ok() {
92                Some(v) => Environment::Emulator(v),
93                None => Environment::GoogleCloud(Box::new(NoopTokenSourceProvider {})),
94            },
95        };
96        config.session_config.min_opened = config.channel_config.num_channels * 4;
97        config.session_config.max_opened = config.channel_config.num_channels * 100;
98        config
99    }
100}
101
102#[cfg(feature = "auth")]
103pub use google_cloud_auth;
104use google_cloud_googleapis::spanner::v1::transaction_options::IsolationLevel;
105
#[cfg(feature = "auth")]
impl ClientConfig {
    /// Installs the default token source provider when the environment is Google Cloud.
    ///
    /// Any other environment is left untouched.
    ///
    /// # Errors
    /// Returns the error produced while creating the token source provider.
    pub async fn with_auth(mut self) -> Result<Self, google_cloud_auth::error::Error> {
        if matches!(self.environment, Environment::GoogleCloud(_)) {
            let provider = google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::auth_config()).await?;
            self.environment = Environment::GoogleCloud(Box::new(provider));
        }
        Ok(self)
    }

    /// Installs a token source provider built from `credentials` when the environment
    /// is Google Cloud.
    ///
    /// Any other environment is left untouched.
    ///
    /// # Errors
    /// Returns the error produced while creating the token source provider.
    pub async fn with_credentials(
        mut self,
        credentials: google_cloud_auth::credentials::CredentialsFile,
    ) -> Result<Self, google_cloud_auth::error::Error> {
        if matches!(self.environment, Environment::GoogleCloud(_)) {
            let auth_config = Self::auth_config();
            let boxed_credentials = Box::new(credentials);
            let provider = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
                auth_config,
                boxed_credentials,
            )
            .await?;
            self.environment = Environment::GoogleCloud(Box::new(provider));
        }
        Ok(self)
    }

    /// Auth configuration carrying the Spanner audience and scopes.
    fn auth_config() -> google_cloud_auth::project::Config<'static> {
        let base = google_cloud_auth::project::Config::default();
        base.with_audience(crate::apiv1::conn_pool::AUDIENCE)
            .with_scopes(&crate::apiv1::conn_pool::SCOPES)
    }
}
137
138#[derive(thiserror::Error, Debug)]
139pub enum Error {
140    #[error(transparent)]
141    GRPC(#[from] Status),
142
143    #[error(transparent)]
144    InvalidSession(#[from] SessionError),
145
146    #[error(transparent)]
147    ParseError(#[from] crate::row::Error),
148
149    #[error(transparent)]
150    Connection(#[from] google_cloud_gax::conn::Error),
151
152    #[error("invalid config: {0}")]
153    InvalidConfig(String),
154}
155
156impl TryAs<Status> for Error {
157    fn try_as(&self) -> Option<&Status> {
158        match self {
159            Error::GRPC(e) => Some(e),
160            _ => None,
161        }
162    }
163}
164
/// Client is a client for reading and writing data to a Cloud Spanner database.
/// A client is safe to use concurrently, except for its Close method.
///
/// Cloning a client clones the `Arc`, so every clone shares the same session manager.
#[derive(Clone)]
pub struct Client {
    /// Session manager shared by all clones of this client.
    sessions: Arc<SessionManager>,
}
171
172impl Client {
173    /// new creates a client to a database. A valid database name has
174    /// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
175    pub async fn new(database: impl Into<String>, config: ClientConfig) -> Result<Self, Error> {
176        if config.session_config.max_opened > config.channel_config.num_channels * 100 {
177            return Err(Error::InvalidConfig(format!(
178                "max session size is {} because max session size is 100 per gRPC connection",
179                config.channel_config.num_channels * 100
180            )));
181        }
182
183        let pool_size = config.channel_config.num_channels;
184        let options = ConnectionOptions {
185            timeout: Some(config.channel_config.timeout),
186            connect_timeout: Some(config.channel_config.connect_timeout),
187        };
188        let conn_pool =
189            ConnectionManager::new(pool_size, &config.environment, config.endpoint.as_str(), &options).await?;
190        let session_manager = SessionManager::new(database, conn_pool, config.session_config).await?;
191
192        Ok(Client {
193            sessions: session_manager,
194        })
195    }
196
197    /// Close closes all the sessions gracefully.
198    /// This method can be called only once.
199    pub async fn close(self) {
200        self.sessions.close().await;
201    }
202
203    /// single provides a read-only snapshot transaction optimized for the case
204    /// where only a single read or query is needed.  This is more efficient than
205    /// using read_only_transaction for a single read or query.
206    /// ```
207    /// use google_cloud_spanner::key::Key;
208    /// use google_cloud_spanner::statement::ToKind;
209    /// use google_cloud_spanner::client::Client;
210    ///
211    /// #[tokio::main]
212    /// async fn run(client: Client) {
213    ///     let mut tx = client.single().await.unwrap();
214    ///     let iter1 = tx.read("Guild",&["GuildID", "OwnerUserID"], vec![
215    ///         Key::new(&"pk1"),
216    ///         Key::new(&"pk2")
217    ///     ]).await.unwrap();
218    /// }
219    /// ```
220    pub async fn single(&self) -> Result<ReadOnlyTransaction, Error> {
221        self.single_with_timestamp_bound(TimestampBound::strong_read()).await
222    }
223
224    /// single provides a read-only snapshot transaction optimized for the case
225    /// where only a single read or query is needed.  This is more efficient than
226    /// using read_only_transaction for a single read or query.
227    pub async fn single_with_timestamp_bound(&self, tb: TimestampBound) -> Result<ReadOnlyTransaction, Error> {
228        let session = self.get_session().await?;
229        let result = ReadOnlyTransaction::single(session, tb).await?;
230        Ok(result)
231    }
232
233    /// read_only_transaction returns a ReadOnlyTransaction that can be used for
234    /// multiple reads from the database.
235    ///
236    /// ```ignore
237    /// use google_cloud_spanner::client::{Client, Error};
238    /// use google_cloud_spanner::statement::Statement;
239    /// use google_cloud_spanner::key::Key;
240    ///
241    /// async fn run(client: Client) -> Result<(), Error>{
242    ///     let mut tx = client.read_only_transaction().await?;
243    ///
244    ///     let mut stmt = Statement::new("SELECT * , \
245    ///             ARRAY (SELECT AS STRUCT * FROM UserItem WHERE UserId = @Param1 ) AS UserItem, \
246    ///             ARRAY (SELECT AS STRUCT * FROM UserCharacter WHERE UserId = @Param1 ) AS UserCharacter  \
247    ///             FROM User \
248    ///             WHERE UserId = @Param1");
249    ///
250    ///     stmt.add_param("Param1", user_id);
251    ///     let mut reader = tx.query(stmt).await?;
252    ///     let mut data = vec![];
253    ///     while let Some(row) = reader.next().await? {
254    ///         let user_id= row.column_by_name::<String>("UserId")?;
255    ///         let user_items= row.column_by_name::<Vec<model::UserItem>>("UserItem")?;
256    ///         let user_characters = row.column_by_name::<Vec<model::UserCharacter>>("UserCharacter")?;
257    ///         data.push(user_id);
258    ///     }
259    ///
260    ///     let mut reader2 = tx.read("User", &["UserId"], vec![
261    ///         Key::new(&"user-1"),
262    ///         Key::new(&"user-2")
263    ///     ]).await?;
264    ///
265    ///     Ok(())
266    /// }
267    pub async fn read_only_transaction(&self) -> Result<ReadOnlyTransaction, Error> {
268        self.read_only_transaction_with_option(ReadOnlyTransactionOption::default())
269            .await
270    }
271
272    /// read_only_transaction returns a ReadOnlyTransaction that can be used for
273    /// multiple reads from the database.
274    pub async fn read_only_transaction_with_option(
275        &self,
276        options: ReadOnlyTransactionOption,
277    ) -> Result<ReadOnlyTransaction, Error> {
278        let session = self.get_session().await?;
279        let result = ReadOnlyTransaction::begin(session, options.timestamp_bound, options.call_options).await?;
280        Ok(result)
281    }
282
283    /// batch_read_only_transaction returns a BatchReadOnlyTransaction that can be used
284    /// for partitioned reads or queries from a snapshot of the database. This is
285    /// useful in batch processing pipelines where one wants to divide the work of
286    /// reading from the database across multiple machines.
287    pub async fn batch_read_only_transaction(&self) -> Result<BatchReadOnlyTransaction, Error> {
288        self.batch_read_only_transaction_with_option(ReadOnlyTransactionOption::default())
289            .await
290    }
291
292    /// batch_read_only_transaction returns a BatchReadOnlyTransaction that can be used
293    /// for partitioned reads or queries from a snapshot of the database. This is
294    /// useful in batch processing pipelines where one wants to divide the work of
295    /// reading from the database across multiple machines.
296    pub async fn batch_read_only_transaction_with_option(
297        &self,
298        options: ReadOnlyTransactionOption,
299    ) -> Result<BatchReadOnlyTransaction, Error> {
300        let session = self.get_session().await?;
301        let result = BatchReadOnlyTransaction::begin(session, options.timestamp_bound, options.call_options).await?;
302        Ok(result)
303    }
304
305    /// partitioned_update executes a DML statement in parallel across the database,
306    /// using separate, internal transactions that commit independently. The DML
307    /// statement must be fully partitionable: it must be expressible as the union
308    /// of many statements each of which accesses only a single row of the table. The
309    /// statement should also be idempotent, because it may be applied more than once.
310    ///
311    /// PartitionedUpdate returns an estimated count of the number of rows affected.
312    /// The actual number of affected rows may be greater than the estimate.
313    pub async fn partitioned_update(&self, stmt: Statement) -> Result<i64, Error> {
314        self.partitioned_update_with_option(stmt, PartitionedUpdateOption::default())
315            .await
316    }
317
318    /// partitioned_update executes a DML statement in parallel across the database,
319    /// using separate, internal transactions that commit independently. The DML
320    /// statement must be fully partitionable: it must be expressible as the union
321    /// of many statements each of which accesses only a single row of the table. The
322    /// statement should also be idempotent, because it may be applied more than once.
323    ///
324    /// PartitionedUpdate returns an estimated count of the number of rows affected.
325    /// The actual number of affected rows may be greater than the estimate.
326    pub async fn partitioned_update_with_option(
327        &self,
328        stmt: Statement,
329        options: PartitionedUpdateOption,
330    ) -> Result<i64, Error> {
331        let ro = TransactionRetrySetting::new(vec![Code::Aborted, Code::Internal]);
332        let session = Some(self.get_session().await?);
333
334        // reuse session
335        invoke_fn(
336            Some(ro),
337            |session| async {
338                let mut tx = match ReadWriteTransaction::begin_partitioned_dml(
339                    session.unwrap(),
340                    options.begin_options.clone(),
341                    options.transaction_tag.clone(),
342                )
343                .await
344                {
345                    Ok(tx) => tx,
346                    Err(e) => return Err((Error::GRPC(e.status), Some(e.session))),
347                };
348                tx.update_with_option(stmt.clone(), options.query_options.clone().unwrap_or_default())
349                    .await
350                    .map_err(|e| (Error::GRPC(e), tx.take_session()))
351            },
352            session,
353        )
354        .await
355    }
356
357    /// apply_at_least_once may attempt to apply mutations more than once; if
358    /// the mutations are not idempotent, this may lead to a failure being reported
359    /// when the mutation was applied more than once. For example, an insert may
360    /// fail with ALREADY_EXISTS even though the row did not exist before Apply was
361    /// called. For this reason, most users of the library will prefer not to use
362    /// this option.  However, apply_at_least_once requires only a single RPC, whereas
363    /// apply's default replay protection may require an additional RPC.  So this
364    /// method may be appropriate for latency sensitive and/or high throughput blind
365    /// writing.
366    pub async fn apply_at_least_once(&self, ms: Vec<Mutation>) -> Result<Option<CommitResult>, Error> {
367        self.apply_at_least_once_with_option(ms, CommitOptions::default()).await
368    }
369
370    /// apply_at_least_once may attempt to apply mutations more than once; if
371    /// the mutations are not idempotent, this may lead to a failure being reported
372    /// when the mutation was applied more than once. For example, an insert may
373    /// fail with ALREADY_EXISTS even though the row did not exist before Apply was
374    /// called. For this reason, most users of the library will prefer not to use
375    /// this option.  However, apply_at_least_once requires only a single RPC, whereas
376    /// apply's default replay protection may require an additional RPC.  So this
377    /// method may be appropriate for latency sensitive and/or high throughput blind
378    /// writing.
379    pub async fn apply_at_least_once_with_option(
380        &self,
381        ms: Vec<Mutation>,
382        options: CommitOptions,
383    ) -> Result<Option<CommitResult>, Error> {
384        let ro = TransactionRetrySetting::default();
385        let mut session = self.get_session().await?;
386
387        invoke_fn(
388            Some(ro),
389            |session| async {
390                let tx = commit_request::Transaction::SingleUseTransaction(TransactionOptions {
391                    exclude_txn_from_change_streams: false,
392                    mode: Some(transaction_options::Mode::ReadWrite(transaction_options::ReadWrite::default())),
393                    isolation_level: IsolationLevel::Unspecified as i32,
394                });
395                match commit(session, ms.clone(), tx, options.clone()).await {
396                    Ok(s) => Ok(Some(s.into())),
397                    Err(e) => Err((Error::GRPC(e), session)),
398                }
399            },
400            &mut session,
401        )
402        .await
403    }
404
405    /// Apply applies a list of mutations atomically to the database.
406    /// ```
407    /// use google_cloud_spanner::mutation::insert;
408    /// use google_cloud_spanner::mutation::delete;
409    /// use google_cloud_spanner::key::all_keys;
410    /// use google_cloud_spanner::statement::ToKind;
411    /// use google_cloud_spanner::client::{Client, Error};
412    /// use google_cloud_spanner::value::CommitTimestamp;
413    ///
414    /// async fn run(client: Client) -> Result<(), Error>{
415    ///     let m1 = delete("Guild", all_keys());
416    ///     let m2 = insert("Guild", &["GuildID", "OwnerUserID", "UpdatedAt"], &[&"1", &"2", &CommitTimestamp::new()]);
417    ///     let commit_timestamp = client.apply(vec![m1,m2]).await?;
418    ///     Ok(())
419    /// }
420    /// ```
421    pub async fn apply(&self, ms: Vec<Mutation>) -> Result<CommitResult, Error> {
422        self.apply_with_option(ms, ReadWriteTransactionOption::default()).await
423    }
424
425    /// Apply applies a list of mutations atomically to the database.
426    pub async fn apply_with_option(
427        &self,
428        ms: Vec<Mutation>,
429        options: ReadWriteTransactionOption,
430    ) -> Result<CommitResult, Error> {
431        let result: Result<(CommitResult, ()), Error> = self
432            .read_write_transaction_sync_with_option(
433                |tx| {
434                    tx.buffer_write(ms.to_vec());
435                    Ok(())
436                },
437                options,
438            )
439            .await;
440        Ok(result?.0)
441    }
442
443    /// ReadWriteTransaction executes a read-write transaction, with retries as
444    /// necessary.
445    ///
446    /// The function f will be called one or more times. It must not maintain
447    /// any state between calls.
448    ///
449    /// If the transaction cannot be committed or if f returns an ABORTED error,
450    /// ReadWriteTransaction will call f again. It will continue to call f until the
451    /// transaction can be committed or the Context times out or is cancelled.  If f
452    /// returns an error other than ABORTED, ReadWriteTransaction will abort the
453    /// transaction and return the error.
454    ///
455    /// To limit the number of retries, set a deadline on the Context rather than
456    /// using a fixed limit on the number of attempts. ReadWriteTransaction will
457    /// retry as needed until that deadline is met.
458    ///
459    /// See <https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction> for
460    /// more details.
461    /// ```
462    /// use google_cloud_spanner::mutation::update;
463    /// use google_cloud_spanner::key::{Key, all_keys};
464    /// use google_cloud_spanner::value::Timestamp;
465    /// use google_cloud_spanner::client::Error;
466    /// use google_cloud_spanner::client::Client;
467    ///
468    /// #[tokio::main]
469    /// async fn run(client: Client) ->  Result<(Option<Timestamp>,()), Error>{
470    ///     client.read_write_transaction(|tx| {
471    ///         Box::pin(async move {
472    ///             // The transaction function will be called again if the error code
473    ///             // of this error is Aborted. The backend may automatically abort
474    ///             // any read/write transaction if it detects a deadlock or other problems.
475    ///             let key = all_keys();
476    ///             let mut reader = tx.read("UserItem", &["UserId", "ItemId", "Quantity"], key).await?;
477    ///             let mut ms = vec![];
478    ///             while let Some(row) = reader.next().await? {
479    ///                 let user_id = row.column_by_name::<String>("UserId")?;
480    ///                 let item_id = row.column_by_name::<i64>("ItemId")?;
481    ///                 let quantity = row.column_by_name::<i64>("Quantity")? + 1;
482    ///                 let m = update("UserItem", &["Quantity"], &[&user_id, &item_id, &quantity]);
483    ///                 ms.push(m);
484    ///             }
485    ///             // The buffered mutation will be committed.  If the commit
486    ///             // fails with an Aborted error, this function will be called again
487    ///             tx.buffer_write(ms);
488    ///             Ok(())
489    ///         })
490    ///     }).await
491    /// }
492    pub async fn read_write_transaction<'a, T, E, F>(&self, f: F) -> Result<(CommitResult, T), E>
493    where
494        E: TryAs<Status> + From<SessionError> + From<Status>,
495        F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'tx>>,
496    {
497        self.read_write_transaction_with_option(f, ReadWriteTransactionOption::default())
498            .await
499    }
500
501    /// ReadWriteTransaction executes a read-write transaction, with retries as
502    /// necessary.
503    ///
504    /// The function f will be called one or more times. It must not maintain
505    /// any state between calls.
506    ///
507    /// If the transaction cannot be committed or if f returns an ABORTED error,
508    /// ReadWriteTransaction will call f again. It will continue to call f until the
509    /// transaction can be committed or the Context times out or is cancelled.  If f
510    /// returns an error other than ABORTED, ReadWriteTransaction will abort the
511    /// transaction and return the error.
512    ///
513    /// To limit the number of retries, set a deadline on the Context rather than
514    /// using a fixed limit on the number of attempts. ReadWriteTransaction will
515    /// retry as needed until that deadline is met.
516    ///
517    /// See <https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction> for
518    /// more details.
519    pub async fn read_write_transaction_with_option<'a, T, E, F>(
520        &'a self,
521        f: F,
522        options: ReadWriteTransactionOption,
523    ) -> Result<(CommitResult, T), E>
524    where
525        E: TryAs<Status> + From<SessionError> + From<Status>,
526        F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'tx>>,
527    {
528        let (bo, co, tag) = Client::split_read_write_transaction_option(options);
529
530        let ro = TransactionRetrySetting::default();
531        let session = Some(self.get_session().await?);
532        // must reuse session
533        invoke_fn(
534            Some(ro),
535            |session| async {
536                let mut tx = self
537                    .create_read_write_transaction::<E>(session, bo.clone(), tag.clone())
538                    .await?;
539                let result = f(&mut tx).await;
540                tx.finish(result, Some(co.clone())).await
541            },
542            session,
543        )
544        .await
545    }
546
547    /// begin_read_write_transaction creates new ReadWriteTransaction.
548    /// ```
549    /// use google_cloud_spanner::mutation::update;
550    /// use google_cloud_spanner::key::{Key, all_keys};
551    /// use google_cloud_spanner::value::Timestamp;
552    /// use google_cloud_spanner::client::Error;
553    /// use google_cloud_spanner::client::Client;
554    /// use google_cloud_spanner::transaction_rw::ReadWriteTransaction;
555    /// use google_cloud_googleapis::spanner::v1::execute_batch_dml_request::Statement;
556    /// use google_cloud_spanner::retry::TransactionRetry;
557    ///
558    /// async fn run(client: Client) -> Result<(), Error>{
559    ///     let retry = &mut TransactionRetry::new();
560    ///     loop {
561    ///         let tx = &mut client.begin_read_write_transaction().await?;
562    ///
563    ///         let result = run_in_transaction(tx).await;
564    ///
565    ///         // try to commit or rollback transaction.
566    ///         match tx.end(result, None).await {
567    ///             Ok((_commit_timestamp, success)) => return Ok(success),
568    ///             Err(err) => retry.next(err).await? // check retry
569    ///         }
570    ///     }
571    /// }
572    ///
573    /// async fn run_in_transaction(tx: &mut ReadWriteTransaction) -> Result<(), Error> {
574    ///     let key = all_keys();
575    ///     let mut reader = tx.read("UserItem", &["UserId", "ItemId", "Quantity"], key).await?;
576    ///     let mut ms = vec![];
577    ///     while let Some(row) = reader.next().await? {
578    ///         let user_id = row.column_by_name::<String>("UserId")?;
579    ///         let item_id = row.column_by_name::<i64>("ItemId")?;
580    ///         let quantity = row.column_by_name::<i64>("Quantity")? + 1;
581    ///         let m = update("UserItem", &["UserId", "ItemId", "Quantity"], &[&user_id, &item_id, &quantity]);
582    ///         ms.push(m);
583    ///     }
584    ///     tx.buffer_write(ms);
585    ///     Ok(())
586    /// }
587    /// ```
588    pub async fn begin_read_write_transaction(&self) -> Result<ReadWriteTransaction, Error> {
589        let session = self.get_session().await?;
590        ReadWriteTransaction::begin(
591            session,
592            ReadWriteTransactionOption::default().begin_options,
593            ReadWriteTransactionOption::default().transaction_tag,
594        )
595        .await
596        .map_err(|e| e.status.into())
597    }
598
599    /// Get open session count.
600    pub fn session_count(&self) -> usize {
601        self.sessions.num_opened()
602    }
603
604    async fn read_write_transaction_sync_with_option<T, E>(
605        &self,
606        f: impl Fn(&mut ReadWriteTransaction) -> Result<T, E>,
607        options: ReadWriteTransactionOption,
608    ) -> Result<(CommitResult, T), E>
609    where
610        E: TryAs<Status> + From<SessionError> + From<Status>,
611    {
612        let (bo, co, tag) = Client::split_read_write_transaction_option(options);
613
614        let ro = TransactionRetrySetting::default();
615        let session = Some(self.get_session().await?);
616
617        // reuse session
618        invoke_fn(
619            Some(ro),
620            |session| async {
621                let mut tx = self
622                    .create_read_write_transaction::<E>(session, bo.clone(), tag.clone())
623                    .await?;
624                let result = f(&mut tx);
625                tx.finish(result, Some(co.clone())).await
626            },
627            session,
628        )
629        .await
630    }
631
632    async fn create_read_write_transaction<E>(
633        &self,
634        session: Option<ManagedSession>,
635        bo: CallOptions,
636        transaction_tag: Option<String>,
637    ) -> Result<ReadWriteTransaction, (E, Option<ManagedSession>)>
638    where
639        E: TryAs<Status> + From<SessionError> + From<Status>,
640    {
641        ReadWriteTransaction::begin(session.unwrap(), bo, transaction_tag)
642            .await
643            .map_err(|e| (E::from(e.status), Some(e.session)))
644    }
645
646    async fn get_session(&self) -> Result<ManagedSession, SessionError> {
647        self.sessions.get().await
648    }
649
650    fn split_read_write_transaction_option(
651        options: ReadWriteTransactionOption,
652    ) -> (CallOptions, CommitOptions, Option<String>) {
653        (options.begin_options, options.commit_options, options.transaction_tag)
654    }
655}