gcloud_spanner/client.rs
1use std::env::var;
2use std::fmt::Debug;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::time::Duration;
7
8use google_cloud_gax::conn::{ConnectionOptions, Environment};
9use google_cloud_gax::grpc::{Code, Status};
10use google_cloud_gax::retry::{invoke_fn, TryAs};
11use google_cloud_googleapis::spanner::v1::{commit_request, transaction_options, Mutation, TransactionOptions};
12use token_source::NoopTokenSourceProvider;
13
14use crate::apiv1::conn_pool::{ConnectionManager, SPANNER};
15use crate::retry::TransactionRetrySetting;
16use crate::session::{ManagedSession, SessionConfig, SessionError, SessionManager};
17use crate::statement::Statement;
18use crate::transaction::{CallOptions, QueryOptions};
19use crate::transaction_ro::{BatchReadOnlyTransaction, ReadOnlyTransaction};
20use crate::transaction_rw::{commit, CommitOptions, CommitResult, ReadWriteTransaction};
21use crate::value::TimestampBound;
22
23#[derive(Clone, Default)]
24pub struct PartitionedUpdateOption {
25 pub begin_options: CallOptions,
26 pub query_options: Option<QueryOptions>,
27 /// The transaction tag to use for partitioned update operations.
28 pub transaction_tag: Option<String>,
29}
30
31#[derive(Clone)]
32pub struct ReadOnlyTransactionOption {
33 pub timestamp_bound: TimestampBound,
34 pub call_options: CallOptions,
35}
36
37impl Default for ReadOnlyTransactionOption {
38 fn default() -> Self {
39 ReadOnlyTransactionOption {
40 timestamp_bound: TimestampBound::strong_read(),
41 call_options: CallOptions::default(),
42 }
43 }
44}
45
46#[derive(Clone, Default)]
47pub struct ReadWriteTransactionOption {
48 pub begin_options: CallOptions,
49 pub commit_options: CommitOptions,
50 /// The transaction tag to use for this read/write transaction.
51 pub transaction_tag: Option<String>,
52}
53
/// Configuration of the gRPC channels used by the client.
#[derive(Clone, Debug)]
pub struct ChannelConfig {
    /// Number of gRPC channels in the connection pool.
    pub num_channels: usize,
    /// Value passed as `ConnectionOptions::connect_timeout`.
    pub connect_timeout: Duration,
    /// Value passed as `ConnectionOptions::timeout`.
    pub timeout: Duration,
}

impl Default for ChannelConfig {
    /// Four channels, with both timeouts set to 30 seconds.
    fn default() -> Self {
        let thirty_seconds = Duration::from_secs(30);
        Self {
            num_channels: 4,
            connect_timeout: thirty_seconds,
            timeout: thirty_seconds,
        }
    }
}
71
72/// ClientConfig has configurations for the client.
73#[derive(Debug)]
74pub struct ClientConfig {
75 /// SessionPoolConfig is the configuration for session pool.
76 pub session_config: SessionConfig,
77 /// ChannelConfig is the configuration for gRPC connection.
78 pub channel_config: ChannelConfig,
79 /// Overriding service endpoint
80 pub endpoint: String,
81 /// Runtime project
82 pub environment: Environment,
83}
84
85impl Default for ClientConfig {
86 fn default() -> Self {
87 let mut config = ClientConfig {
88 channel_config: Default::default(),
89 session_config: Default::default(),
90 endpoint: SPANNER.to_string(),
91 environment: match var("SPANNER_EMULATOR_HOST").ok() {
92 Some(v) => Environment::Emulator(v),
93 None => Environment::GoogleCloud(Box::new(NoopTokenSourceProvider {})),
94 },
95 };
96 config.session_config.min_opened = config.channel_config.num_channels * 4;
97 config.session_config.max_opened = config.channel_config.num_channels * 100;
98 config
99 }
100}
101
102#[cfg(feature = "auth")]
103pub use google_cloud_auth;
104use google_cloud_googleapis::spanner::v1::transaction_options::IsolationLevel;
105
#[cfg(feature = "auth")]
impl ClientConfig {
    /// Replaces the token source provider with a `DefaultTokenSourceProvider`
    /// built from `auth_config()`.
    ///
    /// Nothing is changed when the environment is not `Environment::GoogleCloud`.
    ///
    /// # Errors
    /// Returns the error produced while creating the token source provider.
    pub async fn with_auth(mut self) -> Result<Self, google_cloud_auth::error::Error> {
        if matches!(self.environment, Environment::GoogleCloud(_)) {
            let provider = google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::auth_config()).await?;
            self.environment = Environment::GoogleCloud(Box::new(provider));
        }
        Ok(self)
    }

    /// Same as `with_auth`, but the token source provider is built from the
    /// given credentials file.
    ///
    /// Nothing is changed when the environment is not `Environment::GoogleCloud`.
    ///
    /// # Errors
    /// Returns the error produced while creating the token source provider.
    pub async fn with_credentials(
        mut self,
        credentials: google_cloud_auth::credentials::CredentialsFile,
    ) -> Result<Self, google_cloud_auth::error::Error> {
        if matches!(self.environment, Environment::GoogleCloud(_)) {
            let provider = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
                Self::auth_config(),
                Box::new(credentials),
            )
            .await?;
            self.environment = Environment::GoogleCloud(Box::new(provider));
        }
        Ok(self)
    }

    /// Auth configuration carrying the Spanner audience and scopes.
    fn auth_config() -> google_cloud_auth::project::Config<'static> {
        use crate::apiv1::conn_pool::{AUDIENCE, SCOPES};

        google_cloud_auth::project::Config::default()
            .with_audience(AUDIENCE)
            .with_scopes(&SCOPES)
    }
}
137
138#[derive(thiserror::Error, Debug)]
139pub enum Error {
140 #[error(transparent)]
141 GRPC(#[from] Status),
142
143 #[error(transparent)]
144 InvalidSession(#[from] SessionError),
145
146 #[error(transparent)]
147 ParseError(#[from] crate::row::Error),
148
149 #[error(transparent)]
150 Connection(#[from] google_cloud_gax::conn::Error),
151
152 #[error("invalid config: {0}")]
153 InvalidConfig(String),
154}
155
156impl TryAs<Status> for Error {
157 fn try_as(&self) -> Option<&Status> {
158 match self {
159 Error::GRPC(e) => Some(e),
160 _ => None,
161 }
162 }
163}
164
165/// Client is a client for reading and writing data to a Cloud Spanner database.
166/// A client is safe to use concurrently, except for its Close method.
167#[derive(Clone)]
168pub struct Client {
169 sessions: Arc<SessionManager>,
170}
171
172impl Client {
173 /// new creates a client to a database. A valid database name has
174 /// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
175 pub async fn new(database: impl Into<String>, config: ClientConfig) -> Result<Self, Error> {
176 if config.session_config.max_opened > config.channel_config.num_channels * 100 {
177 return Err(Error::InvalidConfig(format!(
178 "max session size is {} because max session size is 100 per gRPC connection",
179 config.channel_config.num_channels * 100
180 )));
181 }
182
183 let pool_size = config.channel_config.num_channels;
184 let options = ConnectionOptions {
185 timeout: Some(config.channel_config.timeout),
186 connect_timeout: Some(config.channel_config.connect_timeout),
187 };
188 let conn_pool =
189 ConnectionManager::new(pool_size, &config.environment, config.endpoint.as_str(), &options).await?;
190 let session_manager = SessionManager::new(database, conn_pool, config.session_config).await?;
191
192 Ok(Client {
193 sessions: session_manager,
194 })
195 }
196
197 /// Close closes all the sessions gracefully.
198 /// This method can be called only once.
199 pub async fn close(self) {
200 self.sessions.close().await;
201 }
202
203 /// single provides a read-only snapshot transaction optimized for the case
204 /// where only a single read or query is needed. This is more efficient than
205 /// using read_only_transaction for a single read or query.
206 /// ```
207 /// use google_cloud_spanner::key::Key;
208 /// use google_cloud_spanner::statement::ToKind;
209 /// use google_cloud_spanner::client::Client;
210 ///
211 /// #[tokio::main]
212 /// async fn run(client: Client) {
213 /// let mut tx = client.single().await.unwrap();
214 /// let iter1 = tx.read("Guild",&["GuildID", "OwnerUserID"], vec![
215 /// Key::new(&"pk1"),
216 /// Key::new(&"pk2")
217 /// ]).await.unwrap();
218 /// }
219 /// ```
220 pub async fn single(&self) -> Result<ReadOnlyTransaction, Error> {
221 self.single_with_timestamp_bound(TimestampBound::strong_read()).await
222 }
223
224 /// single provides a read-only snapshot transaction optimized for the case
225 /// where only a single read or query is needed. This is more efficient than
226 /// using read_only_transaction for a single read or query.
227 pub async fn single_with_timestamp_bound(&self, tb: TimestampBound) -> Result<ReadOnlyTransaction, Error> {
228 let session = self.get_session().await?;
229 let result = ReadOnlyTransaction::single(session, tb).await?;
230 Ok(result)
231 }
232
233 /// read_only_transaction returns a ReadOnlyTransaction that can be used for
234 /// multiple reads from the database.
235 ///
236 /// ```ignore
237 /// use google_cloud_spanner::client::{Client, Error};
238 /// use google_cloud_spanner::statement::Statement;
239 /// use google_cloud_spanner::key::Key;
240 ///
241 /// async fn run(client: Client) -> Result<(), Error>{
242 /// let mut tx = client.read_only_transaction().await?;
243 ///
244 /// let mut stmt = Statement::new("SELECT * , \
245 /// ARRAY (SELECT AS STRUCT * FROM UserItem WHERE UserId = @Param1 ) AS UserItem, \
246 /// ARRAY (SELECT AS STRUCT * FROM UserCharacter WHERE UserId = @Param1 ) AS UserCharacter \
247 /// FROM User \
248 /// WHERE UserId = @Param1");
249 ///
250 /// stmt.add_param("Param1", user_id);
251 /// let mut reader = tx.query(stmt).await?;
252 /// let mut data = vec![];
253 /// while let Some(row) = reader.next().await? {
254 /// let user_id= row.column_by_name::<String>("UserId")?;
255 /// let user_items= row.column_by_name::<Vec<model::UserItem>>("UserItem")?;
256 /// let user_characters = row.column_by_name::<Vec<model::UserCharacter>>("UserCharacter")?;
257 /// data.push(user_id);
258 /// }
259 ///
260 /// let mut reader2 = tx.read("User", &["UserId"], vec![
261 /// Key::new(&"user-1"),
262 /// Key::new(&"user-2")
263 /// ]).await?;
264 ///
265 /// Ok(())
266 /// }
267 pub async fn read_only_transaction(&self) -> Result<ReadOnlyTransaction, Error> {
268 self.read_only_transaction_with_option(ReadOnlyTransactionOption::default())
269 .await
270 }
271
272 /// read_only_transaction returns a ReadOnlyTransaction that can be used for
273 /// multiple reads from the database.
274 pub async fn read_only_transaction_with_option(
275 &self,
276 options: ReadOnlyTransactionOption,
277 ) -> Result<ReadOnlyTransaction, Error> {
278 let session = self.get_session().await?;
279 let result = ReadOnlyTransaction::begin(session, options.timestamp_bound, options.call_options).await?;
280 Ok(result)
281 }
282
283 /// batch_read_only_transaction returns a BatchReadOnlyTransaction that can be used
284 /// for partitioned reads or queries from a snapshot of the database. This is
285 /// useful in batch processing pipelines where one wants to divide the work of
286 /// reading from the database across multiple machines.
287 pub async fn batch_read_only_transaction(&self) -> Result<BatchReadOnlyTransaction, Error> {
288 self.batch_read_only_transaction_with_option(ReadOnlyTransactionOption::default())
289 .await
290 }
291
292 /// batch_read_only_transaction returns a BatchReadOnlyTransaction that can be used
293 /// for partitioned reads or queries from a snapshot of the database. This is
294 /// useful in batch processing pipelines where one wants to divide the work of
295 /// reading from the database across multiple machines.
296 pub async fn batch_read_only_transaction_with_option(
297 &self,
298 options: ReadOnlyTransactionOption,
299 ) -> Result<BatchReadOnlyTransaction, Error> {
300 let session = self.get_session().await?;
301 let result = BatchReadOnlyTransaction::begin(session, options.timestamp_bound, options.call_options).await?;
302 Ok(result)
303 }
304
305 /// partitioned_update executes a DML statement in parallel across the database,
306 /// using separate, internal transactions that commit independently. The DML
307 /// statement must be fully partitionable: it must be expressible as the union
308 /// of many statements each of which accesses only a single row of the table. The
309 /// statement should also be idempotent, because it may be applied more than once.
310 ///
311 /// PartitionedUpdate returns an estimated count of the number of rows affected.
312 /// The actual number of affected rows may be greater than the estimate.
313 pub async fn partitioned_update(&self, stmt: Statement) -> Result<i64, Error> {
314 self.partitioned_update_with_option(stmt, PartitionedUpdateOption::default())
315 .await
316 }
317
318 /// partitioned_update executes a DML statement in parallel across the database,
319 /// using separate, internal transactions that commit independently. The DML
320 /// statement must be fully partitionable: it must be expressible as the union
321 /// of many statements each of which accesses only a single row of the table. The
322 /// statement should also be idempotent, because it may be applied more than once.
323 ///
324 /// PartitionedUpdate returns an estimated count of the number of rows affected.
325 /// The actual number of affected rows may be greater than the estimate.
326 pub async fn partitioned_update_with_option(
327 &self,
328 stmt: Statement,
329 options: PartitionedUpdateOption,
330 ) -> Result<i64, Error> {
331 let ro = TransactionRetrySetting::new(vec![Code::Aborted, Code::Internal]);
332 let session = Some(self.get_session().await?);
333
334 // reuse session
335 invoke_fn(
336 Some(ro),
337 |session| async {
338 let mut tx = match ReadWriteTransaction::begin_partitioned_dml(
339 session.unwrap(),
340 options.begin_options.clone(),
341 options.transaction_tag.clone(),
342 )
343 .await
344 {
345 Ok(tx) => tx,
346 Err(e) => return Err((Error::GRPC(e.status), Some(e.session))),
347 };
348 tx.update_with_option(stmt.clone(), options.query_options.clone().unwrap_or_default())
349 .await
350 .map_err(|e| (Error::GRPC(e), tx.take_session()))
351 },
352 session,
353 )
354 .await
355 }
356
357 /// apply_at_least_once may attempt to apply mutations more than once; if
358 /// the mutations are not idempotent, this may lead to a failure being reported
359 /// when the mutation was applied more than once. For example, an insert may
360 /// fail with ALREADY_EXISTS even though the row did not exist before Apply was
361 /// called. For this reason, most users of the library will prefer not to use
362 /// this option. However, apply_at_least_once requires only a single RPC, whereas
363 /// apply's default replay protection may require an additional RPC. So this
364 /// method may be appropriate for latency sensitive and/or high throughput blind
365 /// writing.
366 pub async fn apply_at_least_once(&self, ms: Vec<Mutation>) -> Result<Option<CommitResult>, Error> {
367 self.apply_at_least_once_with_option(ms, CommitOptions::default()).await
368 }
369
370 /// apply_at_least_once may attempt to apply mutations more than once; if
371 /// the mutations are not idempotent, this may lead to a failure being reported
372 /// when the mutation was applied more than once. For example, an insert may
373 /// fail with ALREADY_EXISTS even though the row did not exist before Apply was
374 /// called. For this reason, most users of the library will prefer not to use
375 /// this option. However, apply_at_least_once requires only a single RPC, whereas
376 /// apply's default replay protection may require an additional RPC. So this
377 /// method may be appropriate for latency sensitive and/or high throughput blind
378 /// writing.
379 pub async fn apply_at_least_once_with_option(
380 &self,
381 ms: Vec<Mutation>,
382 options: CommitOptions,
383 ) -> Result<Option<CommitResult>, Error> {
384 let ro = TransactionRetrySetting::default();
385 let mut session = self.get_session().await?;
386
387 invoke_fn(
388 Some(ro),
389 |session| async {
390 let tx = commit_request::Transaction::SingleUseTransaction(TransactionOptions {
391 exclude_txn_from_change_streams: false,
392 mode: Some(transaction_options::Mode::ReadWrite(transaction_options::ReadWrite::default())),
393 isolation_level: IsolationLevel::Unspecified as i32,
394 });
395 match commit(session, ms.clone(), tx, options.clone()).await {
396 Ok(s) => Ok(Some(s.into())),
397 Err(e) => Err((Error::GRPC(e), session)),
398 }
399 },
400 &mut session,
401 )
402 .await
403 }
404
405 /// Apply applies a list of mutations atomically to the database.
406 /// ```
407 /// use google_cloud_spanner::mutation::insert;
408 /// use google_cloud_spanner::mutation::delete;
409 /// use google_cloud_spanner::key::all_keys;
410 /// use google_cloud_spanner::statement::ToKind;
411 /// use google_cloud_spanner::client::{Client, Error};
412 /// use google_cloud_spanner::value::CommitTimestamp;
413 ///
414 /// async fn run(client: Client) -> Result<(), Error>{
415 /// let m1 = delete("Guild", all_keys());
416 /// let m2 = insert("Guild", &["GuildID", "OwnerUserID", "UpdatedAt"], &[&"1", &"2", &CommitTimestamp::new()]);
417 /// let commit_timestamp = client.apply(vec![m1,m2]).await?;
418 /// Ok(())
419 /// }
420 /// ```
421 pub async fn apply(&self, ms: Vec<Mutation>) -> Result<CommitResult, Error> {
422 self.apply_with_option(ms, ReadWriteTransactionOption::default()).await
423 }
424
425 /// Apply applies a list of mutations atomically to the database.
426 pub async fn apply_with_option(
427 &self,
428 ms: Vec<Mutation>,
429 options: ReadWriteTransactionOption,
430 ) -> Result<CommitResult, Error> {
431 let result: Result<(CommitResult, ()), Error> = self
432 .read_write_transaction_sync_with_option(
433 |tx| {
434 tx.buffer_write(ms.to_vec());
435 Ok(())
436 },
437 options,
438 )
439 .await;
440 Ok(result?.0)
441 }
442
443 /// ReadWriteTransaction executes a read-write transaction, with retries as
444 /// necessary.
445 ///
446 /// The function f will be called one or more times. It must not maintain
447 /// any state between calls.
448 ///
449 /// If the transaction cannot be committed or if f returns an ABORTED error,
450 /// ReadWriteTransaction will call f again. It will continue to call f until the
451 /// transaction can be committed or the Context times out or is cancelled. If f
452 /// returns an error other than ABORTED, ReadWriteTransaction will abort the
453 /// transaction and return the error.
454 ///
455 /// To limit the number of retries, set a deadline on the Context rather than
456 /// using a fixed limit on the number of attempts. ReadWriteTransaction will
457 /// retry as needed until that deadline is met.
458 ///
459 /// See <https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction> for
460 /// more details.
461 /// ```
462 /// use google_cloud_spanner::mutation::update;
463 /// use google_cloud_spanner::key::{Key, all_keys};
464 /// use google_cloud_spanner::value::Timestamp;
465 /// use google_cloud_spanner::client::Error;
466 /// use google_cloud_spanner::client::Client;
467 ///
468 /// #[tokio::main]
469 /// async fn run(client: Client) -> Result<(Option<Timestamp>,()), Error>{
470 /// client.read_write_transaction(|tx| {
471 /// Box::pin(async move {
472 /// // The transaction function will be called again if the error code
473 /// // of this error is Aborted. The backend may automatically abort
474 /// // any read/write transaction if it detects a deadlock or other problems.
475 /// let key = all_keys();
476 /// let mut reader = tx.read("UserItem", &["UserId", "ItemId", "Quantity"], key).await?;
477 /// let mut ms = vec![];
478 /// while let Some(row) = reader.next().await? {
479 /// let user_id = row.column_by_name::<String>("UserId")?;
480 /// let item_id = row.column_by_name::<i64>("ItemId")?;
481 /// let quantity = row.column_by_name::<i64>("Quantity")? + 1;
482 /// let m = update("UserItem", &["Quantity"], &[&user_id, &item_id, &quantity]);
483 /// ms.push(m);
484 /// }
485 /// // The buffered mutation will be committed. If the commit
486 /// // fails with an Aborted error, this function will be called again
487 /// tx.buffer_write(ms);
488 /// Ok(())
489 /// })
490 /// }).await
491 /// }
492 pub async fn read_write_transaction<'a, T, E, F>(&self, f: F) -> Result<(CommitResult, T), E>
493 where
494 E: TryAs<Status> + From<SessionError> + From<Status>,
495 F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'tx>>,
496 {
497 self.read_write_transaction_with_option(f, ReadWriteTransactionOption::default())
498 .await
499 }
500
501 /// ReadWriteTransaction executes a read-write transaction, with retries as
502 /// necessary.
503 ///
504 /// The function f will be called one or more times. It must not maintain
505 /// any state between calls.
506 ///
507 /// If the transaction cannot be committed or if f returns an ABORTED error,
508 /// ReadWriteTransaction will call f again. It will continue to call f until the
509 /// transaction can be committed or the Context times out or is cancelled. If f
510 /// returns an error other than ABORTED, ReadWriteTransaction will abort the
511 /// transaction and return the error.
512 ///
513 /// To limit the number of retries, set a deadline on the Context rather than
514 /// using a fixed limit on the number of attempts. ReadWriteTransaction will
515 /// retry as needed until that deadline is met.
516 ///
517 /// See <https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction> for
518 /// more details.
519 pub async fn read_write_transaction_with_option<'a, T, E, F>(
520 &'a self,
521 f: F,
522 options: ReadWriteTransactionOption,
523 ) -> Result<(CommitResult, T), E>
524 where
525 E: TryAs<Status> + From<SessionError> + From<Status>,
526 F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'tx>>,
527 {
528 let (bo, co, tag) = Client::split_read_write_transaction_option(options);
529
530 let ro = TransactionRetrySetting::default();
531 let session = Some(self.get_session().await?);
532 // must reuse session
533 invoke_fn(
534 Some(ro),
535 |session| async {
536 let mut tx = self
537 .create_read_write_transaction::<E>(session, bo.clone(), tag.clone())
538 .await?;
539 let result = f(&mut tx).await;
540 tx.finish(result, Some(co.clone())).await
541 },
542 session,
543 )
544 .await
545 }
546
547 /// begin_read_write_transaction creates new ReadWriteTransaction.
548 /// ```
549 /// use google_cloud_spanner::mutation::update;
550 /// use google_cloud_spanner::key::{Key, all_keys};
551 /// use google_cloud_spanner::value::Timestamp;
552 /// use google_cloud_spanner::client::Error;
553 /// use google_cloud_spanner::client::Client;
554 /// use google_cloud_spanner::transaction_rw::ReadWriteTransaction;
555 /// use google_cloud_googleapis::spanner::v1::execute_batch_dml_request::Statement;
556 /// use google_cloud_spanner::retry::TransactionRetry;
557 ///
558 /// async fn run(client: Client) -> Result<(), Error>{
559 /// let retry = &mut TransactionRetry::new();
560 /// loop {
561 /// let tx = &mut client.begin_read_write_transaction().await?;
562 ///
563 /// let result = run_in_transaction(tx).await;
564 ///
565 /// // try to commit or rollback transaction.
566 /// match tx.end(result, None).await {
567 /// Ok((_commit_timestamp, success)) => return Ok(success),
568 /// Err(err) => retry.next(err).await? // check retry
569 /// }
570 /// }
571 /// }
572 ///
573 /// async fn run_in_transaction(tx: &mut ReadWriteTransaction) -> Result<(), Error> {
574 /// let key = all_keys();
575 /// let mut reader = tx.read("UserItem", &["UserId", "ItemId", "Quantity"], key).await?;
576 /// let mut ms = vec![];
577 /// while let Some(row) = reader.next().await? {
578 /// let user_id = row.column_by_name::<String>("UserId")?;
579 /// let item_id = row.column_by_name::<i64>("ItemId")?;
580 /// let quantity = row.column_by_name::<i64>("Quantity")? + 1;
581 /// let m = update("UserItem", &["UserId", "ItemId", "Quantity"], &[&user_id, &item_id, &quantity]);
582 /// ms.push(m);
583 /// }
584 /// tx.buffer_write(ms);
585 /// Ok(())
586 /// }
587 /// ```
588 pub async fn begin_read_write_transaction(&self) -> Result<ReadWriteTransaction, Error> {
589 let session = self.get_session().await?;
590 ReadWriteTransaction::begin(
591 session,
592 ReadWriteTransactionOption::default().begin_options,
593 ReadWriteTransactionOption::default().transaction_tag,
594 )
595 .await
596 .map_err(|e| e.status.into())
597 }
598
599 /// Get open session count.
600 pub fn session_count(&self) -> usize {
601 self.sessions.num_opened()
602 }
603
604 async fn read_write_transaction_sync_with_option<T, E>(
605 &self,
606 f: impl Fn(&mut ReadWriteTransaction) -> Result<T, E>,
607 options: ReadWriteTransactionOption,
608 ) -> Result<(CommitResult, T), E>
609 where
610 E: TryAs<Status> + From<SessionError> + From<Status>,
611 {
612 let (bo, co, tag) = Client::split_read_write_transaction_option(options);
613
614 let ro = TransactionRetrySetting::default();
615 let session = Some(self.get_session().await?);
616
617 // reuse session
618 invoke_fn(
619 Some(ro),
620 |session| async {
621 let mut tx = self
622 .create_read_write_transaction::<E>(session, bo.clone(), tag.clone())
623 .await?;
624 let result = f(&mut tx);
625 tx.finish(result, Some(co.clone())).await
626 },
627 session,
628 )
629 .await
630 }
631
632 async fn create_read_write_transaction<E>(
633 &self,
634 session: Option<ManagedSession>,
635 bo: CallOptions,
636 transaction_tag: Option<String>,
637 ) -> Result<ReadWriteTransaction, (E, Option<ManagedSession>)>
638 where
639 E: TryAs<Status> + From<SessionError> + From<Status>,
640 {
641 ReadWriteTransaction::begin(session.unwrap(), bo, transaction_tag)
642 .await
643 .map_err(|e| (E::from(e.status), Some(e.session)))
644 }
645
646 async fn get_session(&self) -> Result<ManagedSession, SessionError> {
647 self.sessions.get().await
648 }
649
650 fn split_read_write_transaction_option(
651 options: ReadWriteTransactionOption,
652 ) -> (CallOptions, CommitOptions, Option<String>) {
653 (options.begin_options, options.commit_options, options.transaction_tag)
654 }
655}