gcloud_spanner/client.rs
1use std::env::var;
2use std::fmt::Debug;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::time::Duration;
7
8use google_cloud_gax::conn::{ConnectionOptions, Environment};
9use google_cloud_gax::grpc::{Code, Status};
10use google_cloud_gax::retry::{invoke_fn, TryAs};
11use google_cloud_googleapis::spanner::v1::{commit_request, transaction_options, Mutation, TransactionOptions};
12use token_source::NoopTokenSourceProvider;
13
14use crate::apiv1::conn_pool::{ConnectionManager, SPANNER};
15use crate::retry::TransactionRetrySetting;
16use crate::session::{ManagedSession, SessionConfig, SessionError, SessionManager};
17use crate::statement::Statement;
18use crate::transaction::{CallOptions, QueryOptions};
19use crate::transaction_ro::{BatchReadOnlyTransaction, ReadOnlyTransaction};
20use crate::transaction_rw::{commit, CommitOptions, CommitResult, ReadWriteTransaction};
21use crate::value::TimestampBound;
22
23#[derive(Clone, Default)]
24pub struct PartitionedUpdateOption {
25 pub begin_options: CallOptions,
26 pub query_options: Option<QueryOptions>,
27}
28
29#[derive(Clone)]
30pub struct ReadOnlyTransactionOption {
31 pub timestamp_bound: TimestampBound,
32 pub call_options: CallOptions,
33}
34
35impl Default for ReadOnlyTransactionOption {
36 fn default() -> Self {
37 ReadOnlyTransactionOption {
38 timestamp_bound: TimestampBound::strong_read(),
39 call_options: CallOptions::default(),
40 }
41 }
42}
43
44#[derive(Clone, Default)]
45pub struct ReadWriteTransactionOption {
46 pub begin_options: CallOptions,
47 pub commit_options: CommitOptions,
48}
49
50#[derive(Clone, Debug)]
51pub struct ChannelConfig {
52 /// num_channels is the number of gRPC channels.
53 pub num_channels: usize,
54 pub connect_timeout: Duration,
55 pub timeout: Duration,
56}
57
58impl Default for ChannelConfig {
59 fn default() -> Self {
60 ChannelConfig {
61 num_channels: 4,
62 connect_timeout: Duration::from_secs(30),
63 timeout: Duration::from_secs(30),
64 }
65 }
66}
67
68/// ClientConfig has configurations for the client.
69#[derive(Debug)]
70pub struct ClientConfig {
71 /// SessionPoolConfig is the configuration for session pool.
72 pub session_config: SessionConfig,
73 /// ChannelConfig is the configuration for gRPC connection.
74 pub channel_config: ChannelConfig,
75 /// Overriding service endpoint
76 pub endpoint: String,
77 /// Runtime project
78 pub environment: Environment,
79}
80
81impl Default for ClientConfig {
82 fn default() -> Self {
83 let mut config = ClientConfig {
84 channel_config: Default::default(),
85 session_config: Default::default(),
86 endpoint: SPANNER.to_string(),
87 environment: match var("SPANNER_EMULATOR_HOST").ok() {
88 Some(v) => Environment::Emulator(v),
89 None => Environment::GoogleCloud(Box::new(NoopTokenSourceProvider {})),
90 },
91 };
92 config.session_config.min_opened = config.channel_config.num_channels * 4;
93 config.session_config.max_opened = config.channel_config.num_channels * 100;
94 config
95 }
96}
97
98#[cfg(feature = "auth")]
99pub use google_cloud_auth;
100use google_cloud_googleapis::spanner::v1::transaction_options::IsolationLevel;
101
102#[cfg(feature = "auth")]
103impl ClientConfig {
104 pub async fn with_auth(mut self) -> Result<Self, google_cloud_auth::error::Error> {
105 if let Environment::GoogleCloud(_) = self.environment {
106 let ts = google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::auth_config()).await?;
107 self.environment = Environment::GoogleCloud(Box::new(ts))
108 }
109 Ok(self)
110 }
111
112 pub async fn with_credentials(
113 mut self,
114 credentials: google_cloud_auth::credentials::CredentialsFile,
115 ) -> Result<Self, google_cloud_auth::error::Error> {
116 if let Environment::GoogleCloud(_) = self.environment {
117 let ts = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
118 Self::auth_config(),
119 Box::new(credentials),
120 )
121 .await?;
122 self.environment = Environment::GoogleCloud(Box::new(ts))
123 }
124 Ok(self)
125 }
126
127 fn auth_config() -> google_cloud_auth::project::Config<'static> {
128 google_cloud_auth::project::Config::default()
129 .with_audience(crate::apiv1::conn_pool::AUDIENCE)
130 .with_scopes(&crate::apiv1::conn_pool::SCOPES)
131 }
132}
133
134#[derive(thiserror::Error, Debug)]
135pub enum Error {
136 #[error(transparent)]
137 GRPC(#[from] Status),
138
139 #[error(transparent)]
140 InvalidSession(#[from] SessionError),
141
142 #[error(transparent)]
143 ParseError(#[from] crate::row::Error),
144
145 #[error(transparent)]
146 Connection(#[from] google_cloud_gax::conn::Error),
147
148 #[error("invalid config: {0}")]
149 InvalidConfig(String),
150}
151
152impl TryAs<Status> for Error {
153 fn try_as(&self) -> Option<&Status> {
154 match self {
155 Error::GRPC(e) => Some(e),
156 _ => None,
157 }
158 }
159}
160
161/// Client is a client for reading and writing data to a Cloud Spanner database.
162/// A client is safe to use concurrently, except for its Close method.
163#[derive(Clone)]
164pub struct Client {
165 sessions: Arc<SessionManager>,
166}
167
168impl Client {
169 /// new creates a client to a database. A valid database name has
170 /// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
171 pub async fn new(database: impl Into<String>, config: ClientConfig) -> Result<Self, Error> {
172 if config.session_config.max_opened > config.channel_config.num_channels * 100 {
173 return Err(Error::InvalidConfig(format!(
174 "max session size is {} because max session size is 100 per gRPC connection",
175 config.channel_config.num_channels * 100
176 )));
177 }
178
179 let pool_size = config.channel_config.num_channels;
180 let options = ConnectionOptions {
181 timeout: Some(config.channel_config.timeout),
182 connect_timeout: Some(config.channel_config.connect_timeout),
183 };
184 let conn_pool =
185 ConnectionManager::new(pool_size, &config.environment, config.endpoint.as_str(), &options).await?;
186 let session_manager = SessionManager::new(database, conn_pool, config.session_config).await?;
187
188 Ok(Client {
189 sessions: session_manager,
190 })
191 }
192
193 /// Close closes all the sessions gracefully.
194 /// This method can be called only once.
195 pub async fn close(self) {
196 self.sessions.close().await;
197 }
198
199 /// single provides a read-only snapshot transaction optimized for the case
200 /// where only a single read or query is needed. This is more efficient than
201 /// using read_only_transaction for a single read or query.
202 /// ```
203 /// use google_cloud_spanner::key::Key;
204 /// use google_cloud_spanner::statement::ToKind;
205 /// use google_cloud_spanner::client::Client;
206 ///
207 /// #[tokio::main]
208 /// async fn run(client: Client) {
209 /// let mut tx = client.single().await.unwrap();
210 /// let iter1 = tx.read("Guild",&["GuildID", "OwnerUserID"], vec![
211 /// Key::new(&"pk1"),
212 /// Key::new(&"pk2")
213 /// ]).await.unwrap();
214 /// }
215 /// ```
216 pub async fn single(&self) -> Result<ReadOnlyTransaction, Error> {
217 self.single_with_timestamp_bound(TimestampBound::strong_read()).await
218 }
219
220 /// single provides a read-only snapshot transaction optimized for the case
221 /// where only a single read or query is needed. This is more efficient than
222 /// using read_only_transaction for a single read or query.
223 pub async fn single_with_timestamp_bound(&self, tb: TimestampBound) -> Result<ReadOnlyTransaction, Error> {
224 let session = self.get_session().await?;
225 let result = ReadOnlyTransaction::single(session, tb).await?;
226 Ok(result)
227 }
228
229 /// read_only_transaction returns a ReadOnlyTransaction that can be used for
230 /// multiple reads from the database.
231 ///
232 /// ```ignore
233 /// use google_cloud_spanner::client::{Client, Error};
234 /// use google_cloud_spanner::statement::Statement;
235 /// use google_cloud_spanner::key::Key;
236 ///
237 /// async fn run(client: Client) -> Result<(), Error>{
238 /// let mut tx = client.read_only_transaction().await?;
239 ///
240 /// let mut stmt = Statement::new("SELECT * , \
241 /// ARRAY (SELECT AS STRUCT * FROM UserItem WHERE UserId = @Param1 ) AS UserItem, \
242 /// ARRAY (SELECT AS STRUCT * FROM UserCharacter WHERE UserId = @Param1 ) AS UserCharacter \
243 /// FROM User \
244 /// WHERE UserId = @Param1");
245 ///
246 /// stmt.add_param("Param1", user_id);
247 /// let mut reader = tx.query(stmt).await?;
248 /// let mut data = vec![];
249 /// while let Some(row) = reader.next().await? {
250 /// let user_id= row.column_by_name::<String>("UserId")?;
251 /// let user_items= row.column_by_name::<Vec<model::UserItem>>("UserItem")?;
252 /// let user_characters = row.column_by_name::<Vec<model::UserCharacter>>("UserCharacter")?;
253 /// data.push(user_id);
254 /// }
255 ///
256 /// let mut reader2 = tx.read("User", &["UserId"], vec![
257 /// Key::new(&"user-1"),
258 /// Key::new(&"user-2")
259 /// ]).await?;
260 ///
261 /// Ok(())
262 /// }
263 pub async fn read_only_transaction(&self) -> Result<ReadOnlyTransaction, Error> {
264 self.read_only_transaction_with_option(ReadOnlyTransactionOption::default())
265 .await
266 }
267
268 /// read_only_transaction returns a ReadOnlyTransaction that can be used for
269 /// multiple reads from the database.
270 pub async fn read_only_transaction_with_option(
271 &self,
272 options: ReadOnlyTransactionOption,
273 ) -> Result<ReadOnlyTransaction, Error> {
274 let session = self.get_session().await?;
275 let result = ReadOnlyTransaction::begin(session, options.timestamp_bound, options.call_options).await?;
276 Ok(result)
277 }
278
279 /// batch_read_only_transaction returns a BatchReadOnlyTransaction that can be used
280 /// for partitioned reads or queries from a snapshot of the database. This is
281 /// useful in batch processing pipelines where one wants to divide the work of
282 /// reading from the database across multiple machines.
283 pub async fn batch_read_only_transaction(&self) -> Result<BatchReadOnlyTransaction, Error> {
284 self.batch_read_only_transaction_with_option(ReadOnlyTransactionOption::default())
285 .await
286 }
287
288 /// batch_read_only_transaction returns a BatchReadOnlyTransaction that can be used
289 /// for partitioned reads or queries from a snapshot of the database. This is
290 /// useful in batch processing pipelines where one wants to divide the work of
291 /// reading from the database across multiple machines.
292 pub async fn batch_read_only_transaction_with_option(
293 &self,
294 options: ReadOnlyTransactionOption,
295 ) -> Result<BatchReadOnlyTransaction, Error> {
296 let session = self.get_session().await?;
297 let result = BatchReadOnlyTransaction::begin(session, options.timestamp_bound, options.call_options).await?;
298 Ok(result)
299 }
300
301 /// partitioned_update executes a DML statement in parallel across the database,
302 /// using separate, internal transactions that commit independently. The DML
303 /// statement must be fully partitionable: it must be expressible as the union
304 /// of many statements each of which accesses only a single row of the table. The
305 /// statement should also be idempotent, because it may be applied more than once.
306 ///
307 /// PartitionedUpdate returns an estimated count of the number of rows affected.
308 /// The actual number of affected rows may be greater than the estimate.
309 pub async fn partitioned_update(&self, stmt: Statement) -> Result<i64, Error> {
310 self.partitioned_update_with_option(stmt, PartitionedUpdateOption::default())
311 .await
312 }
313
314 /// partitioned_update executes a DML statement in parallel across the database,
315 /// using separate, internal transactions that commit independently. The DML
316 /// statement must be fully partitionable: it must be expressible as the union
317 /// of many statements each of which accesses only a single row of the table. The
318 /// statement should also be idempotent, because it may be applied more than once.
319 ///
320 /// PartitionedUpdate returns an estimated count of the number of rows affected.
321 /// The actual number of affected rows may be greater than the estimate.
322 pub async fn partitioned_update_with_option(
323 &self,
324 stmt: Statement,
325 options: PartitionedUpdateOption,
326 ) -> Result<i64, Error> {
327 let ro = TransactionRetrySetting::new(vec![Code::Aborted, Code::Internal]);
328 let session = Some(self.get_session().await?);
329
330 // reuse session
331 invoke_fn(
332 Some(ro),
333 |session| async {
334 let mut tx =
335 match ReadWriteTransaction::begin_partitioned_dml(session.unwrap(), options.begin_options.clone())
336 .await
337 {
338 Ok(tx) => tx,
339 Err(e) => return Err((Error::GRPC(e.status), Some(e.session))),
340 };
341 let qo = options.query_options.clone().unwrap_or_default();
342 tx.update_with_option(stmt.clone(), qo)
343 .await
344 .map_err(|e| (Error::GRPC(e), tx.take_session()))
345 },
346 session,
347 )
348 .await
349 }
350
351 /// apply_at_least_once may attempt to apply mutations more than once; if
352 /// the mutations are not idempotent, this may lead to a failure being reported
353 /// when the mutation was applied more than once. For example, an insert may
354 /// fail with ALREADY_EXISTS even though the row did not exist before Apply was
355 /// called. For this reason, most users of the library will prefer not to use
356 /// this option. However, apply_at_least_once requires only a single RPC, whereas
357 /// apply's default replay protection may require an additional RPC. So this
358 /// method may be appropriate for latency sensitive and/or high throughput blind
359 /// writing.
360 pub async fn apply_at_least_once(&self, ms: Vec<Mutation>) -> Result<Option<CommitResult>, Error> {
361 self.apply_at_least_once_with_option(ms, CommitOptions::default()).await
362 }
363
364 /// apply_at_least_once may attempt to apply mutations more than once; if
365 /// the mutations are not idempotent, this may lead to a failure being reported
366 /// when the mutation was applied more than once. For example, an insert may
367 /// fail with ALREADY_EXISTS even though the row did not exist before Apply was
368 /// called. For this reason, most users of the library will prefer not to use
369 /// this option. However, apply_at_least_once requires only a single RPC, whereas
370 /// apply's default replay protection may require an additional RPC. So this
371 /// method may be appropriate for latency sensitive and/or high throughput blind
372 /// writing.
373 pub async fn apply_at_least_once_with_option(
374 &self,
375 ms: Vec<Mutation>,
376 options: CommitOptions,
377 ) -> Result<Option<CommitResult>, Error> {
378 let ro = TransactionRetrySetting::default();
379 let mut session = self.get_session().await?;
380
381 invoke_fn(
382 Some(ro),
383 |session| async {
384 let tx = commit_request::Transaction::SingleUseTransaction(TransactionOptions {
385 exclude_txn_from_change_streams: false,
386 mode: Some(transaction_options::Mode::ReadWrite(transaction_options::ReadWrite::default())),
387 isolation_level: IsolationLevel::Unspecified as i32,
388 });
389 match commit(session, ms.clone(), tx, options.clone()).await {
390 Ok(s) => Ok(Some(s.into())),
391 Err(e) => Err((Error::GRPC(e), session)),
392 }
393 },
394 &mut session,
395 )
396 .await
397 }
398
399 /// Apply applies a list of mutations atomically to the database.
400 /// ```
401 /// use google_cloud_spanner::mutation::insert;
402 /// use google_cloud_spanner::mutation::delete;
403 /// use google_cloud_spanner::key::all_keys;
404 /// use google_cloud_spanner::statement::ToKind;
405 /// use google_cloud_spanner::client::{Client, Error};
406 /// use google_cloud_spanner::value::CommitTimestamp;
407 ///
408 /// async fn run(client: Client) -> Result<(), Error>{
409 /// let m1 = delete("Guild", all_keys());
410 /// let m2 = insert("Guild", &["GuildID", "OwnerUserID", "UpdatedAt"], &[&"1", &"2", &CommitTimestamp::new()]);
411 /// let commit_timestamp = client.apply(vec![m1,m2]).await?;
412 /// Ok(())
413 /// }
414 /// ```
415 pub async fn apply(&self, ms: Vec<Mutation>) -> Result<CommitResult, Error> {
416 self.apply_with_option(ms, ReadWriteTransactionOption::default()).await
417 }
418
419 /// Apply applies a list of mutations atomically to the database.
420 pub async fn apply_with_option(
421 &self,
422 ms: Vec<Mutation>,
423 options: ReadWriteTransactionOption,
424 ) -> Result<CommitResult, Error> {
425 let result: Result<(CommitResult, ()), Error> = self
426 .read_write_transaction_sync_with_option(
427 |tx| {
428 tx.buffer_write(ms.to_vec());
429 Ok(())
430 },
431 options,
432 )
433 .await;
434 Ok(result?.0)
435 }
436
437 /// ReadWriteTransaction executes a read-write transaction, with retries as
438 /// necessary.
439 ///
440 /// The function f will be called one or more times. It must not maintain
441 /// any state between calls.
442 ///
443 /// If the transaction cannot be committed or if f returns an ABORTED error,
444 /// ReadWriteTransaction will call f again. It will continue to call f until the
445 /// transaction can be committed or the Context times out or is cancelled. If f
446 /// returns an error other than ABORTED, ReadWriteTransaction will abort the
447 /// transaction and return the error.
448 ///
449 /// To limit the number of retries, set a deadline on the Context rather than
450 /// using a fixed limit on the number of attempts. ReadWriteTransaction will
451 /// retry as needed until that deadline is met.
452 ///
453 /// See <https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction> for
454 /// more details.
455 /// ```
456 /// use google_cloud_spanner::mutation::update;
457 /// use google_cloud_spanner::key::{Key, all_keys};
458 /// use google_cloud_spanner::value::Timestamp;
459 /// use google_cloud_spanner::client::Error;
460 /// use google_cloud_spanner::client::Client;
461 ///
462 /// #[tokio::main]
463 /// async fn run(client: Client) -> Result<(Option<Timestamp>,()), Error>{
464 /// client.read_write_transaction(|tx| {
465 /// Box::pin(async move {
466 /// // The transaction function will be called again if the error code
467 /// // of this error is Aborted. The backend may automatically abort
468 /// // any read/write transaction if it detects a deadlock or other problems.
469 /// let key = all_keys();
470 /// let mut reader = tx.read("UserItem", &["UserId", "ItemId", "Quantity"], key).await?;
471 /// let mut ms = vec![];
472 /// while let Some(row) = reader.next().await? {
473 /// let user_id = row.column_by_name::<String>("UserId")?;
474 /// let item_id = row.column_by_name::<i64>("ItemId")?;
475 /// let quantity = row.column_by_name::<i64>("Quantity")? + 1;
476 /// let m = update("UserItem", &["Quantity"], &[&user_id, &item_id, &quantity]);
477 /// ms.push(m);
478 /// }
479 /// // The buffered mutation will be committed. If the commit
480 /// // fails with an Aborted error, this function will be called again
481 /// tx.buffer_write(ms);
482 /// Ok(())
483 /// })
484 /// }).await
485 /// }
486 pub async fn read_write_transaction<'a, T, E, F>(&self, f: F) -> Result<(CommitResult, T), E>
487 where
488 E: TryAs<Status> + From<SessionError> + From<Status>,
489 F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'tx>>,
490 {
491 self.read_write_transaction_with_option(f, ReadWriteTransactionOption::default())
492 .await
493 }
494
495 /// ReadWriteTransaction executes a read-write transaction, with retries as
496 /// necessary.
497 ///
498 /// The function f will be called one or more times. It must not maintain
499 /// any state between calls.
500 ///
501 /// If the transaction cannot be committed or if f returns an ABORTED error,
502 /// ReadWriteTransaction will call f again. It will continue to call f until the
503 /// transaction can be committed or the Context times out or is cancelled. If f
504 /// returns an error other than ABORTED, ReadWriteTransaction will abort the
505 /// transaction and return the error.
506 ///
507 /// To limit the number of retries, set a deadline on the Context rather than
508 /// using a fixed limit on the number of attempts. ReadWriteTransaction will
509 /// retry as needed until that deadline is met.
510 ///
511 /// See <https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction> for
512 /// more details.
513 pub async fn read_write_transaction_with_option<'a, T, E, F>(
514 &'a self,
515 f: F,
516 options: ReadWriteTransactionOption,
517 ) -> Result<(CommitResult, T), E>
518 where
519 E: TryAs<Status> + From<SessionError> + From<Status>,
520 F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'tx>>,
521 {
522 let (bo, co) = Client::split_read_write_transaction_option(options);
523
524 let ro = TransactionRetrySetting::default();
525 let session = Some(self.get_session().await?);
526 // must reuse session
527 invoke_fn(
528 Some(ro),
529 |session| async {
530 let mut tx = self.create_read_write_transaction::<E>(session, bo.clone()).await?;
531 let result = f(&mut tx).await;
532 tx.finish(result, Some(co.clone())).await
533 },
534 session,
535 )
536 .await
537 }
538
539 /// begin_read_write_transaction creates new ReadWriteTransaction.
540 /// ```
541 /// use google_cloud_spanner::mutation::update;
542 /// use google_cloud_spanner::key::{Key, all_keys};
543 /// use google_cloud_spanner::value::Timestamp;
544 /// use google_cloud_spanner::client::Error;
545 /// use google_cloud_spanner::client::Client;
546 /// use google_cloud_spanner::transaction_rw::ReadWriteTransaction;
547 /// use google_cloud_googleapis::spanner::v1::execute_batch_dml_request::Statement;
548 /// use google_cloud_spanner::retry::TransactionRetry;
549 ///
550 /// async fn run(client: Client) -> Result<(), Error>{
551 /// let retry = &mut TransactionRetry::new();
552 /// loop {
553 /// let tx = &mut client.begin_read_write_transaction().await?;
554 ///
555 /// let result = run_in_transaction(tx).await;
556 ///
557 /// // try to commit or rollback transaction.
558 /// match tx.end(result, None).await {
559 /// Ok((_commit_timestamp, success)) => return Ok(success),
560 /// Err(err) => retry.next(err).await? // check retry
561 /// }
562 /// }
563 /// }
564 ///
565 /// async fn run_in_transaction(tx: &mut ReadWriteTransaction) -> Result<(), Error> {
566 /// let key = all_keys();
567 /// let mut reader = tx.read("UserItem", &["UserId", "ItemId", "Quantity"], key).await?;
568 /// let mut ms = vec![];
569 /// while let Some(row) = reader.next().await? {
570 /// let user_id = row.column_by_name::<String>("UserId")?;
571 /// let item_id = row.column_by_name::<i64>("ItemId")?;
572 /// let quantity = row.column_by_name::<i64>("Quantity")? + 1;
573 /// let m = update("UserItem", &["UserId", "ItemId", "Quantity"], &[&user_id, &item_id, &quantity]);
574 /// ms.push(m);
575 /// }
576 /// tx.buffer_write(ms);
577 /// Ok(())
578 /// }
579 /// ```
580 pub async fn begin_read_write_transaction(&self) -> Result<ReadWriteTransaction, Error> {
581 let session = self.get_session().await?;
582 ReadWriteTransaction::begin(session, ReadWriteTransactionOption::default().begin_options)
583 .await
584 .map_err(|e| e.status.into())
585 }
586
587 /// Get open session count.
588 pub fn session_count(&self) -> usize {
589 self.sessions.num_opened()
590 }
591
592 async fn read_write_transaction_sync_with_option<T, E>(
593 &self,
594 f: impl Fn(&mut ReadWriteTransaction) -> Result<T, E>,
595 options: ReadWriteTransactionOption,
596 ) -> Result<(CommitResult, T), E>
597 where
598 E: TryAs<Status> + From<SessionError> + From<Status>,
599 {
600 let (bo, co) = Client::split_read_write_transaction_option(options);
601
602 let ro = TransactionRetrySetting::default();
603 let session = Some(self.get_session().await?);
604
605 // reuse session
606 invoke_fn(
607 Some(ro),
608 |session| async {
609 let mut tx = self.create_read_write_transaction::<E>(session, bo.clone()).await?;
610 let result = f(&mut tx);
611 tx.finish(result, Some(co.clone())).await
612 },
613 session,
614 )
615 .await
616 }
617
618 async fn create_read_write_transaction<E>(
619 &self,
620 session: Option<ManagedSession>,
621 bo: CallOptions,
622 ) -> Result<ReadWriteTransaction, (E, Option<ManagedSession>)>
623 where
624 E: TryAs<Status> + From<SessionError> + From<Status>,
625 {
626 ReadWriteTransaction::begin(session.unwrap(), bo)
627 .await
628 .map_err(|e| (E::from(e.status), Some(e.session)))
629 }
630
631 async fn get_session(&self) -> Result<ManagedSession, SessionError> {
632 self.sessions.get().await
633 }
634
635 fn split_read_write_transaction_option(options: ReadWriteTransactionOption) -> (CallOptions, CommitOptions) {
636 (options.begin_options, options.commit_options)
637 }
638}