gcloud_spanner/
client.rs

1use std::env::var;
2use std::fmt::Debug;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::time::Duration;
7
8use google_cloud_gax::conn::{ConnectionOptions, Environment};
9use google_cloud_gax::grpc::{Code, Status};
10use google_cloud_gax::retry::{invoke_fn, TryAs};
11use google_cloud_googleapis::spanner::v1::{commit_request, transaction_options, Mutation, TransactionOptions};
12use token_source::NoopTokenSourceProvider;
13
14use crate::apiv1::conn_pool::{ConnectionManager, SPANNER};
15use crate::retry::TransactionRetrySetting;
16use crate::session::{ManagedSession, SessionConfig, SessionError, SessionManager};
17use crate::statement::Statement;
18use crate::transaction::{CallOptions, QueryOptions};
19use crate::transaction_ro::{BatchReadOnlyTransaction, ReadOnlyTransaction};
20use crate::transaction_rw::{commit, CommitOptions, CommitResult, ReadWriteTransaction};
21use crate::value::TimestampBound;
22
23#[derive(Clone, Default)]
24pub struct PartitionedUpdateOption {
25    pub begin_options: CallOptions,
26    pub query_options: Option<QueryOptions>,
27}
28
29#[derive(Clone)]
30pub struct ReadOnlyTransactionOption {
31    pub timestamp_bound: TimestampBound,
32    pub call_options: CallOptions,
33}
34
35impl Default for ReadOnlyTransactionOption {
36    fn default() -> Self {
37        ReadOnlyTransactionOption {
38            timestamp_bound: TimestampBound::strong_read(),
39            call_options: CallOptions::default(),
40        }
41    }
42}
43
44#[derive(Clone, Default)]
45pub struct ReadWriteTransactionOption {
46    pub begin_options: CallOptions,
47    pub commit_options: CommitOptions,
48}
49
50#[derive(Clone, Debug)]
51pub struct ChannelConfig {
52    /// num_channels is the number of gRPC channels.
53    pub num_channels: usize,
54    pub connect_timeout: Duration,
55    pub timeout: Duration,
56}
57
58impl Default for ChannelConfig {
59    fn default() -> Self {
60        ChannelConfig {
61            num_channels: 4,
62            connect_timeout: Duration::from_secs(30),
63            timeout: Duration::from_secs(30),
64        }
65    }
66}
67
68/// ClientConfig has configurations for the client.
69#[derive(Debug)]
70pub struct ClientConfig {
71    /// SessionPoolConfig is the configuration for session pool.
72    pub session_config: SessionConfig,
73    /// ChannelConfig is the configuration for gRPC connection.
74    pub channel_config: ChannelConfig,
75    /// Overriding service endpoint
76    pub endpoint: String,
77    /// Runtime project
78    pub environment: Environment,
79}
80
81impl Default for ClientConfig {
82    fn default() -> Self {
83        let mut config = ClientConfig {
84            channel_config: Default::default(),
85            session_config: Default::default(),
86            endpoint: SPANNER.to_string(),
87            environment: match var("SPANNER_EMULATOR_HOST").ok() {
88                Some(v) => Environment::Emulator(v),
89                None => Environment::GoogleCloud(Box::new(NoopTokenSourceProvider {})),
90            },
91        };
92        config.session_config.min_opened = config.channel_config.num_channels * 4;
93        config.session_config.max_opened = config.channel_config.num_channels * 100;
94        config
95    }
96}
97
98#[cfg(feature = "auth")]
99pub use google_cloud_auth;
100use google_cloud_googleapis::spanner::v1::transaction_options::IsolationLevel;
101
102#[cfg(feature = "auth")]
103impl ClientConfig {
104    pub async fn with_auth(mut self) -> Result<Self, google_cloud_auth::error::Error> {
105        if let Environment::GoogleCloud(_) = self.environment {
106            let ts = google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::auth_config()).await?;
107            self.environment = Environment::GoogleCloud(Box::new(ts))
108        }
109        Ok(self)
110    }
111
112    pub async fn with_credentials(
113        mut self,
114        credentials: google_cloud_auth::credentials::CredentialsFile,
115    ) -> Result<Self, google_cloud_auth::error::Error> {
116        if let Environment::GoogleCloud(_) = self.environment {
117            let ts = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
118                Self::auth_config(),
119                Box::new(credentials),
120            )
121            .await?;
122            self.environment = Environment::GoogleCloud(Box::new(ts))
123        }
124        Ok(self)
125    }
126
127    fn auth_config() -> google_cloud_auth::project::Config<'static> {
128        google_cloud_auth::project::Config::default()
129            .with_audience(crate::apiv1::conn_pool::AUDIENCE)
130            .with_scopes(&crate::apiv1::conn_pool::SCOPES)
131    }
132}
133
134#[derive(thiserror::Error, Debug)]
135pub enum Error {
136    #[error(transparent)]
137    GRPC(#[from] Status),
138
139    #[error(transparent)]
140    InvalidSession(#[from] SessionError),
141
142    #[error(transparent)]
143    ParseError(#[from] crate::row::Error),
144
145    #[error(transparent)]
146    Connection(#[from] google_cloud_gax::conn::Error),
147
148    #[error("invalid config: {0}")]
149    InvalidConfig(String),
150}
151
152impl TryAs<Status> for Error {
153    fn try_as(&self) -> Option<&Status> {
154        match self {
155            Error::GRPC(e) => Some(e),
156            _ => None,
157        }
158    }
159}
160
161/// Client is a client for reading and writing data to a Cloud Spanner database.
162/// A client is safe to use concurrently, except for its Close method.
163#[derive(Clone)]
164pub struct Client {
165    sessions: Arc<SessionManager>,
166}
167
168impl Client {
169    /// new creates a client to a database. A valid database name has
170    /// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
171    pub async fn new(database: impl Into<String>, config: ClientConfig) -> Result<Self, Error> {
172        if config.session_config.max_opened > config.channel_config.num_channels * 100 {
173            return Err(Error::InvalidConfig(format!(
174                "max session size is {} because max session size is 100 per gRPC connection",
175                config.channel_config.num_channels * 100
176            )));
177        }
178
179        let pool_size = config.channel_config.num_channels;
180        let options = ConnectionOptions {
181            timeout: Some(config.channel_config.timeout),
182            connect_timeout: Some(config.channel_config.connect_timeout),
183        };
184        let conn_pool =
185            ConnectionManager::new(pool_size, &config.environment, config.endpoint.as_str(), &options).await?;
186        let session_manager = SessionManager::new(database, conn_pool, config.session_config).await?;
187
188        Ok(Client {
189            sessions: session_manager,
190        })
191    }
192
193    /// Close closes all the sessions gracefully.
194    /// This method can be called only once.
195    pub async fn close(self) {
196        self.sessions.close().await;
197    }
198
199    /// single provides a read-only snapshot transaction optimized for the case
200    /// where only a single read or query is needed.  This is more efficient than
201    /// using read_only_transaction for a single read or query.
202    /// ```
203    /// use google_cloud_spanner::key::Key;
204    /// use google_cloud_spanner::statement::ToKind;
205    /// use google_cloud_spanner::client::Client;
206    ///
207    /// #[tokio::main]
208    /// async fn run(client: Client) {
209    ///     let mut tx = client.single().await.unwrap();
210    ///     let iter1 = tx.read("Guild",&["GuildID", "OwnerUserID"], vec![
211    ///         Key::new(&"pk1"),
212    ///         Key::new(&"pk2")
213    ///     ]).await.unwrap();
214    /// }
215    /// ```
216    pub async fn single(&self) -> Result<ReadOnlyTransaction, Error> {
217        self.single_with_timestamp_bound(TimestampBound::strong_read()).await
218    }
219
220    /// single provides a read-only snapshot transaction optimized for the case
221    /// where only a single read or query is needed.  This is more efficient than
222    /// using read_only_transaction for a single read or query.
223    pub async fn single_with_timestamp_bound(&self, tb: TimestampBound) -> Result<ReadOnlyTransaction, Error> {
224        let session = self.get_session().await?;
225        let result = ReadOnlyTransaction::single(session, tb).await?;
226        Ok(result)
227    }
228
229    /// read_only_transaction returns a ReadOnlyTransaction that can be used for
230    /// multiple reads from the database.
231    ///
232    /// ```ignore
233    /// use google_cloud_spanner::client::{Client, Error};
234    /// use google_cloud_spanner::statement::Statement;
235    /// use google_cloud_spanner::key::Key;
236    ///
237    /// async fn run(client: Client) -> Result<(), Error>{
238    ///     let mut tx = client.read_only_transaction().await?;
239    ///
240    ///     let mut stmt = Statement::new("SELECT * , \
241    ///             ARRAY (SELECT AS STRUCT * FROM UserItem WHERE UserId = @Param1 ) AS UserItem, \
242    ///             ARRAY (SELECT AS STRUCT * FROM UserCharacter WHERE UserId = @Param1 ) AS UserCharacter  \
243    ///             FROM User \
244    ///             WHERE UserId = @Param1");
245    ///
246    ///     stmt.add_param("Param1", user_id);
247    ///     let mut reader = tx.query(stmt).await?;
248    ///     let mut data = vec![];
249    ///     while let Some(row) = reader.next().await? {
250    ///         let user_id= row.column_by_name::<String>("UserId")?;
251    ///         let user_items= row.column_by_name::<Vec<model::UserItem>>("UserItem")?;
252    ///         let user_characters = row.column_by_name::<Vec<model::UserCharacter>>("UserCharacter")?;
253    ///         data.push(user_id);
254    ///     }
255    ///
256    ///     let mut reader2 = tx.read("User", &["UserId"], vec![
257    ///         Key::new(&"user-1"),
258    ///         Key::new(&"user-2")
259    ///     ]).await?;
260    ///
261    ///     Ok(())
262    /// }
263    pub async fn read_only_transaction(&self) -> Result<ReadOnlyTransaction, Error> {
264        self.read_only_transaction_with_option(ReadOnlyTransactionOption::default())
265            .await
266    }
267
268    /// read_only_transaction returns a ReadOnlyTransaction that can be used for
269    /// multiple reads from the database.
270    pub async fn read_only_transaction_with_option(
271        &self,
272        options: ReadOnlyTransactionOption,
273    ) -> Result<ReadOnlyTransaction, Error> {
274        let session = self.get_session().await?;
275        let result = ReadOnlyTransaction::begin(session, options.timestamp_bound, options.call_options).await?;
276        Ok(result)
277    }
278
279    /// batch_read_only_transaction returns a BatchReadOnlyTransaction that can be used
280    /// for partitioned reads or queries from a snapshot of the database. This is
281    /// useful in batch processing pipelines where one wants to divide the work of
282    /// reading from the database across multiple machines.
283    pub async fn batch_read_only_transaction(&self) -> Result<BatchReadOnlyTransaction, Error> {
284        self.batch_read_only_transaction_with_option(ReadOnlyTransactionOption::default())
285            .await
286    }
287
288    /// batch_read_only_transaction returns a BatchReadOnlyTransaction that can be used
289    /// for partitioned reads or queries from a snapshot of the database. This is
290    /// useful in batch processing pipelines where one wants to divide the work of
291    /// reading from the database across multiple machines.
292    pub async fn batch_read_only_transaction_with_option(
293        &self,
294        options: ReadOnlyTransactionOption,
295    ) -> Result<BatchReadOnlyTransaction, Error> {
296        let session = self.get_session().await?;
297        let result = BatchReadOnlyTransaction::begin(session, options.timestamp_bound, options.call_options).await?;
298        Ok(result)
299    }
300
301    /// partitioned_update executes a DML statement in parallel across the database,
302    /// using separate, internal transactions that commit independently. The DML
303    /// statement must be fully partitionable: it must be expressible as the union
304    /// of many statements each of which accesses only a single row of the table. The
305    /// statement should also be idempotent, because it may be applied more than once.
306    ///
307    /// PartitionedUpdate returns an estimated count of the number of rows affected.
308    /// The actual number of affected rows may be greater than the estimate.
309    pub async fn partitioned_update(&self, stmt: Statement) -> Result<i64, Error> {
310        self.partitioned_update_with_option(stmt, PartitionedUpdateOption::default())
311            .await
312    }
313
314    /// partitioned_update executes a DML statement in parallel across the database,
315    /// using separate, internal transactions that commit independently. The DML
316    /// statement must be fully partitionable: it must be expressible as the union
317    /// of many statements each of which accesses only a single row of the table. The
318    /// statement should also be idempotent, because it may be applied more than once.
319    ///
320    /// PartitionedUpdate returns an estimated count of the number of rows affected.
321    /// The actual number of affected rows may be greater than the estimate.
322    pub async fn partitioned_update_with_option(
323        &self,
324        stmt: Statement,
325        options: PartitionedUpdateOption,
326    ) -> Result<i64, Error> {
327        let ro = TransactionRetrySetting::new(vec![Code::Aborted, Code::Internal]);
328        let session = Some(self.get_session().await?);
329
330        // reuse session
331        invoke_fn(
332            Some(ro),
333            |session| async {
334                let mut tx =
335                    match ReadWriteTransaction::begin_partitioned_dml(session.unwrap(), options.begin_options.clone())
336                        .await
337                    {
338                        Ok(tx) => tx,
339                        Err(e) => return Err((Error::GRPC(e.status), Some(e.session))),
340                    };
341                let qo = options.query_options.clone().unwrap_or_default();
342                tx.update_with_option(stmt.clone(), qo)
343                    .await
344                    .map_err(|e| (Error::GRPC(e), tx.take_session()))
345            },
346            session,
347        )
348        .await
349    }
350
351    /// apply_at_least_once may attempt to apply mutations more than once; if
352    /// the mutations are not idempotent, this may lead to a failure being reported
353    /// when the mutation was applied more than once. For example, an insert may
354    /// fail with ALREADY_EXISTS even though the row did not exist before Apply was
355    /// called. For this reason, most users of the library will prefer not to use
356    /// this option.  However, apply_at_least_once requires only a single RPC, whereas
357    /// apply's default replay protection may require an additional RPC.  So this
358    /// method may be appropriate for latency sensitive and/or high throughput blind
359    /// writing.
360    pub async fn apply_at_least_once(&self, ms: Vec<Mutation>) -> Result<Option<CommitResult>, Error> {
361        self.apply_at_least_once_with_option(ms, CommitOptions::default()).await
362    }
363
364    /// apply_at_least_once may attempt to apply mutations more than once; if
365    /// the mutations are not idempotent, this may lead to a failure being reported
366    /// when the mutation was applied more than once. For example, an insert may
367    /// fail with ALREADY_EXISTS even though the row did not exist before Apply was
368    /// called. For this reason, most users of the library will prefer not to use
369    /// this option.  However, apply_at_least_once requires only a single RPC, whereas
370    /// apply's default replay protection may require an additional RPC.  So this
371    /// method may be appropriate for latency sensitive and/or high throughput blind
372    /// writing.
373    pub async fn apply_at_least_once_with_option(
374        &self,
375        ms: Vec<Mutation>,
376        options: CommitOptions,
377    ) -> Result<Option<CommitResult>, Error> {
378        let ro = TransactionRetrySetting::default();
379        let mut session = self.get_session().await?;
380
381        invoke_fn(
382            Some(ro),
383            |session| async {
384                let tx = commit_request::Transaction::SingleUseTransaction(TransactionOptions {
385                    exclude_txn_from_change_streams: false,
386                    mode: Some(transaction_options::Mode::ReadWrite(transaction_options::ReadWrite::default())),
387                    isolation_level: IsolationLevel::Unspecified as i32,
388                });
389                match commit(session, ms.clone(), tx, options.clone()).await {
390                    Ok(s) => Ok(Some(s.into())),
391                    Err(e) => Err((Error::GRPC(e), session)),
392                }
393            },
394            &mut session,
395        )
396        .await
397    }
398
399    /// Apply applies a list of mutations atomically to the database.
400    /// ```
401    /// use google_cloud_spanner::mutation::insert;
402    /// use google_cloud_spanner::mutation::delete;
403    /// use google_cloud_spanner::key::all_keys;
404    /// use google_cloud_spanner::statement::ToKind;
405    /// use google_cloud_spanner::client::{Client, Error};
406    /// use google_cloud_spanner::value::CommitTimestamp;
407    ///
408    /// async fn run(client: Client) -> Result<(), Error>{
409    ///     let m1 = delete("Guild", all_keys());
410    ///     let m2 = insert("Guild", &["GuildID", "OwnerUserID", "UpdatedAt"], &[&"1", &"2", &CommitTimestamp::new()]);
411    ///     let commit_timestamp = client.apply(vec![m1,m2]).await?;
412    ///     Ok(())
413    /// }
414    /// ```
415    pub async fn apply(&self, ms: Vec<Mutation>) -> Result<CommitResult, Error> {
416        self.apply_with_option(ms, ReadWriteTransactionOption::default()).await
417    }
418
419    /// Apply applies a list of mutations atomically to the database.
420    pub async fn apply_with_option(
421        &self,
422        ms: Vec<Mutation>,
423        options: ReadWriteTransactionOption,
424    ) -> Result<CommitResult, Error> {
425        let result: Result<(CommitResult, ()), Error> = self
426            .read_write_transaction_sync_with_option(
427                |tx| {
428                    tx.buffer_write(ms.to_vec());
429                    Ok(())
430                },
431                options,
432            )
433            .await;
434        Ok(result?.0)
435    }
436
437    /// ReadWriteTransaction executes a read-write transaction, with retries as
438    /// necessary.
439    ///
440    /// The function f will be called one or more times. It must not maintain
441    /// any state between calls.
442    ///
443    /// If the transaction cannot be committed or if f returns an ABORTED error,
444    /// ReadWriteTransaction will call f again. It will continue to call f until the
445    /// transaction can be committed or the Context times out or is cancelled.  If f
446    /// returns an error other than ABORTED, ReadWriteTransaction will abort the
447    /// transaction and return the error.
448    ///
449    /// To limit the number of retries, set a deadline on the Context rather than
450    /// using a fixed limit on the number of attempts. ReadWriteTransaction will
451    /// retry as needed until that deadline is met.
452    ///
453    /// See <https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction> for
454    /// more details.
455    /// ```
456    /// use google_cloud_spanner::mutation::update;
457    /// use google_cloud_spanner::key::{Key, all_keys};
458    /// use google_cloud_spanner::value::Timestamp;
459    /// use google_cloud_spanner::client::Error;
460    /// use google_cloud_spanner::client::Client;
461    ///
462    /// #[tokio::main]
463    /// async fn run(client: Client) ->  Result<(Option<Timestamp>,()), Error>{
464    ///     client.read_write_transaction(|tx| {
465    ///         Box::pin(async move {
466    ///             // The transaction function will be called again if the error code
467    ///             // of this error is Aborted. The backend may automatically abort
468    ///             // any read/write transaction if it detects a deadlock or other problems.
469    ///             let key = all_keys();
470    ///             let mut reader = tx.read("UserItem", &["UserId", "ItemId", "Quantity"], key).await?;
471    ///             let mut ms = vec![];
472    ///             while let Some(row) = reader.next().await? {
473    ///                 let user_id = row.column_by_name::<String>("UserId")?;
474    ///                 let item_id = row.column_by_name::<i64>("ItemId")?;
475    ///                 let quantity = row.column_by_name::<i64>("Quantity")? + 1;
476    ///                 let m = update("UserItem", &["Quantity"], &[&user_id, &item_id, &quantity]);
477    ///                 ms.push(m);
478    ///             }
479    ///             // The buffered mutation will be committed.  If the commit
480    ///             // fails with an Aborted error, this function will be called again
481    ///             tx.buffer_write(ms);
482    ///             Ok(())
483    ///         })
484    ///     }).await
485    /// }
486    pub async fn read_write_transaction<'a, T, E, F>(&self, f: F) -> Result<(CommitResult, T), E>
487    where
488        E: TryAs<Status> + From<SessionError> + From<Status>,
489        F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'tx>>,
490    {
491        self.read_write_transaction_with_option(f, ReadWriteTransactionOption::default())
492            .await
493    }
494
495    /// ReadWriteTransaction executes a read-write transaction, with retries as
496    /// necessary.
497    ///
498    /// The function f will be called one or more times. It must not maintain
499    /// any state between calls.
500    ///
501    /// If the transaction cannot be committed or if f returns an ABORTED error,
502    /// ReadWriteTransaction will call f again. It will continue to call f until the
503    /// transaction can be committed or the Context times out or is cancelled.  If f
504    /// returns an error other than ABORTED, ReadWriteTransaction will abort the
505    /// transaction and return the error.
506    ///
507    /// To limit the number of retries, set a deadline on the Context rather than
508    /// using a fixed limit on the number of attempts. ReadWriteTransaction will
509    /// retry as needed until that deadline is met.
510    ///
511    /// See <https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction> for
512    /// more details.
513    pub async fn read_write_transaction_with_option<'a, T, E, F>(
514        &'a self,
515        f: F,
516        options: ReadWriteTransactionOption,
517    ) -> Result<(CommitResult, T), E>
518    where
519        E: TryAs<Status> + From<SessionError> + From<Status>,
520        F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'tx>>,
521    {
522        let (bo, co) = Client::split_read_write_transaction_option(options);
523
524        let ro = TransactionRetrySetting::default();
525        let session = Some(self.get_session().await?);
526        // must reuse session
527        invoke_fn(
528            Some(ro),
529            |session| async {
530                let mut tx = self.create_read_write_transaction::<E>(session, bo.clone()).await?;
531                let result = f(&mut tx).await;
532                tx.finish(result, Some(co.clone())).await
533            },
534            session,
535        )
536        .await
537    }
538
539    /// begin_read_write_transaction creates new ReadWriteTransaction.
540    /// ```
541    /// use google_cloud_spanner::mutation::update;
542    /// use google_cloud_spanner::key::{Key, all_keys};
543    /// use google_cloud_spanner::value::Timestamp;
544    /// use google_cloud_spanner::client::Error;
545    /// use google_cloud_spanner::client::Client;
546    /// use google_cloud_spanner::transaction_rw::ReadWriteTransaction;
547    /// use google_cloud_googleapis::spanner::v1::execute_batch_dml_request::Statement;
548    /// use google_cloud_spanner::retry::TransactionRetry;
549    ///
550    /// async fn run(client: Client) -> Result<(), Error>{
551    ///     let retry = &mut TransactionRetry::new();
552    ///     loop {
553    ///         let tx = &mut client.begin_read_write_transaction().await?;
554    ///
555    ///         let result = run_in_transaction(tx).await;
556    ///
557    ///         // try to commit or rollback transaction.
558    ///         match tx.end(result, None).await {
559    ///             Ok((_commit_timestamp, success)) => return Ok(success),
560    ///             Err(err) => retry.next(err).await? // check retry
561    ///         }
562    ///     }
563    /// }
564    ///
565    /// async fn run_in_transaction(tx: &mut ReadWriteTransaction) -> Result<(), Error> {
566    ///     let key = all_keys();
567    ///     let mut reader = tx.read("UserItem", &["UserId", "ItemId", "Quantity"], key).await?;
568    ///     let mut ms = vec![];
569    ///     while let Some(row) = reader.next().await? {
570    ///         let user_id = row.column_by_name::<String>("UserId")?;
571    ///         let item_id = row.column_by_name::<i64>("ItemId")?;
572    ///         let quantity = row.column_by_name::<i64>("Quantity")? + 1;
573    ///         let m = update("UserItem", &["UserId", "ItemId", "Quantity"], &[&user_id, &item_id, &quantity]);
574    ///         ms.push(m);
575    ///     }
576    ///     tx.buffer_write(ms);
577    ///     Ok(())
578    /// }
579    /// ```
580    pub async fn begin_read_write_transaction(&self) -> Result<ReadWriteTransaction, Error> {
581        let session = self.get_session().await?;
582        ReadWriteTransaction::begin(session, ReadWriteTransactionOption::default().begin_options)
583            .await
584            .map_err(|e| e.status.into())
585    }
586
587    /// Get open session count.
588    pub fn session_count(&self) -> usize {
589        self.sessions.num_opened()
590    }
591
592    async fn read_write_transaction_sync_with_option<T, E>(
593        &self,
594        f: impl Fn(&mut ReadWriteTransaction) -> Result<T, E>,
595        options: ReadWriteTransactionOption,
596    ) -> Result<(CommitResult, T), E>
597    where
598        E: TryAs<Status> + From<SessionError> + From<Status>,
599    {
600        let (bo, co) = Client::split_read_write_transaction_option(options);
601
602        let ro = TransactionRetrySetting::default();
603        let session = Some(self.get_session().await?);
604
605        // reuse session
606        invoke_fn(
607            Some(ro),
608            |session| async {
609                let mut tx = self.create_read_write_transaction::<E>(session, bo.clone()).await?;
610                let result = f(&mut tx);
611                tx.finish(result, Some(co.clone())).await
612            },
613            session,
614        )
615        .await
616    }
617
618    async fn create_read_write_transaction<E>(
619        &self,
620        session: Option<ManagedSession>,
621        bo: CallOptions,
622    ) -> Result<ReadWriteTransaction, (E, Option<ManagedSession>)>
623    where
624        E: TryAs<Status> + From<SessionError> + From<Status>,
625    {
626        ReadWriteTransaction::begin(session.unwrap(), bo)
627            .await
628            .map_err(|e| (E::from(e.status), Some(e.session)))
629    }
630
631    async fn get_session(&self) -> Result<ManagedSession, SessionError> {
632        self.sessions.get().await
633    }
634
635    fn split_read_write_transaction_option(options: ReadWriteTransactionOption) -> (CallOptions, CommitOptions) {
636        (options.begin_options, options.commit_options)
637    }
638}