gcloud_spanner/client.rs
1use std::env::var;
2use std::fmt::Debug;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::time::Duration;
7
8use google_cloud_gax::conn::{ConnectionOptions, Environment};
9use google_cloud_gax::grpc::{Code, Status};
10use google_cloud_gax::retry::{invoke_fn, TryAs};
11use google_cloud_googleapis::spanner::v1::{commit_request, transaction_options, Mutation, TransactionOptions};
12use token_source::NoopTokenSourceProvider;
13
14use crate::apiv1::conn_pool::{ConnectionManager, SPANNER};
15use crate::metrics::{MetricsConfig, MetricsRecorder};
16use crate::retry::TransactionRetrySetting;
17use crate::session::{ManagedSession, SessionConfig, SessionError, SessionManager};
18use crate::statement::Statement;
19use crate::transaction::{CallOptions, QueryOptions};
20use crate::transaction_ro::{BatchReadOnlyTransaction, ReadOnlyTransaction};
21use crate::transaction_rw::{commit, CommitOptions, CommitResult, ReadWriteTransaction};
22use crate::value::TimestampBound;
23
24#[derive(Clone, Default)]
25pub struct PartitionedUpdateOption {
26 pub begin_options: CallOptions,
27 pub query_options: Option<QueryOptions>,
28 /// The transaction tag to use for partitioned update operations.
29 pub transaction_tag: Option<String>,
30}
31
32#[derive(Clone)]
33pub struct ReadOnlyTransactionOption {
34 pub timestamp_bound: TimestampBound,
35 pub call_options: CallOptions,
36}
37
38impl Default for ReadOnlyTransactionOption {
39 fn default() -> Self {
40 ReadOnlyTransactionOption {
41 timestamp_bound: TimestampBound::strong_read(),
42 call_options: CallOptions::default(),
43 }
44 }
45}
46
47#[derive(Clone, Default)]
48pub struct ReadWriteTransactionOption {
49 pub begin_options: CallOptions,
50 pub commit_options: CommitOptions,
51 /// The transaction tag to use for this read/write transaction.
52 pub transaction_tag: Option<String>,
53}
54
55#[derive(Clone, Debug)]
56pub struct ChannelConfig {
57 /// num_channels is the number of gRPC channels.
58 pub num_channels: usize,
59 pub connect_timeout: Duration,
60 pub timeout: Duration,
61}
62
63impl Default for ChannelConfig {
64 fn default() -> Self {
65 ChannelConfig {
66 num_channels: 4,
67 connect_timeout: Duration::from_secs(30),
68 timeout: Duration::from_secs(30),
69 }
70 }
71}
72
73/// ClientConfig has configurations for the client.
74#[derive(Debug)]
75pub struct ClientConfig {
76 /// SessionPoolConfig is the configuration for session pool.
77 pub session_config: SessionConfig,
78 /// ChannelConfig is the configuration for gRPC connection.
79 pub channel_config: ChannelConfig,
80 /// Overriding service endpoint
81 pub endpoint: String,
82 /// Runtime project
83 pub environment: Environment,
84 /// DisableRouteToLeader specifies if all the requests of type read-write and PDML need to be routed to the leader region.
85 pub disable_route_to_leader: bool,
86 /// Metrics configuration for emitting OpenTelemetry signals.
87 pub metrics: MetricsConfig,
88}
89
90impl Default for ClientConfig {
91 fn default() -> Self {
92 let mut config = ClientConfig {
93 channel_config: Default::default(),
94 session_config: Default::default(),
95 endpoint: SPANNER.to_string(),
96 environment: match var("SPANNER_EMULATOR_HOST").ok() {
97 Some(v) => Environment::Emulator(v),
98 None => Environment::GoogleCloud(Box::new(NoopTokenSourceProvider {})),
99 },
100 disable_route_to_leader: false,
101 metrics: MetricsConfig::default(),
102 };
103 config.session_config.min_opened = config.channel_config.num_channels * 4;
104 config.session_config.max_opened = config.channel_config.num_channels * 100;
105 config
106 }
107}
108
109#[cfg(feature = "auth")]
110pub use google_cloud_auth;
111use google_cloud_googleapis::spanner::v1::transaction_options::IsolationLevel;
112
113#[cfg(feature = "auth")]
114impl ClientConfig {
115 pub async fn with_auth(mut self) -> Result<Self, google_cloud_auth::error::Error> {
116 if let Environment::GoogleCloud(_) = self.environment {
117 let ts = google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::auth_config()).await?;
118 self.environment = Environment::GoogleCloud(Box::new(ts))
119 }
120 Ok(self)
121 }
122
123 pub async fn with_credentials(
124 mut self,
125 credentials: google_cloud_auth::credentials::CredentialsFile,
126 ) -> Result<Self, google_cloud_auth::error::Error> {
127 if let Environment::GoogleCloud(_) = self.environment {
128 let ts = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
129 Self::auth_config(),
130 Box::new(credentials),
131 )
132 .await?;
133 self.environment = Environment::GoogleCloud(Box::new(ts))
134 }
135 Ok(self)
136 }
137
138 fn auth_config() -> google_cloud_auth::project::Config<'static> {
139 google_cloud_auth::project::Config::default()
140 .with_audience(crate::apiv1::conn_pool::AUDIENCE)
141 .with_scopes(&crate::apiv1::conn_pool::SCOPES)
142 }
143}
144
145#[derive(thiserror::Error, Debug)]
146pub enum Error {
147 #[error(transparent)]
148 GRPC(#[from] Status),
149
150 #[error(transparent)]
151 InvalidSession(#[from] SessionError),
152
153 #[error(transparent)]
154 ParseError(#[from] crate::row::Error),
155
156 #[error(transparent)]
157 Connection(#[from] google_cloud_gax::conn::Error),
158
159 #[error("invalid config: {0}")]
160 InvalidConfig(String),
161}
162
163impl TryAs<Status> for Error {
164 fn try_as(&self) -> Option<&Status> {
165 match self {
166 Error::GRPC(e) => Some(e),
167 _ => None,
168 }
169 }
170}
171
172/// Client is a client for reading and writing data to a Cloud Spanner database.
173/// A client is safe to use concurrently, except for its Close method.
174#[derive(Clone)]
175pub struct Client {
176 sessions: Arc<SessionManager>,
177 disable_route_to_leader: bool,
178}
179
180impl Client {
181 /// new creates a client to a database. A valid database name has
182 /// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
183 pub async fn new(database: impl Into<String>, config: ClientConfig) -> Result<Self, Error> {
184 if config.session_config.max_opened > config.channel_config.num_channels * 100 {
185 return Err(Error::InvalidConfig(format!(
186 "max session size is {} because max session size is 100 per gRPC connection",
187 config.channel_config.num_channels * 100
188 )));
189 }
190
191 let database: String = database.into();
192 let metrics = Arc::new(
193 MetricsRecorder::try_new(&database, &config.metrics).map_err(|e| Error::InvalidConfig(e.to_string()))?,
194 );
195
196 let pool_size = config.channel_config.num_channels;
197 let options = ConnectionOptions {
198 timeout: Some(config.channel_config.timeout),
199 connect_timeout: Some(config.channel_config.connect_timeout),
200 };
201 let conn_pool =
202 ConnectionManager::new(pool_size, &config.environment, config.endpoint.as_str(), &options).await?;
203 let session_manager = SessionManager::new(
204 database,
205 conn_pool,
206 config.session_config,
207 config.disable_route_to_leader,
208 metrics.clone(),
209 )
210 .await?;
211
212 Ok(Client {
213 sessions: session_manager,
214 disable_route_to_leader: config.disable_route_to_leader,
215 })
216 }
217
218 /// Close closes all the sessions gracefully.
219 /// This method can be called only once.
220 pub async fn close(self) {
221 self.sessions.close().await;
222 }
223
224 /// single provides a read-only snapshot transaction optimized for the case
225 /// where only a single read or query is needed. This is more efficient than
226 /// using read_only_transaction for a single read or query.
227 /// ```
228 /// use google_cloud_spanner::key::Key;
229 /// use google_cloud_spanner::statement::ToKind;
230 /// use google_cloud_spanner::client::Client;
231 ///
232 /// #[tokio::main]
233 /// async fn run(client: Client) {
234 /// let mut tx = client.single().await.unwrap();
235 /// let iter1 = tx.read("Guild",&["GuildID", "OwnerUserID"], vec![
236 /// Key::new(&"pk1"),
237 /// Key::new(&"pk2")
238 /// ]).await.unwrap();
239 /// }
240 /// ```
241 pub async fn single(&self) -> Result<ReadOnlyTransaction, Error> {
242 self.single_with_timestamp_bound(TimestampBound::strong_read()).await
243 }
244
245 /// single provides a read-only snapshot transaction optimized for the case
246 /// where only a single read or query is needed. This is more efficient than
247 /// using read_only_transaction for a single read or query.
248 pub async fn single_with_timestamp_bound(&self, tb: TimestampBound) -> Result<ReadOnlyTransaction, Error> {
249 let session = self.get_session().await?;
250 let result = ReadOnlyTransaction::single(session, tb).await?;
251 Ok(result)
252 }
253
254 /// read_only_transaction returns a ReadOnlyTransaction that can be used for
255 /// multiple reads from the database.
256 ///
257 /// ```ignore
258 /// use google_cloud_spanner::client::{Client, Error};
259 /// use google_cloud_spanner::statement::Statement;
260 /// use google_cloud_spanner::key::Key;
261 ///
262 /// async fn run(client: Client) -> Result<(), Error>{
263 /// let mut tx = client.read_only_transaction().await?;
264 ///
265 /// let mut stmt = Statement::new("SELECT * , \
266 /// ARRAY (SELECT AS STRUCT * FROM UserItem WHERE UserId = @Param1 ) AS UserItem, \
267 /// ARRAY (SELECT AS STRUCT * FROM UserCharacter WHERE UserId = @Param1 ) AS UserCharacter \
268 /// FROM User \
269 /// WHERE UserId = @Param1");
270 ///
271 /// stmt.add_param("Param1", user_id);
272 /// let mut reader = tx.query(stmt).await?;
273 /// let mut data = vec![];
274 /// while let Some(row) = reader.next().await? {
275 /// let user_id= row.column_by_name::<String>("UserId")?;
276 /// let user_items= row.column_by_name::<Vec<model::UserItem>>("UserItem")?;
277 /// let user_characters = row.column_by_name::<Vec<model::UserCharacter>>("UserCharacter")?;
278 /// data.push(user_id);
279 /// }
280 ///
281 /// let mut reader2 = tx.read("User", &["UserId"], vec![
282 /// Key::new(&"user-1"),
283 /// Key::new(&"user-2")
284 /// ]).await?;
285 ///
286 /// Ok(())
287 /// }
288 pub async fn read_only_transaction(&self) -> Result<ReadOnlyTransaction, Error> {
289 self.read_only_transaction_with_option(ReadOnlyTransactionOption::default())
290 .await
291 }
292
293 /// read_only_transaction returns a ReadOnlyTransaction that can be used for
294 /// multiple reads from the database.
295 pub async fn read_only_transaction_with_option(
296 &self,
297 options: ReadOnlyTransactionOption,
298 ) -> Result<ReadOnlyTransaction, Error> {
299 let session = self.get_session().await?;
300 let result = ReadOnlyTransaction::begin(session, options.timestamp_bound, options.call_options).await?;
301 Ok(result)
302 }
303
304 /// batch_read_only_transaction returns a BatchReadOnlyTransaction that can be used
305 /// for partitioned reads or queries from a snapshot of the database. This is
306 /// useful in batch processing pipelines where one wants to divide the work of
307 /// reading from the database across multiple machines.
308 pub async fn batch_read_only_transaction(&self) -> Result<BatchReadOnlyTransaction, Error> {
309 self.batch_read_only_transaction_with_option(ReadOnlyTransactionOption::default())
310 .await
311 }
312
313 /// batch_read_only_transaction returns a BatchReadOnlyTransaction that can be used
314 /// for partitioned reads or queries from a snapshot of the database. This is
315 /// useful in batch processing pipelines where one wants to divide the work of
316 /// reading from the database across multiple machines.
317 pub async fn batch_read_only_transaction_with_option(
318 &self,
319 options: ReadOnlyTransactionOption,
320 ) -> Result<BatchReadOnlyTransaction, Error> {
321 let session = self.get_session().await?;
322 let result = BatchReadOnlyTransaction::begin(session, options.timestamp_bound, options.call_options).await?;
323 Ok(result)
324 }
325
326 /// partitioned_update executes a DML statement in parallel across the database,
327 /// using separate, internal transactions that commit independently. The DML
328 /// statement must be fully partitionable: it must be expressible as the union
329 /// of many statements each of which accesses only a single row of the table. The
330 /// statement should also be idempotent, because it may be applied more than once.
331 ///
332 /// PartitionedUpdate returns an estimated count of the number of rows affected.
333 /// The actual number of affected rows may be greater than the estimate.
334 pub async fn partitioned_update(&self, stmt: Statement) -> Result<i64, Error> {
335 self.partitioned_update_with_option(stmt, PartitionedUpdateOption::default())
336 .await
337 }
338
339 /// partitioned_update executes a DML statement in parallel across the database,
340 /// using separate, internal transactions that commit independently. The DML
341 /// statement must be fully partitionable: it must be expressible as the union
342 /// of many statements each of which accesses only a single row of the table. The
343 /// statement should also be idempotent, because it may be applied more than once.
344 ///
345 /// PartitionedUpdate returns an estimated count of the number of rows affected.
346 /// The actual number of affected rows may be greater than the estimate.
347 pub async fn partitioned_update_with_option(
348 &self,
349 stmt: Statement,
350 options: PartitionedUpdateOption,
351 ) -> Result<i64, Error> {
352 let ro = TransactionRetrySetting::new(vec![Code::Aborted, Code::Internal]);
353 let session = Some(self.get_session().await?);
354
355 // reuse session
356 invoke_fn(
357 Some(ro),
358 |session| async {
359 let mut tx = match ReadWriteTransaction::begin_partitioned_dml(
360 session.unwrap(),
361 options.begin_options.clone(),
362 options.transaction_tag.clone(),
363 self.disable_route_to_leader,
364 )
365 .await
366 {
367 Ok(tx) => tx,
368 Err(e) => return Err((Error::GRPC(e.status), Some(e.session))),
369 };
370 tx.update_with_option(stmt.clone(), options.query_options.clone().unwrap_or_default())
371 .await
372 .map_err(|e| (Error::GRPC(e), tx.take_session()))
373 },
374 session,
375 )
376 .await
377 }
378
379 /// apply_at_least_once may attempt to apply mutations more than once; if
380 /// the mutations are not idempotent, this may lead to a failure being reported
381 /// when the mutation was applied more than once. For example, an insert may
382 /// fail with ALREADY_EXISTS even though the row did not exist before Apply was
383 /// called. For this reason, most users of the library will prefer not to use
384 /// this option. However, apply_at_least_once requires only a single RPC, whereas
385 /// apply's default replay protection may require an additional RPC. So this
386 /// method may be appropriate for latency sensitive and/or high throughput blind
387 /// writing.
388 pub async fn apply_at_least_once(&self, ms: Vec<Mutation>) -> Result<Option<CommitResult>, Error> {
389 self.apply_at_least_once_with_option(ms, CommitOptions::default()).await
390 }
391
392 /// apply_at_least_once may attempt to apply mutations more than once; if
393 /// the mutations are not idempotent, this may lead to a failure being reported
394 /// when the mutation was applied more than once. For example, an insert may
395 /// fail with ALREADY_EXISTS even though the row did not exist before Apply was
396 /// called. For this reason, most users of the library will prefer not to use
397 /// this option. However, apply_at_least_once requires only a single RPC, whereas
398 /// apply's default replay protection may require an additional RPC. So this
399 /// method may be appropriate for latency sensitive and/or high throughput blind
400 /// writing.
401 pub async fn apply_at_least_once_with_option(
402 &self,
403 ms: Vec<Mutation>,
404 options: CommitOptions,
405 ) -> Result<Option<CommitResult>, Error> {
406 let ro = TransactionRetrySetting::default();
407 let mut session = self.get_session().await?;
408
409 invoke_fn(
410 Some(ro),
411 |session| async {
412 let tx = commit_request::Transaction::SingleUseTransaction(TransactionOptions {
413 exclude_txn_from_change_streams: false,
414 mode: Some(transaction_options::Mode::ReadWrite(transaction_options::ReadWrite::default())),
415 isolation_level: IsolationLevel::Unspecified as i32,
416 });
417 match commit(session, ms.clone(), tx, options.clone(), self.disable_route_to_leader).await {
418 Ok(s) => Ok(Some(s.into())),
419 Err(e) => Err((Error::GRPC(e), session)),
420 }
421 },
422 &mut session,
423 )
424 .await
425 }
426
427 /// Apply applies a list of mutations atomically to the database.
428 /// ```
429 /// use google_cloud_spanner::mutation::insert;
430 /// use google_cloud_spanner::mutation::delete;
431 /// use google_cloud_spanner::key::all_keys;
432 /// use google_cloud_spanner::statement::ToKind;
433 /// use google_cloud_spanner::client::{Client, Error};
434 /// use google_cloud_spanner::value::CommitTimestamp;
435 ///
436 /// async fn run(client: Client) -> Result<(), Error>{
437 /// let m1 = delete("Guild", all_keys());
438 /// let m2 = insert("Guild", &["GuildID", "OwnerUserID", "UpdatedAt"], &[&"1", &"2", &CommitTimestamp::new()]);
439 /// let commit_timestamp = client.apply(vec![m1,m2]).await?;
440 /// Ok(())
441 /// }
442 /// ```
443 pub async fn apply(&self, ms: Vec<Mutation>) -> Result<CommitResult, Error> {
444 self.apply_with_option(ms, ReadWriteTransactionOption::default()).await
445 }
446
447 /// Apply applies a list of mutations atomically to the database.
448 pub async fn apply_with_option(
449 &self,
450 ms: Vec<Mutation>,
451 options: ReadWriteTransactionOption,
452 ) -> Result<CommitResult, Error> {
453 let result: Result<(CommitResult, ()), Error> = self
454 .read_write_transaction_sync_with_option(
455 |tx| {
456 tx.buffer_write(ms.to_vec());
457 Ok(())
458 },
459 options,
460 )
461 .await;
462 Ok(result?.0)
463 }
464
465 /// ReadWriteTransaction executes a read-write transaction, with retries as
466 /// necessary.
467 ///
468 /// The function f will be called one or more times. It must not maintain
469 /// any state between calls.
470 ///
471 /// If the transaction cannot be committed or if f returns an ABORTED error,
472 /// ReadWriteTransaction will call f again. It will continue to call f until the
473 /// transaction can be committed or the Context times out or is cancelled. If f
474 /// returns an error other than ABORTED, ReadWriteTransaction will abort the
475 /// transaction and return the error.
476 ///
477 /// To limit the number of retries, set a deadline on the Context rather than
478 /// using a fixed limit on the number of attempts. ReadWriteTransaction will
479 /// retry as needed until that deadline is met.
480 ///
481 /// See <https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction> for
482 /// more details.
483 /// ```
484 /// use google_cloud_spanner::mutation::update;
485 /// use google_cloud_spanner::key::{Key, all_keys};
486 /// use google_cloud_spanner::value::Timestamp;
487 /// use google_cloud_spanner::client::Error;
488 /// use google_cloud_spanner::client::Client;
489 ///
490 /// #[tokio::main]
491 /// async fn run(client: Client) -> Result<(Option<Timestamp>,()), Error>{
492 /// client.read_write_transaction(|tx| {
493 /// Box::pin(async move {
494 /// // The transaction function will be called again if the error code
495 /// // of this error is Aborted. The backend may automatically abort
496 /// // any read/write transaction if it detects a deadlock or other problems.
497 /// let key = all_keys();
498 /// let mut reader = tx.read("UserItem", &["UserId", "ItemId", "Quantity"], key).await?;
499 /// let mut ms = vec![];
500 /// while let Some(row) = reader.next().await? {
501 /// let user_id = row.column_by_name::<String>("UserId")?;
502 /// let item_id = row.column_by_name::<i64>("ItemId")?;
503 /// let quantity = row.column_by_name::<i64>("Quantity")? + 1;
504 /// let m = update("UserItem", &["Quantity"], &[&user_id, &item_id, &quantity]);
505 /// ms.push(m);
506 /// }
507 /// // The buffered mutation will be committed. If the commit
508 /// // fails with an Aborted error, this function will be called again
509 /// tx.buffer_write(ms);
510 /// Ok(())
511 /// })
512 /// }).await
513 /// }
514 pub async fn read_write_transaction<'a, T, E, F>(&self, f: F) -> Result<(CommitResult, T), E>
515 where
516 E: TryAs<Status> + From<SessionError> + From<Status>,
517 F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'tx>>,
518 {
519 self.read_write_transaction_with_option(f, ReadWriteTransactionOption::default())
520 .await
521 }
522
523 /// ReadWriteTransaction executes a read-write transaction, with retries as
524 /// necessary.
525 ///
526 /// The function f will be called one or more times. It must not maintain
527 /// any state between calls.
528 ///
529 /// If the transaction cannot be committed or if f returns an ABORTED error,
530 /// ReadWriteTransaction will call f again. It will continue to call f until the
531 /// transaction can be committed or the Context times out or is cancelled. If f
532 /// returns an error other than ABORTED, ReadWriteTransaction will abort the
533 /// transaction and return the error.
534 ///
535 /// To limit the number of retries, set a deadline on the Context rather than
536 /// using a fixed limit on the number of attempts. ReadWriteTransaction will
537 /// retry as needed until that deadline is met.
538 ///
539 /// See <https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction> for
540 /// more details.
541 pub async fn read_write_transaction_with_option<'a, T, E, F>(
542 &'a self,
543 f: F,
544 options: ReadWriteTransactionOption,
545 ) -> Result<(CommitResult, T), E>
546 where
547 E: TryAs<Status> + From<SessionError> + From<Status>,
548 F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'tx>>,
549 {
550 let (bo, co, tag) = Client::split_read_write_transaction_option(options);
551
552 let ro = TransactionRetrySetting::default();
553 let session = Some(self.get_session().await?);
554 // must reuse session
555 invoke_fn(
556 Some(ro),
557 |session| async {
558 let mut tx = self
559 .create_read_write_transaction::<E>(session, bo.clone(), tag.clone())
560 .await?;
561 let result = f(&mut tx).await;
562 tx.finish(result, Some(co.clone())).await
563 },
564 session,
565 )
566 .await
567 }
568
569 /// begin_read_write_transaction creates new ReadWriteTransaction.
570 /// ```
571 /// use google_cloud_spanner::mutation::update;
572 /// use google_cloud_spanner::key::{Key, all_keys};
573 /// use google_cloud_spanner::value::Timestamp;
574 /// use google_cloud_spanner::client::Error;
575 /// use google_cloud_spanner::client::Client;
576 /// use google_cloud_spanner::transaction_rw::ReadWriteTransaction;
577 /// use google_cloud_googleapis::spanner::v1::execute_batch_dml_request::Statement;
578 /// use google_cloud_spanner::retry::TransactionRetry;
579 ///
580 /// async fn run(client: Client) -> Result<(), Error>{
581 /// let retry = &mut TransactionRetry::new();
582 /// loop {
583 /// let tx = &mut client.begin_read_write_transaction().await?;
584 ///
585 /// let result = run_in_transaction(tx).await;
586 ///
587 /// // try to commit or rollback transaction.
588 /// match tx.end(result, None).await {
589 /// Ok((_commit_timestamp, success)) => return Ok(success),
590 /// Err(err) => retry.next(err).await? // check retry
591 /// }
592 /// }
593 /// }
594 ///
595 /// async fn run_in_transaction(tx: &mut ReadWriteTransaction) -> Result<(), Error> {
596 /// let key = all_keys();
597 /// let mut reader = tx.read("UserItem", &["UserId", "ItemId", "Quantity"], key).await?;
598 /// let mut ms = vec![];
599 /// while let Some(row) = reader.next().await? {
600 /// let user_id = row.column_by_name::<String>("UserId")?;
601 /// let item_id = row.column_by_name::<i64>("ItemId")?;
602 /// let quantity = row.column_by_name::<i64>("Quantity")? + 1;
603 /// let m = update("UserItem", &["UserId", "ItemId", "Quantity"], &[&user_id, &item_id, &quantity]);
604 /// ms.push(m);
605 /// }
606 /// tx.buffer_write(ms);
607 /// Ok(())
608 /// }
609 /// ```
610 pub async fn begin_read_write_transaction(&self) -> Result<ReadWriteTransaction, Error> {
611 let session = self.get_session().await?;
612 ReadWriteTransaction::begin(
613 session,
614 ReadWriteTransactionOption::default().begin_options,
615 ReadWriteTransactionOption::default().transaction_tag,
616 self.disable_route_to_leader,
617 )
618 .await
619 .map_err(|e| e.status.into())
620 }
621
622 /// Get open session count.
623 pub fn session_count(&self) -> usize {
624 self.sessions.num_opened()
625 }
626
627 async fn read_write_transaction_sync_with_option<T, E>(
628 &self,
629 f: impl Fn(&mut ReadWriteTransaction) -> Result<T, E>,
630 options: ReadWriteTransactionOption,
631 ) -> Result<(CommitResult, T), E>
632 where
633 E: TryAs<Status> + From<SessionError> + From<Status>,
634 {
635 let (bo, co, tag) = Client::split_read_write_transaction_option(options);
636
637 let ro = TransactionRetrySetting::default();
638 let session = Some(self.get_session().await?);
639
640 // reuse session
641 invoke_fn(
642 Some(ro),
643 |session| async {
644 let mut tx = self
645 .create_read_write_transaction::<E>(session, bo.clone(), tag.clone())
646 .await?;
647 let result = f(&mut tx);
648 tx.finish(result, Some(co.clone())).await
649 },
650 session,
651 )
652 .await
653 }
654
655 async fn create_read_write_transaction<E>(
656 &self,
657 session: Option<ManagedSession>,
658 bo: CallOptions,
659 transaction_tag: Option<String>,
660 ) -> Result<ReadWriteTransaction, (E, Option<ManagedSession>)>
661 where
662 E: TryAs<Status> + From<SessionError> + From<Status>,
663 {
664 ReadWriteTransaction::begin(session.unwrap(), bo, transaction_tag, self.disable_route_to_leader)
665 .await
666 .map_err(|e| (E::from(e.status), Some(e.session)))
667 }
668
669 async fn get_session(&self) -> Result<ManagedSession, SessionError> {
670 self.sessions.get().await
671 }
672
673 fn split_read_write_transaction_option(
674 options: ReadWriteTransactionOption,
675 ) -> (CallOptions, CommitOptions, Option<String>) {
676 (options.begin_options, options.commit_options, options.transaction_tag)
677 }
678}