google_cloud_spanner/
client.rs

1use std::env::var;
2use std::fmt::Debug;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::time::Duration;
7
8use google_cloud_gax::conn::{ConnectionOptions, Environment};
9use google_cloud_gax::grpc::{Code, Status};
10use google_cloud_gax::retry::{invoke_fn, TryAs};
11use google_cloud_googleapis::spanner::v1::{commit_request, transaction_options, Mutation, TransactionOptions};
12use google_cloud_token::NopeTokenSourceProvider;
13
14use crate::apiv1::conn_pool::{ConnectionManager, SPANNER};
15use crate::retry::TransactionRetrySetting;
16use crate::session::{ManagedSession, SessionConfig, SessionError, SessionManager};
17use crate::statement::Statement;
18use crate::transaction::{CallOptions, QueryOptions};
19use crate::transaction_ro::{BatchReadOnlyTransaction, ReadOnlyTransaction};
20use crate::transaction_rw::{commit, CommitOptions, ReadWriteTransaction};
21use crate::value::{Timestamp, TimestampBound};
22
23#[derive(Clone, Default)]
24pub struct PartitionedUpdateOption {
25    pub begin_options: CallOptions,
26    pub query_options: Option<QueryOptions>,
27}
28
29#[derive(Clone)]
30pub struct ReadOnlyTransactionOption {
31    pub timestamp_bound: TimestampBound,
32    pub call_options: CallOptions,
33}
34
35impl Default for ReadOnlyTransactionOption {
36    fn default() -> Self {
37        ReadOnlyTransactionOption {
38            timestamp_bound: TimestampBound::strong_read(),
39            call_options: CallOptions::default(),
40        }
41    }
42}
43
44#[derive(Clone, Default)]
45pub struct ReadWriteTransactionOption {
46    pub begin_options: CallOptions,
47    pub commit_options: CommitOptions,
48}
49
/// Configuration of the gRPC channels used by the client.
#[derive(Clone, Debug)]
pub struct ChannelConfig {
    /// Number of gRPC channels to open.
    pub num_channels: usize,
    /// Timeout applied while establishing a connection.
    pub connect_timeout: Duration,
    /// Timeout handed to the connection options as the request timeout.
    pub timeout: Duration,
}

impl Default for ChannelConfig {
    /// Four channels, with both timeouts set to 30 seconds.
    fn default() -> Self {
        let default_timeout = Duration::from_secs(30);
        Self {
            num_channels: 4,
            connect_timeout: default_timeout,
            timeout: default_timeout,
        }
    }
}
67
68/// ClientConfig has configurations for the client.
69#[derive(Debug)]
70pub struct ClientConfig {
71    /// SessionPoolConfig is the configuration for session pool.
72    pub session_config: SessionConfig,
73    /// ChannelConfig is the configuration for gRPC connection.
74    pub channel_config: ChannelConfig,
75    /// Overriding service endpoint
76    pub endpoint: String,
77    /// Runtime project
78    pub environment: Environment,
79}
80
81impl Default for ClientConfig {
82    fn default() -> Self {
83        let mut config = ClientConfig {
84            channel_config: Default::default(),
85            session_config: Default::default(),
86            endpoint: SPANNER.to_string(),
87            environment: match var("SPANNER_EMULATOR_HOST").ok() {
88                Some(v) => Environment::Emulator(v),
89                None => Environment::GoogleCloud(Box::new(NopeTokenSourceProvider {})),
90            },
91        };
92        config.session_config.min_opened = config.channel_config.num_channels * 4;
93        config.session_config.max_opened = config.channel_config.num_channels * 100;
94        config
95    }
96}
97
98#[cfg(feature = "auth")]
99pub use google_cloud_auth;
100
#[cfg(feature = "auth")]
impl ClientConfig {
    /// Installs a default token source provider when the configuration targets
    /// Google Cloud. An emulator configuration is returned unchanged.
    ///
    /// # Errors
    /// Returns the `google_cloud_auth` error raised while creating the provider.
    pub async fn with_auth(mut self) -> Result<Self, google_cloud_auth::error::Error> {
        if matches!(self.environment, Environment::GoogleCloud(_)) {
            let provider = google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::auth_config()).await?;
            self.environment = Environment::GoogleCloud(Box::new(provider));
        }
        Ok(self)
    }

    /// Same as `with_auth`, but builds the token source provider from the
    /// supplied credentials file instead of the default lookup.
    ///
    /// # Errors
    /// Returns the `google_cloud_auth` error raised while creating the provider.
    pub async fn with_credentials(
        mut self,
        credentials: google_cloud_auth::credentials::CredentialsFile,
    ) -> Result<Self, google_cloud_auth::error::Error> {
        if matches!(self.environment, Environment::GoogleCloud(_)) {
            let provider = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
                Self::auth_config(),
                Box::new(credentials),
            )
            .await?;
            self.environment = Environment::GoogleCloud(Box::new(provider));
        }
        Ok(self)
    }

    /// Auth project configuration carrying the Spanner audience and scopes.
    fn auth_config() -> google_cloud_auth::project::Config<'static> {
        let base = google_cloud_auth::project::Config::default();
        base.with_audience(crate::apiv1::conn_pool::AUDIENCE)
            .with_scopes(&crate::apiv1::conn_pool::SCOPES)
    }
}
132
133#[derive(thiserror::Error, Debug)]
134pub enum Error {
135    #[error(transparent)]
136    GRPC(#[from] Status),
137
138    #[error(transparent)]
139    InvalidSession(#[from] SessionError),
140
141    #[error(transparent)]
142    ParseError(#[from] crate::row::Error),
143
144    #[error(transparent)]
145    Connection(#[from] google_cloud_gax::conn::Error),
146
147    #[error("invalid config: {0}")]
148    InvalidConfig(String),
149}
150
151impl TryAs<Status> for Error {
152    fn try_as(&self) -> Option<&Status> {
153        match self {
154            Error::GRPC(e) => Some(e),
155            _ => None,
156        }
157    }
158}
159
160/// Client is a client for reading and writing data to a Cloud Spanner database.
161/// A client is safe to use concurrently, except for its Close method.
162#[derive(Clone)]
163pub struct Client {
164    sessions: Arc<SessionManager>,
165}
166
167impl Client {
168    /// new creates a client to a database. A valid database name has
169    /// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
170    pub async fn new(database: impl Into<String>, config: ClientConfig) -> Result<Self, Error> {
171        if config.session_config.max_opened > config.channel_config.num_channels * 100 {
172            return Err(Error::InvalidConfig(format!(
173                "max session size is {} because max session size is 100 per gRPC connection",
174                config.channel_config.num_channels * 100
175            )));
176        }
177
178        let pool_size = config.channel_config.num_channels;
179        let options = ConnectionOptions {
180            timeout: Some(config.channel_config.timeout),
181            connect_timeout: Some(config.channel_config.connect_timeout),
182        };
183        let conn_pool =
184            ConnectionManager::new(pool_size, &config.environment, config.endpoint.as_str(), &options).await?;
185        let session_manager = SessionManager::new(database, conn_pool, config.session_config).await?;
186
187        Ok(Client {
188            sessions: session_manager,
189        })
190    }
191
192    /// Close closes all the sessions gracefully.
193    /// This method can be called only once.
194    pub async fn close(self) {
195        self.sessions.close().await;
196    }
197
198    /// single provides a read-only snapshot transaction optimized for the case
199    /// where only a single read or query is needed.  This is more efficient than
200    /// using read_only_transaction for a single read or query.
201    /// ```
202    /// use google_cloud_spanner::key::Key;
203    /// use google_cloud_spanner::statement::ToKind;
204    /// use google_cloud_spanner::client::Client;
205    ///
206    /// #[tokio::main]
207    /// async fn run(client: Client) {
208    ///     let mut tx = client.single().await.unwrap();
209    ///     let iter1 = tx.read("Guild",&["GuildID", "OwnerUserID"], vec![
210    ///         Key::new(&"pk1"),
211    ///         Key::new(&"pk2")
212    ///     ]).await.unwrap();
213    /// }
214    /// ```
215    pub async fn single(&self) -> Result<ReadOnlyTransaction, Error> {
216        self.single_with_timestamp_bound(TimestampBound::strong_read()).await
217    }
218
219    /// single provides a read-only snapshot transaction optimized for the case
220    /// where only a single read or query is needed.  This is more efficient than
221    /// using read_only_transaction for a single read or query.
222    pub async fn single_with_timestamp_bound(&self, tb: TimestampBound) -> Result<ReadOnlyTransaction, Error> {
223        let session = self.get_session().await?;
224        let result = ReadOnlyTransaction::single(session, tb).await?;
225        Ok(result)
226    }
227
228    /// read_only_transaction returns a ReadOnlyTransaction that can be used for
229    /// multiple reads from the database.
230    ///
231    /// ```ignore
232    /// use google_cloud_spanner::client::{Client, Error};
233    /// use google_cloud_spanner::statement::Statement;
234    /// use google_cloud_spanner::key::Key;
235    ///
236    /// async fn run(client: Client) -> Result<(), Error>{
237    ///     let mut tx = client.read_only_transaction().await?;
238    ///
239    ///     let mut stmt = Statement::new("SELECT * , \
240    ///             ARRAY (SELECT AS STRUCT * FROM UserItem WHERE UserId = @Param1 ) AS UserItem, \
241    ///             ARRAY (SELECT AS STRUCT * FROM UserCharacter WHERE UserId = @Param1 ) AS UserCharacter  \
242    ///             FROM User \
243    ///             WHERE UserId = @Param1");
244    ///
245    ///     stmt.add_param("Param1", user_id);
246    ///     let mut reader = tx.query(stmt).await?;
247    ///     let mut data = vec![];
248    ///     while let Some(row) = reader.next().await? {
249    ///         let user_id= row.column_by_name::<String>("UserId")?;
250    ///         let user_items= row.column_by_name::<Vec<model::UserItem>>("UserItem")?;
251    ///         let user_characters = row.column_by_name::<Vec<model::UserCharacter>>("UserCharacter")?;
252    ///         data.push(user_id);
253    ///     }
254    ///
255    ///     let mut reader2 = tx.read("User", &["UserId"], vec![
256    ///         Key::new(&"user-1"),
257    ///         Key::new(&"user-2")
258    ///     ]).await?;
259    ///
260    ///     Ok(())
261    /// }
262    pub async fn read_only_transaction(&self) -> Result<ReadOnlyTransaction, Error> {
263        self.read_only_transaction_with_option(ReadOnlyTransactionOption::default())
264            .await
265    }
266
267    /// read_only_transaction returns a ReadOnlyTransaction that can be used for
268    /// multiple reads from the database.
269    pub async fn read_only_transaction_with_option(
270        &self,
271        options: ReadOnlyTransactionOption,
272    ) -> Result<ReadOnlyTransaction, Error> {
273        let session = self.get_session().await?;
274        let result = ReadOnlyTransaction::begin(session, options.timestamp_bound, options.call_options).await?;
275        Ok(result)
276    }
277
278    /// batch_read_only_transaction returns a BatchReadOnlyTransaction that can be used
279    /// for partitioned reads or queries from a snapshot of the database. This is
280    /// useful in batch processing pipelines where one wants to divide the work of
281    /// reading from the database across multiple machines.
282    pub async fn batch_read_only_transaction(&self) -> Result<BatchReadOnlyTransaction, Error> {
283        self.batch_read_only_transaction_with_option(ReadOnlyTransactionOption::default())
284            .await
285    }
286
287    /// batch_read_only_transaction returns a BatchReadOnlyTransaction that can be used
288    /// for partitioned reads or queries from a snapshot of the database. This is
289    /// useful in batch processing pipelines where one wants to divide the work of
290    /// reading from the database across multiple machines.
291    pub async fn batch_read_only_transaction_with_option(
292        &self,
293        options: ReadOnlyTransactionOption,
294    ) -> Result<BatchReadOnlyTransaction, Error> {
295        let session = self.get_session().await?;
296        let result = BatchReadOnlyTransaction::begin(session, options.timestamp_bound, options.call_options).await?;
297        Ok(result)
298    }
299
300    /// partitioned_update executes a DML statement in parallel across the database,
301    /// using separate, internal transactions that commit independently. The DML
302    /// statement must be fully partitionable: it must be expressible as the union
303    /// of many statements each of which accesses only a single row of the table. The
304    /// statement should also be idempotent, because it may be applied more than once.
305    ///
306    /// PartitionedUpdate returns an estimated count of the number of rows affected.
307    /// The actual number of affected rows may be greater than the estimate.
308    pub async fn partitioned_update(&self, stmt: Statement) -> Result<i64, Error> {
309        self.partitioned_update_with_option(stmt, PartitionedUpdateOption::default())
310            .await
311    }
312
313    /// partitioned_update executes a DML statement in parallel across the database,
314    /// using separate, internal transactions that commit independently. The DML
315    /// statement must be fully partitionable: it must be expressible as the union
316    /// of many statements each of which accesses only a single row of the table. The
317    /// statement should also be idempotent, because it may be applied more than once.
318    ///
319    /// PartitionedUpdate returns an estimated count of the number of rows affected.
320    /// The actual number of affected rows may be greater than the estimate.
321    pub async fn partitioned_update_with_option(
322        &self,
323        stmt: Statement,
324        options: PartitionedUpdateOption,
325    ) -> Result<i64, Error> {
326        let ro = TransactionRetrySetting::new(vec![Code::Aborted, Code::Internal]);
327        let session = Some(self.get_session().await?);
328
329        // reuse session
330        invoke_fn(
331            Some(ro),
332            |session| async {
333                let mut tx =
334                    match ReadWriteTransaction::begin_partitioned_dml(session.unwrap(), options.begin_options.clone())
335                        .await
336                    {
337                        Ok(tx) => tx,
338                        Err(e) => return Err((Error::GRPC(e.status), Some(e.session))),
339                    };
340                let qo = options.query_options.clone().unwrap_or_default();
341                tx.update_with_option(stmt.clone(), qo)
342                    .await
343                    .map_err(|e| (Error::GRPC(e), tx.take_session()))
344            },
345            session,
346        )
347        .await
348    }
349
350    /// apply_at_least_once may attempt to apply mutations more than once; if
351    /// the mutations are not idempotent, this may lead to a failure being reported
352    /// when the mutation was applied more than once. For example, an insert may
353    /// fail with ALREADY_EXISTS even though the row did not exist before Apply was
354    /// called. For this reason, most users of the library will prefer not to use
355    /// this option.  However, apply_at_least_once requires only a single RPC, whereas
356    /// apply's default replay protection may require an additional RPC.  So this
357    /// method may be appropriate for latency sensitive and/or high throughput blind
358    /// writing.
359    pub async fn apply_at_least_once(&self, ms: Vec<Mutation>) -> Result<Option<Timestamp>, Error> {
360        self.apply_at_least_once_with_option(ms, CommitOptions::default()).await
361    }
362
363    /// apply_at_least_once may attempt to apply mutations more than once; if
364    /// the mutations are not idempotent, this may lead to a failure being reported
365    /// when the mutation was applied more than once. For example, an insert may
366    /// fail with ALREADY_EXISTS even though the row did not exist before Apply was
367    /// called. For this reason, most users of the library will prefer not to use
368    /// this option.  However, apply_at_least_once requires only a single RPC, whereas
369    /// apply's default replay protection may require an additional RPC.  So this
370    /// method may be appropriate for latency sensitive and/or high throughput blind
371    /// writing.
372    pub async fn apply_at_least_once_with_option(
373        &self,
374        ms: Vec<Mutation>,
375        options: CommitOptions,
376    ) -> Result<Option<Timestamp>, Error> {
377        let ro = TransactionRetrySetting::default();
378        let mut session = self.get_session().await?;
379
380        invoke_fn(
381            Some(ro),
382            |session| async {
383                let tx = commit_request::Transaction::SingleUseTransaction(TransactionOptions {
384                    exclude_txn_from_change_streams: false,
385                    mode: Some(transaction_options::Mode::ReadWrite(transaction_options::ReadWrite::default())),
386                });
387                match commit(session, ms.clone(), tx, options.clone()).await {
388                    Ok(s) => Ok(s.commit_timestamp.map(|s| s.into())),
389                    Err(e) => Err((Error::GRPC(e), session)),
390                }
391            },
392            &mut session,
393        )
394        .await
395    }
396
397    /// Apply applies a list of mutations atomically to the database.
398    /// ```
399    /// use google_cloud_spanner::mutation::insert;
400    /// use google_cloud_spanner::mutation::delete;
401    /// use google_cloud_spanner::key::all_keys;
402    /// use google_cloud_spanner::statement::ToKind;
403    /// use google_cloud_spanner::client::{Client, Error};
404    /// use google_cloud_spanner::value::CommitTimestamp;
405    ///
406    /// async fn run(client: Client) -> Result<(), Error>{
407    ///     let m1 = delete("Guild", all_keys());
408    ///     let m2 = insert("Guild", &["GuildID", "OwnerUserID", "UpdatedAt"], &[&"1", &"2", &CommitTimestamp::new()]);
409    ///     let commit_timestamp = client.apply(vec![m1,m2]).await?;
410    ///     Ok(())
411    /// }
412    /// ```
413    pub async fn apply(&self, ms: Vec<Mutation>) -> Result<Option<Timestamp>, Error> {
414        self.apply_with_option(ms, ReadWriteTransactionOption::default()).await
415    }
416
417    /// Apply applies a list of mutations atomically to the database.
418    pub async fn apply_with_option(
419        &self,
420        ms: Vec<Mutation>,
421        options: ReadWriteTransactionOption,
422    ) -> Result<Option<Timestamp>, Error> {
423        let result: Result<(Option<Timestamp>, ()), Error> = self
424            .read_write_transaction_sync_with_option(
425                |tx| {
426                    tx.buffer_write(ms.to_vec());
427                    Ok(())
428                },
429                options,
430            )
431            .await;
432        Ok(result?.0)
433    }
434
435    /// ReadWriteTransaction executes a read-write transaction, with retries as
436    /// necessary.
437    ///
438    /// The function f will be called one or more times. It must not maintain
439    /// any state between calls.
440    ///
441    /// If the transaction cannot be committed or if f returns an ABORTED error,
442    /// ReadWriteTransaction will call f again. It will continue to call f until the
443    /// transaction can be committed or the Context times out or is cancelled.  If f
444    /// returns an error other than ABORTED, ReadWriteTransaction will abort the
445    /// transaction and return the error.
446    ///
447    /// To limit the number of retries, set a deadline on the Context rather than
448    /// using a fixed limit on the number of attempts. ReadWriteTransaction will
449    /// retry as needed until that deadline is met.
450    ///
451    /// See <https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction> for
452    /// more details.
453    /// ```
454    /// use google_cloud_spanner::mutation::update;
455    /// use google_cloud_spanner::key::{Key, all_keys};
456    /// use google_cloud_spanner::value::Timestamp;
457    /// use google_cloud_spanner::client::Error;
458    /// use google_cloud_spanner::client::Client;
459    ///
460    /// #[tokio::main]
461    /// async fn run(client: Client) ->  Result<(Option<Timestamp>,()), Error>{
462    ///     client.read_write_transaction(|tx| {
463    ///         Box::pin(async move {
464    ///             // The transaction function will be called again if the error code
465    ///             // of this error is Aborted. The backend may automatically abort
466    ///             // any read/write transaction if it detects a deadlock or other problems.
467    ///             let key = all_keys();
468    ///             let mut reader = tx.read("UserItem", &["UserId", "ItemId", "Quantity"], key).await?;
469    ///             let mut ms = vec![];
470    ///             while let Some(row) = reader.next().await? {
471    ///                 let user_id = row.column_by_name::<String>("UserId")?;
472    ///                 let item_id = row.column_by_name::<i64>("ItemId")?;
473    ///                 let quantity = row.column_by_name::<i64>("Quantity")? + 1;
474    ///                 let m = update("UserItem", &["Quantity"], &[&user_id, &item_id, &quantity]);
475    ///                 ms.push(m);
476    ///             }
477    ///             // The buffered mutation will be committed.  If the commit
478    ///             // fails with an Aborted error, this function will be called again
479    ///             tx.buffer_write(ms);
480    ///             Ok(())
481    ///         })
482    ///     }).await
483    /// }
484    pub async fn read_write_transaction<'a, T, E, F>(&self, f: F) -> Result<(Option<Timestamp>, T), E>
485    where
486        E: TryAs<Status> + From<SessionError> + From<Status>,
487        F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'tx>>,
488    {
489        self.read_write_transaction_with_option(f, ReadWriteTransactionOption::default())
490            .await
491    }
492
493    /// ReadWriteTransaction executes a read-write transaction, with retries as
494    /// necessary.
495    ///
496    /// The function f will be called one or more times. It must not maintain
497    /// any state between calls.
498    ///
499    /// If the transaction cannot be committed or if f returns an ABORTED error,
500    /// ReadWriteTransaction will call f again. It will continue to call f until the
501    /// transaction can be committed or the Context times out or is cancelled.  If f
502    /// returns an error other than ABORTED, ReadWriteTransaction will abort the
503    /// transaction and return the error.
504    ///
505    /// To limit the number of retries, set a deadline on the Context rather than
506    /// using a fixed limit on the number of attempts. ReadWriteTransaction will
507    /// retry as needed until that deadline is met.
508    ///
509    /// See <https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction> for
510    /// more details.
511    pub async fn read_write_transaction_with_option<'a, T, E, F>(
512        &'a self,
513        f: F,
514        options: ReadWriteTransactionOption,
515    ) -> Result<(Option<Timestamp>, T), E>
516    where
517        E: TryAs<Status> + From<SessionError> + From<Status>,
518        F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'tx>>,
519    {
520        let (bo, co) = Client::split_read_write_transaction_option(options);
521
522        let ro = TransactionRetrySetting::default();
523        let session = Some(self.get_session().await?);
524        // must reuse session
525        invoke_fn(
526            Some(ro),
527            |session| async {
528                let mut tx = self.create_read_write_transaction::<E>(session, bo.clone()).await?;
529                let result = f(&mut tx).await;
530                tx.finish(result, Some(co.clone())).await
531            },
532            session,
533        )
534        .await
535    }
536
537    /// begin_read_write_transaction creates new ReadWriteTransaction.
538    /// ```
539    /// use google_cloud_spanner::mutation::update;
540    /// use google_cloud_spanner::key::{Key, all_keys};
541    /// use google_cloud_spanner::value::Timestamp;
542    /// use google_cloud_spanner::client::Error;
543    /// use google_cloud_spanner::client::Client;
544    /// use google_cloud_spanner::transaction_rw::ReadWriteTransaction;
545    /// use google_cloud_googleapis::spanner::v1::execute_batch_dml_request::Statement;
546    /// use google_cloud_spanner::retry::TransactionRetry;
547    ///
548    /// async fn run(client: Client) -> Result<(), Error>{
549    ///     let retry = &mut TransactionRetry::new();
550    ///     loop {
551    ///         let tx = &mut client.begin_read_write_transaction().await?;
552    ///
553    ///         let result = run_in_transaction(tx).await;
554    ///
555    ///         // try to commit or rollback transaction.
556    ///         match tx.end(result, None).await {
557    ///             Ok((_commit_timestamp, success)) => return Ok(success),
558    ///             Err(err) => retry.next(err).await? // check retry
559    ///         }
560    ///     }
561    /// }
562    ///
563    /// async fn run_in_transaction(tx: &mut ReadWriteTransaction) -> Result<(), Error> {
564    ///     let key = all_keys();
565    ///     let mut reader = tx.read("UserItem", &["UserId", "ItemId", "Quantity"], key).await?;
566    ///     let mut ms = vec![];
567    ///     while let Some(row) = reader.next().await? {
568    ///         let user_id = row.column_by_name::<String>("UserId")?;
569    ///         let item_id = row.column_by_name::<i64>("ItemId")?;
570    ///         let quantity = row.column_by_name::<i64>("Quantity")? + 1;
571    ///         let m = update("UserItem", &["UserId", "ItemId", "Quantity"], &[&user_id, &item_id, &quantity]);
572    ///         ms.push(m);
573    ///     }
574    ///     tx.buffer_write(ms);
575    ///     Ok(())
576    /// }
577    /// ```
578    pub async fn begin_read_write_transaction(&self) -> Result<ReadWriteTransaction, Error> {
579        let session = self.get_session().await?;
580        ReadWriteTransaction::begin(session, ReadWriteTransactionOption::default().begin_options)
581            .await
582            .map_err(|e| e.status.into())
583    }
584
585    /// Get open session count.
586    pub fn session_count(&self) -> usize {
587        self.sessions.num_opened()
588    }
589
590    async fn read_write_transaction_sync_with_option<T, E>(
591        &self,
592        f: impl Fn(&mut ReadWriteTransaction) -> Result<T, E>,
593        options: ReadWriteTransactionOption,
594    ) -> Result<(Option<Timestamp>, T), E>
595    where
596        E: TryAs<Status> + From<SessionError> + From<Status>,
597    {
598        let (bo, co) = Client::split_read_write_transaction_option(options);
599
600        let ro = TransactionRetrySetting::default();
601        let session = Some(self.get_session().await?);
602
603        // reuse session
604        invoke_fn(
605            Some(ro),
606            |session| async {
607                let mut tx = self.create_read_write_transaction::<E>(session, bo.clone()).await?;
608                let result = f(&mut tx);
609                tx.finish(result, Some(co.clone())).await
610            },
611            session,
612        )
613        .await
614    }
615
616    async fn create_read_write_transaction<E>(
617        &self,
618        session: Option<ManagedSession>,
619        bo: CallOptions,
620    ) -> Result<ReadWriteTransaction, (E, Option<ManagedSession>)>
621    where
622        E: TryAs<Status> + From<SessionError> + From<Status>,
623    {
624        ReadWriteTransaction::begin(session.unwrap(), bo)
625            .await
626            .map_err(|e| (E::from(e.status), Some(e.session)))
627    }
628
629    async fn get_session(&self) -> Result<ManagedSession, SessionError> {
630        self.sessions.get().await
631    }
632
633    fn split_read_write_transaction_option(options: ReadWriteTransactionOption) -> (CallOptions, CommitOptions) {
634        (options.begin_options, options.commit_options)
635    }
636}