google_cloud_spanner/client.rs
1use std::env::var;
2use std::fmt::Debug;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::time::Duration;
7
8use google_cloud_gax::conn::{ConnectionOptions, Environment};
9use google_cloud_gax::grpc::{Code, Status};
10use google_cloud_gax::retry::{invoke_fn, TryAs};
11use google_cloud_googleapis::spanner::v1::{commit_request, transaction_options, Mutation, TransactionOptions};
12use google_cloud_token::NopeTokenSourceProvider;
13
14use crate::apiv1::conn_pool::{ConnectionManager, SPANNER};
15use crate::retry::TransactionRetrySetting;
16use crate::session::{ManagedSession, SessionConfig, SessionError, SessionManager};
17use crate::statement::Statement;
18use crate::transaction::{CallOptions, QueryOptions};
19use crate::transaction_ro::{BatchReadOnlyTransaction, ReadOnlyTransaction};
20use crate::transaction_rw::{commit, CommitOptions, ReadWriteTransaction};
21use crate::value::{Timestamp, TimestampBound};
22
23#[derive(Clone, Default)]
24pub struct PartitionedUpdateOption {
25 pub begin_options: CallOptions,
26 pub query_options: Option<QueryOptions>,
27}
28
29#[derive(Clone)]
30pub struct ReadOnlyTransactionOption {
31 pub timestamp_bound: TimestampBound,
32 pub call_options: CallOptions,
33}
34
35impl Default for ReadOnlyTransactionOption {
36 fn default() -> Self {
37 ReadOnlyTransactionOption {
38 timestamp_bound: TimestampBound::strong_read(),
39 call_options: CallOptions::default(),
40 }
41 }
42}
43
44#[derive(Clone, Default)]
45pub struct ReadWriteTransactionOption {
46 pub begin_options: CallOptions,
47 pub commit_options: CommitOptions,
48}
49
50#[derive(Clone, Debug)]
51pub struct ChannelConfig {
52 /// num_channels is the number of gRPC channels.
53 pub num_channels: usize,
54 pub connect_timeout: Duration,
55 pub timeout: Duration,
56}
57
58impl Default for ChannelConfig {
59 fn default() -> Self {
60 ChannelConfig {
61 num_channels: 4,
62 connect_timeout: Duration::from_secs(30),
63 timeout: Duration::from_secs(30),
64 }
65 }
66}
67
68/// ClientConfig has configurations for the client.
69#[derive(Debug)]
70pub struct ClientConfig {
71 /// SessionPoolConfig is the configuration for session pool.
72 pub session_config: SessionConfig,
73 /// ChannelConfig is the configuration for gRPC connection.
74 pub channel_config: ChannelConfig,
75 /// Overriding service endpoint
76 pub endpoint: String,
77 /// Runtime project
78 pub environment: Environment,
79}
80
81impl Default for ClientConfig {
82 fn default() -> Self {
83 let mut config = ClientConfig {
84 channel_config: Default::default(),
85 session_config: Default::default(),
86 endpoint: SPANNER.to_string(),
87 environment: match var("SPANNER_EMULATOR_HOST").ok() {
88 Some(v) => Environment::Emulator(v),
89 None => Environment::GoogleCloud(Box::new(NopeTokenSourceProvider {})),
90 },
91 };
92 config.session_config.min_opened = config.channel_config.num_channels * 4;
93 config.session_config.max_opened = config.channel_config.num_channels * 100;
94 config
95 }
96}
97
98#[cfg(feature = "auth")]
99pub use google_cloud_auth;
100
101#[cfg(feature = "auth")]
102impl ClientConfig {
103 pub async fn with_auth(mut self) -> Result<Self, google_cloud_auth::error::Error> {
104 if let Environment::GoogleCloud(_) = self.environment {
105 let ts = google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::auth_config()).await?;
106 self.environment = Environment::GoogleCloud(Box::new(ts))
107 }
108 Ok(self)
109 }
110
111 pub async fn with_credentials(
112 mut self,
113 credentials: google_cloud_auth::credentials::CredentialsFile,
114 ) -> Result<Self, google_cloud_auth::error::Error> {
115 if let Environment::GoogleCloud(_) = self.environment {
116 let ts = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
117 Self::auth_config(),
118 Box::new(credentials),
119 )
120 .await?;
121 self.environment = Environment::GoogleCloud(Box::new(ts))
122 }
123 Ok(self)
124 }
125
126 fn auth_config() -> google_cloud_auth::project::Config<'static> {
127 google_cloud_auth::project::Config::default()
128 .with_audience(crate::apiv1::conn_pool::AUDIENCE)
129 .with_scopes(&crate::apiv1::conn_pool::SCOPES)
130 }
131}
132
133#[derive(thiserror::Error, Debug)]
134pub enum Error {
135 #[error(transparent)]
136 GRPC(#[from] Status),
137
138 #[error(transparent)]
139 InvalidSession(#[from] SessionError),
140
141 #[error(transparent)]
142 ParseError(#[from] crate::row::Error),
143
144 #[error(transparent)]
145 Connection(#[from] google_cloud_gax::conn::Error),
146
147 #[error("invalid config: {0}")]
148 InvalidConfig(String),
149}
150
151impl TryAs<Status> for Error {
152 fn try_as(&self) -> Option<&Status> {
153 match self {
154 Error::GRPC(e) => Some(e),
155 _ => None,
156 }
157 }
158}
159
160/// Client is a client for reading and writing data to a Cloud Spanner database.
161/// A client is safe to use concurrently, except for its Close method.
162#[derive(Clone)]
163pub struct Client {
164 sessions: Arc<SessionManager>,
165}
166
167impl Client {
168 /// new creates a client to a database. A valid database name has
169 /// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
170 pub async fn new(database: impl Into<String>, config: ClientConfig) -> Result<Self, Error> {
171 if config.session_config.max_opened > config.channel_config.num_channels * 100 {
172 return Err(Error::InvalidConfig(format!(
173 "max session size is {} because max session size is 100 per gRPC connection",
174 config.channel_config.num_channels * 100
175 )));
176 }
177
178 let pool_size = config.channel_config.num_channels;
179 let options = ConnectionOptions {
180 timeout: Some(config.channel_config.timeout),
181 connect_timeout: Some(config.channel_config.connect_timeout),
182 };
183 let conn_pool =
184 ConnectionManager::new(pool_size, &config.environment, config.endpoint.as_str(), &options).await?;
185 let session_manager = SessionManager::new(database, conn_pool, config.session_config).await?;
186
187 Ok(Client {
188 sessions: session_manager,
189 })
190 }
191
192 /// Close closes all the sessions gracefully.
193 /// This method can be called only once.
194 pub async fn close(self) {
195 self.sessions.close().await;
196 }
197
198 /// single provides a read-only snapshot transaction optimized for the case
199 /// where only a single read or query is needed. This is more efficient than
200 /// using read_only_transaction for a single read or query.
201 /// ```
202 /// use google_cloud_spanner::key::Key;
203 /// use google_cloud_spanner::statement::ToKind;
204 /// use google_cloud_spanner::client::Client;
205 ///
206 /// #[tokio::main]
207 /// async fn run(client: Client) {
208 /// let mut tx = client.single().await.unwrap();
209 /// let iter1 = tx.read("Guild",&["GuildID", "OwnerUserID"], vec![
210 /// Key::new(&"pk1"),
211 /// Key::new(&"pk2")
212 /// ]).await.unwrap();
213 /// }
214 /// ```
215 pub async fn single(&self) -> Result<ReadOnlyTransaction, Error> {
216 self.single_with_timestamp_bound(TimestampBound::strong_read()).await
217 }
218
219 /// single provides a read-only snapshot transaction optimized for the case
220 /// where only a single read or query is needed. This is more efficient than
221 /// using read_only_transaction for a single read or query.
222 pub async fn single_with_timestamp_bound(&self, tb: TimestampBound) -> Result<ReadOnlyTransaction, Error> {
223 let session = self.get_session().await?;
224 let result = ReadOnlyTransaction::single(session, tb).await?;
225 Ok(result)
226 }
227
228 /// read_only_transaction returns a ReadOnlyTransaction that can be used for
229 /// multiple reads from the database.
230 ///
231 /// ```ignore
232 /// use google_cloud_spanner::client::{Client, Error};
233 /// use google_cloud_spanner::statement::Statement;
234 /// use google_cloud_spanner::key::Key;
235 ///
236 /// async fn run(client: Client) -> Result<(), Error>{
237 /// let mut tx = client.read_only_transaction().await?;
238 ///
239 /// let mut stmt = Statement::new("SELECT * , \
240 /// ARRAY (SELECT AS STRUCT * FROM UserItem WHERE UserId = @Param1 ) AS UserItem, \
241 /// ARRAY (SELECT AS STRUCT * FROM UserCharacter WHERE UserId = @Param1 ) AS UserCharacter \
242 /// FROM User \
243 /// WHERE UserId = @Param1");
244 ///
245 /// stmt.add_param("Param1", user_id);
246 /// let mut reader = tx.query(stmt).await?;
247 /// let mut data = vec![];
248 /// while let Some(row) = reader.next().await? {
249 /// let user_id= row.column_by_name::<String>("UserId")?;
250 /// let user_items= row.column_by_name::<Vec<model::UserItem>>("UserItem")?;
251 /// let user_characters = row.column_by_name::<Vec<model::UserCharacter>>("UserCharacter")?;
252 /// data.push(user_id);
253 /// }
254 ///
255 /// let mut reader2 = tx.read("User", &["UserId"], vec![
256 /// Key::new(&"user-1"),
257 /// Key::new(&"user-2")
258 /// ]).await?;
259 ///
260 /// Ok(())
261 /// }
262 pub async fn read_only_transaction(&self) -> Result<ReadOnlyTransaction, Error> {
263 self.read_only_transaction_with_option(ReadOnlyTransactionOption::default())
264 .await
265 }
266
267 /// read_only_transaction returns a ReadOnlyTransaction that can be used for
268 /// multiple reads from the database.
269 pub async fn read_only_transaction_with_option(
270 &self,
271 options: ReadOnlyTransactionOption,
272 ) -> Result<ReadOnlyTransaction, Error> {
273 let session = self.get_session().await?;
274 let result = ReadOnlyTransaction::begin(session, options.timestamp_bound, options.call_options).await?;
275 Ok(result)
276 }
277
278 /// batch_read_only_transaction returns a BatchReadOnlyTransaction that can be used
279 /// for partitioned reads or queries from a snapshot of the database. This is
280 /// useful in batch processing pipelines where one wants to divide the work of
281 /// reading from the database across multiple machines.
282 pub async fn batch_read_only_transaction(&self) -> Result<BatchReadOnlyTransaction, Error> {
283 self.batch_read_only_transaction_with_option(ReadOnlyTransactionOption::default())
284 .await
285 }
286
287 /// batch_read_only_transaction returns a BatchReadOnlyTransaction that can be used
288 /// for partitioned reads or queries from a snapshot of the database. This is
289 /// useful in batch processing pipelines where one wants to divide the work of
290 /// reading from the database across multiple machines.
291 pub async fn batch_read_only_transaction_with_option(
292 &self,
293 options: ReadOnlyTransactionOption,
294 ) -> Result<BatchReadOnlyTransaction, Error> {
295 let session = self.get_session().await?;
296 let result = BatchReadOnlyTransaction::begin(session, options.timestamp_bound, options.call_options).await?;
297 Ok(result)
298 }
299
300 /// partitioned_update executes a DML statement in parallel across the database,
301 /// using separate, internal transactions that commit independently. The DML
302 /// statement must be fully partitionable: it must be expressible as the union
303 /// of many statements each of which accesses only a single row of the table. The
304 /// statement should also be idempotent, because it may be applied more than once.
305 ///
306 /// PartitionedUpdate returns an estimated count of the number of rows affected.
307 /// The actual number of affected rows may be greater than the estimate.
308 pub async fn partitioned_update(&self, stmt: Statement) -> Result<i64, Error> {
309 self.partitioned_update_with_option(stmt, PartitionedUpdateOption::default())
310 .await
311 }
312
313 /// partitioned_update executes a DML statement in parallel across the database,
314 /// using separate, internal transactions that commit independently. The DML
315 /// statement must be fully partitionable: it must be expressible as the union
316 /// of many statements each of which accesses only a single row of the table. The
317 /// statement should also be idempotent, because it may be applied more than once.
318 ///
319 /// PartitionedUpdate returns an estimated count of the number of rows affected.
320 /// The actual number of affected rows may be greater than the estimate.
321 pub async fn partitioned_update_with_option(
322 &self,
323 stmt: Statement,
324 options: PartitionedUpdateOption,
325 ) -> Result<i64, Error> {
326 let ro = TransactionRetrySetting::new(vec![Code::Aborted, Code::Internal]);
327 let session = Some(self.get_session().await?);
328
329 // reuse session
330 invoke_fn(
331 Some(ro),
332 |session| async {
333 let mut tx =
334 match ReadWriteTransaction::begin_partitioned_dml(session.unwrap(), options.begin_options.clone())
335 .await
336 {
337 Ok(tx) => tx,
338 Err(e) => return Err((Error::GRPC(e.status), Some(e.session))),
339 };
340 let qo = options.query_options.clone().unwrap_or_default();
341 tx.update_with_option(stmt.clone(), qo)
342 .await
343 .map_err(|e| (Error::GRPC(e), tx.take_session()))
344 },
345 session,
346 )
347 .await
348 }
349
350 /// apply_at_least_once may attempt to apply mutations more than once; if
351 /// the mutations are not idempotent, this may lead to a failure being reported
352 /// when the mutation was applied more than once. For example, an insert may
353 /// fail with ALREADY_EXISTS even though the row did not exist before Apply was
354 /// called. For this reason, most users of the library will prefer not to use
355 /// this option. However, apply_at_least_once requires only a single RPC, whereas
356 /// apply's default replay protection may require an additional RPC. So this
357 /// method may be appropriate for latency sensitive and/or high throughput blind
358 /// writing.
359 pub async fn apply_at_least_once(&self, ms: Vec<Mutation>) -> Result<Option<Timestamp>, Error> {
360 self.apply_at_least_once_with_option(ms, CommitOptions::default()).await
361 }
362
363 /// apply_at_least_once may attempt to apply mutations more than once; if
364 /// the mutations are not idempotent, this may lead to a failure being reported
365 /// when the mutation was applied more than once. For example, an insert may
366 /// fail with ALREADY_EXISTS even though the row did not exist before Apply was
367 /// called. For this reason, most users of the library will prefer not to use
368 /// this option. However, apply_at_least_once requires only a single RPC, whereas
369 /// apply's default replay protection may require an additional RPC. So this
370 /// method may be appropriate for latency sensitive and/or high throughput blind
371 /// writing.
372 pub async fn apply_at_least_once_with_option(
373 &self,
374 ms: Vec<Mutation>,
375 options: CommitOptions,
376 ) -> Result<Option<Timestamp>, Error> {
377 let ro = TransactionRetrySetting::default();
378 let mut session = self.get_session().await?;
379
380 invoke_fn(
381 Some(ro),
382 |session| async {
383 let tx = commit_request::Transaction::SingleUseTransaction(TransactionOptions {
384 exclude_txn_from_change_streams: false,
385 mode: Some(transaction_options::Mode::ReadWrite(transaction_options::ReadWrite::default())),
386 });
387 match commit(session, ms.clone(), tx, options.clone()).await {
388 Ok(s) => Ok(s.commit_timestamp.map(|s| s.into())),
389 Err(e) => Err((Error::GRPC(e), session)),
390 }
391 },
392 &mut session,
393 )
394 .await
395 }
396
397 /// Apply applies a list of mutations atomically to the database.
398 /// ```
399 /// use google_cloud_spanner::mutation::insert;
400 /// use google_cloud_spanner::mutation::delete;
401 /// use google_cloud_spanner::key::all_keys;
402 /// use google_cloud_spanner::statement::ToKind;
403 /// use google_cloud_spanner::client::{Client, Error};
404 /// use google_cloud_spanner::value::CommitTimestamp;
405 ///
406 /// async fn run(client: Client) -> Result<(), Error>{
407 /// let m1 = delete("Guild", all_keys());
408 /// let m2 = insert("Guild", &["GuildID", "OwnerUserID", "UpdatedAt"], &[&"1", &"2", &CommitTimestamp::new()]);
409 /// let commit_timestamp = client.apply(vec![m1,m2]).await?;
410 /// Ok(())
411 /// }
412 /// ```
413 pub async fn apply(&self, ms: Vec<Mutation>) -> Result<Option<Timestamp>, Error> {
414 self.apply_with_option(ms, ReadWriteTransactionOption::default()).await
415 }
416
417 /// Apply applies a list of mutations atomically to the database.
418 pub async fn apply_with_option(
419 &self,
420 ms: Vec<Mutation>,
421 options: ReadWriteTransactionOption,
422 ) -> Result<Option<Timestamp>, Error> {
423 let result: Result<(Option<Timestamp>, ()), Error> = self
424 .read_write_transaction_sync_with_option(
425 |tx| {
426 tx.buffer_write(ms.to_vec());
427 Ok(())
428 },
429 options,
430 )
431 .await;
432 Ok(result?.0)
433 }
434
435 /// ReadWriteTransaction executes a read-write transaction, with retries as
436 /// necessary.
437 ///
438 /// The function f will be called one or more times. It must not maintain
439 /// any state between calls.
440 ///
441 /// If the transaction cannot be committed or if f returns an ABORTED error,
442 /// ReadWriteTransaction will call f again. It will continue to call f until the
443 /// transaction can be committed or the Context times out or is cancelled. If f
444 /// returns an error other than ABORTED, ReadWriteTransaction will abort the
445 /// transaction and return the error.
446 ///
447 /// To limit the number of retries, set a deadline on the Context rather than
448 /// using a fixed limit on the number of attempts. ReadWriteTransaction will
449 /// retry as needed until that deadline is met.
450 ///
451 /// See <https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction> for
452 /// more details.
453 /// ```
454 /// use google_cloud_spanner::mutation::update;
455 /// use google_cloud_spanner::key::{Key, all_keys};
456 /// use google_cloud_spanner::value::Timestamp;
457 /// use google_cloud_spanner::client::Error;
458 /// use google_cloud_spanner::client::Client;
459 ///
460 /// #[tokio::main]
461 /// async fn run(client: Client) -> Result<(Option<Timestamp>,()), Error>{
462 /// client.read_write_transaction(|tx| {
463 /// Box::pin(async move {
464 /// // The transaction function will be called again if the error code
465 /// // of this error is Aborted. The backend may automatically abort
466 /// // any read/write transaction if it detects a deadlock or other problems.
467 /// let key = all_keys();
468 /// let mut reader = tx.read("UserItem", &["UserId", "ItemId", "Quantity"], key).await?;
469 /// let mut ms = vec![];
470 /// while let Some(row) = reader.next().await? {
471 /// let user_id = row.column_by_name::<String>("UserId")?;
472 /// let item_id = row.column_by_name::<i64>("ItemId")?;
473 /// let quantity = row.column_by_name::<i64>("Quantity")? + 1;
474 /// let m = update("UserItem", &["Quantity"], &[&user_id, &item_id, &quantity]);
475 /// ms.push(m);
476 /// }
477 /// // The buffered mutation will be committed. If the commit
478 /// // fails with an Aborted error, this function will be called again
479 /// tx.buffer_write(ms);
480 /// Ok(())
481 /// })
482 /// }).await
483 /// }
484 pub async fn read_write_transaction<'a, T, E, F>(&self, f: F) -> Result<(Option<Timestamp>, T), E>
485 where
486 E: TryAs<Status> + From<SessionError> + From<Status>,
487 F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'tx>>,
488 {
489 self.read_write_transaction_with_option(f, ReadWriteTransactionOption::default())
490 .await
491 }
492
493 /// ReadWriteTransaction executes a read-write transaction, with retries as
494 /// necessary.
495 ///
496 /// The function f will be called one or more times. It must not maintain
497 /// any state between calls.
498 ///
499 /// If the transaction cannot be committed or if f returns an ABORTED error,
500 /// ReadWriteTransaction will call f again. It will continue to call f until the
501 /// transaction can be committed or the Context times out or is cancelled. If f
502 /// returns an error other than ABORTED, ReadWriteTransaction will abort the
503 /// transaction and return the error.
504 ///
505 /// To limit the number of retries, set a deadline on the Context rather than
506 /// using a fixed limit on the number of attempts. ReadWriteTransaction will
507 /// retry as needed until that deadline is met.
508 ///
509 /// See <https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction> for
510 /// more details.
511 pub async fn read_write_transaction_with_option<'a, T, E, F>(
512 &'a self,
513 f: F,
514 options: ReadWriteTransactionOption,
515 ) -> Result<(Option<Timestamp>, T), E>
516 where
517 E: TryAs<Status> + From<SessionError> + From<Status>,
518 F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'tx>>,
519 {
520 let (bo, co) = Client::split_read_write_transaction_option(options);
521
522 let ro = TransactionRetrySetting::default();
523 let session = Some(self.get_session().await?);
524 // must reuse session
525 invoke_fn(
526 Some(ro),
527 |session| async {
528 let mut tx = self.create_read_write_transaction::<E>(session, bo.clone()).await?;
529 let result = f(&mut tx).await;
530 tx.finish(result, Some(co.clone())).await
531 },
532 session,
533 )
534 .await
535 }
536
537 /// begin_read_write_transaction creates new ReadWriteTransaction.
538 /// ```
539 /// use google_cloud_spanner::mutation::update;
540 /// use google_cloud_spanner::key::{Key, all_keys};
541 /// use google_cloud_spanner::value::Timestamp;
542 /// use google_cloud_spanner::client::Error;
543 /// use google_cloud_spanner::client::Client;
544 /// use google_cloud_spanner::transaction_rw::ReadWriteTransaction;
545 /// use google_cloud_googleapis::spanner::v1::execute_batch_dml_request::Statement;
546 /// use google_cloud_spanner::retry::TransactionRetry;
547 ///
548 /// async fn run(client: Client) -> Result<(), Error>{
549 /// let retry = &mut TransactionRetry::new();
550 /// loop {
551 /// let tx = &mut client.begin_read_write_transaction().await?;
552 ///
553 /// let result = run_in_transaction(tx).await;
554 ///
555 /// // try to commit or rollback transaction.
556 /// match tx.end(result, None).await {
557 /// Ok((_commit_timestamp, success)) => return Ok(success),
558 /// Err(err) => retry.next(err).await? // check retry
559 /// }
560 /// }
561 /// }
562 ///
563 /// async fn run_in_transaction(tx: &mut ReadWriteTransaction) -> Result<(), Error> {
564 /// let key = all_keys();
565 /// let mut reader = tx.read("UserItem", &["UserId", "ItemId", "Quantity"], key).await?;
566 /// let mut ms = vec![];
567 /// while let Some(row) = reader.next().await? {
568 /// let user_id = row.column_by_name::<String>("UserId")?;
569 /// let item_id = row.column_by_name::<i64>("ItemId")?;
570 /// let quantity = row.column_by_name::<i64>("Quantity")? + 1;
571 /// let m = update("UserItem", &["UserId", "ItemId", "Quantity"], &[&user_id, &item_id, &quantity]);
572 /// ms.push(m);
573 /// }
574 /// tx.buffer_write(ms);
575 /// Ok(())
576 /// }
577 /// ```
578 pub async fn begin_read_write_transaction(&self) -> Result<ReadWriteTransaction, Error> {
579 let session = self.get_session().await?;
580 ReadWriteTransaction::begin(session, ReadWriteTransactionOption::default().begin_options)
581 .await
582 .map_err(|e| e.status.into())
583 }
584
585 /// Get open session count.
586 pub fn session_count(&self) -> usize {
587 self.sessions.num_opened()
588 }
589
590 async fn read_write_transaction_sync_with_option<T, E>(
591 &self,
592 f: impl Fn(&mut ReadWriteTransaction) -> Result<T, E>,
593 options: ReadWriteTransactionOption,
594 ) -> Result<(Option<Timestamp>, T), E>
595 where
596 E: TryAs<Status> + From<SessionError> + From<Status>,
597 {
598 let (bo, co) = Client::split_read_write_transaction_option(options);
599
600 let ro = TransactionRetrySetting::default();
601 let session = Some(self.get_session().await?);
602
603 // reuse session
604 invoke_fn(
605 Some(ro),
606 |session| async {
607 let mut tx = self.create_read_write_transaction::<E>(session, bo.clone()).await?;
608 let result = f(&mut tx);
609 tx.finish(result, Some(co.clone())).await
610 },
611 session,
612 )
613 .await
614 }
615
616 async fn create_read_write_transaction<E>(
617 &self,
618 session: Option<ManagedSession>,
619 bo: CallOptions,
620 ) -> Result<ReadWriteTransaction, (E, Option<ManagedSession>)>
621 where
622 E: TryAs<Status> + From<SessionError> + From<Status>,
623 {
624 ReadWriteTransaction::begin(session.unwrap(), bo)
625 .await
626 .map_err(|e| (E::from(e.status), Some(e.session)))
627 }
628
629 async fn get_session(&self) -> Result<ManagedSession, SessionError> {
630 self.sessions.get().await
631 }
632
633 fn split_read_write_transaction_option(options: ReadWriteTransactionOption) -> (CallOptions, CommitOptions) {
634 (options.begin_options, options.commit_options)
635 }
636}