gcloud_spanner/client.rs
1use std::env::var;
2use std::fmt::Debug;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::time::Duration;
7
8use google_cloud_gax::conn::{ConnectionOptions, Environment};
9use google_cloud_gax::grpc::{Code, Status};
10use google_cloud_gax::retry::{invoke_fn, TryAs};
11use google_cloud_googleapis::spanner::v1::{commit_request, transaction_options, Mutation, TransactionOptions};
12use token_source::NoopTokenSourceProvider;
13
14use crate::apiv1::conn_pool::{ConnectionManager, SPANNER};
15use crate::metrics::{MetricsConfig, MetricsRecorder};
16use crate::retry::TransactionRetrySetting;
17use crate::session::{ManagedSession, SessionConfig, SessionError, SessionManager};
18use crate::statement::Statement;
19use crate::transaction::{CallOptions, QueryOptions};
20use crate::transaction_ro::{BatchReadOnlyTransaction, ReadOnlyTransaction};
21use crate::transaction_rw::{commit, CommitOptions, CommitResult, ReadWriteTransaction};
22use crate::value::TimestampBound;
23
24#[derive(Clone, Default)]
25pub struct PartitionedUpdateOption {
26 pub begin_options: CallOptions,
27 pub query_options: Option<QueryOptions>,
28 /// The transaction tag to use for partitioned update operations.
29 pub transaction_tag: Option<String>,
30}
31
32#[derive(Clone)]
33pub struct ReadOnlyTransactionOption {
34 pub timestamp_bound: TimestampBound,
35 pub call_options: CallOptions,
36}
37
38impl Default for ReadOnlyTransactionOption {
39 fn default() -> Self {
40 ReadOnlyTransactionOption {
41 timestamp_bound: TimestampBound::strong_read(),
42 call_options: CallOptions::default(),
43 }
44 }
45}
46
47#[derive(Clone, Default)]
48pub struct ReadWriteTransactionOption {
49 pub begin_options: CallOptions,
50 pub commit_options: CommitOptions,
51 /// The transaction tag to use for this read/write transaction.
52 pub transaction_tag: Option<String>,
53}
54
/// gRPC channel settings used when the client builds its connection pool.
#[derive(Clone, Debug)]
pub struct ChannelConfig {
    /// num_channels is the number of gRPC channels.
    pub num_channels: usize,
    /// Timeout applied while establishing a connection.
    pub connect_timeout: Duration,
    /// Timeout forwarded to the connection options as the request timeout.
    pub timeout: Duration,
    /// HTTP/2 keep-alive interval; `None` leaves the connection option unset.
    pub http2_keep_alive_interval: Option<Duration>,
    /// Keep-alive timeout; `None` leaves the connection option unset.
    pub keep_alive_timeout: Option<Duration>,
    /// Keep-alive-while-idle flag; `None` leaves the connection option unset.
    pub keep_alive_while_idle: Option<bool>,
}

impl Default for ChannelConfig {
    /// Four channels, 30-second connect and request timeouts, and no
    /// keep-alive settings.
    fn default() -> Self {
        let thirty_seconds = Duration::from_secs(30);
        Self {
            num_channels: 4,
            connect_timeout: thirty_seconds,
            timeout: thirty_seconds,
            http2_keep_alive_interval: None,
            keep_alive_timeout: None,
            keep_alive_while_idle: None,
        }
    }
}
78
79/// ClientConfig has configurations for the client.
80#[derive(Debug)]
81pub struct ClientConfig {
82 /// SessionPoolConfig is the configuration for session pool.
83 pub session_config: SessionConfig,
84 /// ChannelConfig is the configuration for gRPC connection.
85 pub channel_config: ChannelConfig,
86 /// Overriding service endpoint
87 pub endpoint: String,
88 /// Runtime project
89 pub environment: Environment,
90 /// DisableRouteToLeader specifies if all the requests of type read-write and PDML need to be routed to the leader region.
91 pub disable_route_to_leader: bool,
92 /// Metrics configuration for emitting OpenTelemetry signals.
93 pub metrics: MetricsConfig,
94}
95
96impl Default for ClientConfig {
97 fn default() -> Self {
98 let mut config = ClientConfig {
99 channel_config: Default::default(),
100 session_config: Default::default(),
101 endpoint: SPANNER.to_string(),
102 environment: match var("SPANNER_EMULATOR_HOST").ok() {
103 Some(v) => Environment::Emulator(v),
104 None => Environment::GoogleCloud(Box::new(NoopTokenSourceProvider {})),
105 },
106 disable_route_to_leader: false,
107 metrics: MetricsConfig::default(),
108 };
109 config.session_config.min_opened = config.channel_config.num_channels * 4;
110 config.session_config.max_opened = config.channel_config.num_channels * 100;
111 config
112 }
113}
114
115#[cfg(feature = "auth")]
116pub use google_cloud_auth;
117use google_cloud_googleapis::spanner::v1::transaction_options::IsolationLevel;
118
#[cfg(feature = "auth")]
impl ClientConfig {
    /// Installs the default token source provider when the environment is
    /// Google Cloud; any other environment is returned unchanged.
    ///
    /// # Errors
    ///
    /// Returns the `google_cloud_auth` error raised while creating the provider.
    pub async fn with_auth(mut self) -> Result<Self, google_cloud_auth::error::Error> {
        if matches!(self.environment, Environment::GoogleCloud(_)) {
            let provider = google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::auth_config()).await?;
            self.environment = Environment::GoogleCloud(Box::new(provider));
        }
        Ok(self)
    }

    /// Installs a token source provider built from the given credentials file
    /// when the environment is Google Cloud; any other environment is returned
    /// unchanged.
    ///
    /// # Errors
    ///
    /// Returns the `google_cloud_auth` error raised while creating the provider.
    pub async fn with_credentials(
        mut self,
        credentials: google_cloud_auth::credentials::CredentialsFile,
    ) -> Result<Self, google_cloud_auth::error::Error> {
        if matches!(self.environment, Environment::GoogleCloud(_)) {
            let provider = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
                Self::auth_config(),
                Box::new(credentials),
            )
            .await?;
            self.environment = Environment::GoogleCloud(Box::new(provider));
        }
        Ok(self)
    }

    /// Auth project configuration carrying the Spanner audience and scopes.
    fn auth_config() -> google_cloud_auth::project::Config<'static> {
        let base = google_cloud_auth::project::Config::default();
        base.with_audience(crate::apiv1::conn_pool::AUDIENCE)
            .with_scopes(&crate::apiv1::conn_pool::SCOPES)
    }
}
150
151#[derive(thiserror::Error, Debug)]
152pub enum Error {
153 #[error(transparent)]
154 GRPC(#[from] Status),
155
156 #[error(transparent)]
157 InvalidSession(#[from] SessionError),
158
159 #[error(transparent)]
160 ParseError(#[from] crate::row::Error),
161
162 #[error(transparent)]
163 Connection(#[from] google_cloud_gax::conn::Error),
164
165 #[error("invalid config: {0}")]
166 InvalidConfig(String),
167}
168
169impl TryAs<Status> for Error {
170 fn try_as(&self) -> Option<&Status> {
171 match self {
172 Error::GRPC(e) => Some(e),
173 _ => None,
174 }
175 }
176}
177
178/// Client is a client for reading and writing data to a Cloud Spanner database.
179/// A client is safe to use concurrently, except for its Close method.
180#[derive(Clone)]
181pub struct Client {
182 sessions: Arc<SessionManager>,
183 disable_route_to_leader: bool,
184}
185
186impl Client {
187 /// new creates a client to a database. A valid database name has
188 /// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
189 pub async fn new(database: impl Into<String>, config: ClientConfig) -> Result<Self, Error> {
190 if config.session_config.max_opened > config.channel_config.num_channels * 100 {
191 return Err(Error::InvalidConfig(format!(
192 "max session size is {} because max session size is 100 per gRPC connection",
193 config.channel_config.num_channels * 100
194 )));
195 }
196
197 let database: String = database.into();
198 let metrics = Arc::new(
199 MetricsRecorder::try_new(&database, &config.metrics).map_err(|e| Error::InvalidConfig(e.to_string()))?,
200 );
201
202 let pool_size = config.channel_config.num_channels;
203 let options = ConnectionOptions {
204 timeout: Some(config.channel_config.timeout),
205 connect_timeout: Some(config.channel_config.connect_timeout),
206 http2_keep_alive_interval: config.channel_config.http2_keep_alive_interval,
207 keep_alive_timeout: config.channel_config.keep_alive_timeout,
208 keep_alive_while_idle: config.channel_config.keep_alive_while_idle,
209 };
210 let conn_pool =
211 ConnectionManager::new(pool_size, &config.environment, config.endpoint.as_str(), &options).await?;
212 let session_manager = SessionManager::new(
213 database,
214 conn_pool,
215 config.session_config,
216 config.disable_route_to_leader,
217 metrics.clone(),
218 )
219 .await?;
220
221 Ok(Client {
222 sessions: session_manager,
223 disable_route_to_leader: config.disable_route_to_leader,
224 })
225 }
226
227 /// Close closes all the sessions gracefully.
228 /// This method can be called only once.
229 pub async fn close(self) {
230 self.sessions.close().await;
231 }
232
233 /// single provides a read-only snapshot transaction optimized for the case
234 /// where only a single read or query is needed. This is more efficient than
235 /// using read_only_transaction for a single read or query.
236 /// ```
237 /// use google_cloud_spanner::key::Key;
238 /// use google_cloud_spanner::statement::ToKind;
239 /// use google_cloud_spanner::client::Client;
240 ///
241 /// #[tokio::main]
242 /// async fn run(client: Client) {
243 /// let mut tx = client.single().await.unwrap();
244 /// let iter1 = tx.read("Guild",&["GuildID", "OwnerUserID"], vec![
245 /// Key::new(&"pk1"),
246 /// Key::new(&"pk2")
247 /// ]).await.unwrap();
248 /// }
249 /// ```
250 pub async fn single(&self) -> Result<ReadOnlyTransaction, Error> {
251 self.single_with_timestamp_bound(TimestampBound::strong_read()).await
252 }
253
254 /// single provides a read-only snapshot transaction optimized for the case
255 /// where only a single read or query is needed. This is more efficient than
256 /// using read_only_transaction for a single read or query.
257 pub async fn single_with_timestamp_bound(&self, tb: TimestampBound) -> Result<ReadOnlyTransaction, Error> {
258 let session = self.get_session().await?;
259 let result = ReadOnlyTransaction::single(session, tb).await?;
260 Ok(result)
261 }
262
263 /// read_only_transaction returns a ReadOnlyTransaction that can be used for
264 /// multiple reads from the database.
265 ///
266 /// ```ignore
267 /// use google_cloud_spanner::client::{Client, Error};
268 /// use google_cloud_spanner::statement::Statement;
269 /// use google_cloud_spanner::key::Key;
270 ///
271 /// async fn run(client: Client) -> Result<(), Error>{
272 /// let mut tx = client.read_only_transaction().await?;
273 ///
274 /// let mut stmt = Statement::new("SELECT * , \
275 /// ARRAY (SELECT AS STRUCT * FROM UserItem WHERE UserId = @Param1 ) AS UserItem, \
276 /// ARRAY (SELECT AS STRUCT * FROM UserCharacter WHERE UserId = @Param1 ) AS UserCharacter \
277 /// FROM User \
278 /// WHERE UserId = @Param1");
279 ///
280 /// stmt.add_param("Param1", user_id);
281 /// let mut reader = tx.query(stmt).await?;
282 /// let mut data = vec![];
283 /// while let Some(row) = reader.next().await? {
284 /// let user_id= row.column_by_name::<String>("UserId")?;
285 /// let user_items= row.column_by_name::<Vec<model::UserItem>>("UserItem")?;
286 /// let user_characters = row.column_by_name::<Vec<model::UserCharacter>>("UserCharacter")?;
287 /// data.push(user_id);
288 /// }
289 ///
290 /// let mut reader2 = tx.read("User", &["UserId"], vec![
291 /// Key::new(&"user-1"),
292 /// Key::new(&"user-2")
293 /// ]).await?;
294 ///
295 /// Ok(())
296 /// }
297 pub async fn read_only_transaction(&self) -> Result<ReadOnlyTransaction, Error> {
298 self.read_only_transaction_with_option(ReadOnlyTransactionOption::default())
299 .await
300 }
301
302 /// read_only_transaction returns a ReadOnlyTransaction that can be used for
303 /// multiple reads from the database.
304 pub async fn read_only_transaction_with_option(
305 &self,
306 options: ReadOnlyTransactionOption,
307 ) -> Result<ReadOnlyTransaction, Error> {
308 let session = self.get_session().await?;
309 let result = ReadOnlyTransaction::begin(session, options.timestamp_bound, options.call_options).await?;
310 Ok(result)
311 }
312
313 /// batch_read_only_transaction returns a BatchReadOnlyTransaction that can be used
314 /// for partitioned reads or queries from a snapshot of the database. This is
315 /// useful in batch processing pipelines where one wants to divide the work of
316 /// reading from the database across multiple machines.
317 pub async fn batch_read_only_transaction(&self) -> Result<BatchReadOnlyTransaction, Error> {
318 self.batch_read_only_transaction_with_option(ReadOnlyTransactionOption::default())
319 .await
320 }
321
322 /// batch_read_only_transaction returns a BatchReadOnlyTransaction that can be used
323 /// for partitioned reads or queries from a snapshot of the database. This is
324 /// useful in batch processing pipelines where one wants to divide the work of
325 /// reading from the database across multiple machines.
326 pub async fn batch_read_only_transaction_with_option(
327 &self,
328 options: ReadOnlyTransactionOption,
329 ) -> Result<BatchReadOnlyTransaction, Error> {
330 let session = self.get_session().await?;
331 let result = BatchReadOnlyTransaction::begin(session, options.timestamp_bound, options.call_options).await?;
332 Ok(result)
333 }
334
335 /// partitioned_update executes a DML statement in parallel across the database,
336 /// using separate, internal transactions that commit independently. The DML
337 /// statement must be fully partitionable: it must be expressible as the union
338 /// of many statements each of which accesses only a single row of the table. The
339 /// statement should also be idempotent, because it may be applied more than once.
340 ///
341 /// PartitionedUpdate returns an estimated count of the number of rows affected.
342 /// The actual number of affected rows may be greater than the estimate.
343 pub async fn partitioned_update(&self, stmt: Statement) -> Result<i64, Error> {
344 self.partitioned_update_with_option(stmt, PartitionedUpdateOption::default())
345 .await
346 }
347
348 /// partitioned_update executes a DML statement in parallel across the database,
349 /// using separate, internal transactions that commit independently. The DML
350 /// statement must be fully partitionable: it must be expressible as the union
351 /// of many statements each of which accesses only a single row of the table. The
352 /// statement should also be idempotent, because it may be applied more than once.
353 ///
354 /// PartitionedUpdate returns an estimated count of the number of rows affected.
355 /// The actual number of affected rows may be greater than the estimate.
356 pub async fn partitioned_update_with_option(
357 &self,
358 stmt: Statement,
359 options: PartitionedUpdateOption,
360 ) -> Result<i64, Error> {
361 let ro = TransactionRetrySetting::new(vec![Code::Aborted, Code::Internal]);
362 let session = Some(self.get_session().await?);
363
364 // reuse session
365 invoke_fn(
366 Some(ro),
367 |session| async {
368 let mut tx = match ReadWriteTransaction::begin_partitioned_dml(
369 session.unwrap(),
370 options.begin_options.clone(),
371 options.transaction_tag.clone(),
372 self.disable_route_to_leader,
373 )
374 .await
375 {
376 Ok(tx) => tx,
377 Err(e) => return Err((Error::GRPC(e.status), Some(e.session))),
378 };
379 tx.update_with_option(stmt.clone(), options.query_options.clone().unwrap_or_default())
380 .await
381 .map_err(|e| (Error::GRPC(e), tx.take_session()))
382 },
383 session,
384 )
385 .await
386 }
387
388 /// apply_at_least_once may attempt to apply mutations more than once; if
389 /// the mutations are not idempotent, this may lead to a failure being reported
390 /// when the mutation was applied more than once. For example, an insert may
391 /// fail with ALREADY_EXISTS even though the row did not exist before Apply was
392 /// called. For this reason, most users of the library will prefer not to use
393 /// this option. However, apply_at_least_once requires only a single RPC, whereas
394 /// apply's default replay protection may require an additional RPC. So this
395 /// method may be appropriate for latency sensitive and/or high throughput blind
396 /// writing.
397 pub async fn apply_at_least_once(&self, ms: Vec<Mutation>) -> Result<Option<CommitResult>, Error> {
398 self.apply_at_least_once_with_option(ms, CommitOptions::default()).await
399 }
400
401 /// apply_at_least_once may attempt to apply mutations more than once; if
402 /// the mutations are not idempotent, this may lead to a failure being reported
403 /// when the mutation was applied more than once. For example, an insert may
404 /// fail with ALREADY_EXISTS even though the row did not exist before Apply was
405 /// called. For this reason, most users of the library will prefer not to use
406 /// this option. However, apply_at_least_once requires only a single RPC, whereas
407 /// apply's default replay protection may require an additional RPC. So this
408 /// method may be appropriate for latency sensitive and/or high throughput blind
409 /// writing.
410 pub async fn apply_at_least_once_with_option(
411 &self,
412 ms: Vec<Mutation>,
413 options: CommitOptions,
414 ) -> Result<Option<CommitResult>, Error> {
415 let ro = TransactionRetrySetting::default();
416 let mut session = self.get_session().await?;
417
418 invoke_fn(
419 Some(ro),
420 |session| async {
421 let tx = commit_request::Transaction::SingleUseTransaction(TransactionOptions {
422 exclude_txn_from_change_streams: false,
423 mode: Some(transaction_options::Mode::ReadWrite(transaction_options::ReadWrite::default())),
424 isolation_level: IsolationLevel::Unspecified as i32,
425 });
426 match commit(session, ms.clone(), tx, options.clone(), self.disable_route_to_leader).await {
427 Ok(s) => Ok(Some(s.into())),
428 Err(e) => Err((Error::GRPC(e), session)),
429 }
430 },
431 &mut session,
432 )
433 .await
434 }
435
436 /// Apply applies a list of mutations atomically to the database.
437 /// ```
438 /// use google_cloud_spanner::mutation::insert;
439 /// use google_cloud_spanner::mutation::delete;
440 /// use google_cloud_spanner::key::all_keys;
441 /// use google_cloud_spanner::statement::ToKind;
442 /// use google_cloud_spanner::client::{Client, Error};
443 /// use google_cloud_spanner::value::CommitTimestamp;
444 ///
445 /// async fn run(client: Client) -> Result<(), Error>{
446 /// let m1 = delete("Guild", all_keys());
447 /// let m2 = insert("Guild", &["GuildID", "OwnerUserID", "UpdatedAt"], &[&"1", &"2", &CommitTimestamp::new()]);
448 /// let commit_timestamp = client.apply(vec![m1,m2]).await?;
449 /// Ok(())
450 /// }
451 /// ```
452 pub async fn apply(&self, ms: Vec<Mutation>) -> Result<CommitResult, Error> {
453 self.apply_with_option(ms, ReadWriteTransactionOption::default()).await
454 }
455
456 /// Apply applies a list of mutations atomically to the database.
457 pub async fn apply_with_option(
458 &self,
459 ms: Vec<Mutation>,
460 options: ReadWriteTransactionOption,
461 ) -> Result<CommitResult, Error> {
462 let result: Result<(CommitResult, ()), Error> = self
463 .read_write_transaction_sync_with_option(
464 |tx| {
465 tx.buffer_write(ms.to_vec());
466 Ok(())
467 },
468 options,
469 )
470 .await;
471 Ok(result?.0)
472 }
473
474 /// ReadWriteTransaction executes a read-write transaction, with retries as
475 /// necessary.
476 ///
477 /// The function f will be called one or more times. It must not maintain
478 /// any state between calls.
479 ///
480 /// If the transaction cannot be committed or if f returns an ABORTED error,
481 /// ReadWriteTransaction will call f again. It will continue to call f until the
482 /// transaction can be committed or the Context times out or is cancelled. If f
483 /// returns an error other than ABORTED, ReadWriteTransaction will abort the
484 /// transaction and return the error.
485 ///
486 /// To limit the number of retries, set a deadline on the Context rather than
487 /// using a fixed limit on the number of attempts. ReadWriteTransaction will
488 /// retry as needed until that deadline is met.
489 ///
490 /// See <https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction> for
491 /// more details.
492 /// ```
493 /// use google_cloud_spanner::mutation::update;
494 /// use google_cloud_spanner::key::{Key, all_keys};
495 /// use google_cloud_spanner::value::Timestamp;
496 /// use google_cloud_spanner::client::Error;
497 /// use google_cloud_spanner::client::Client;
498 ///
499 /// #[tokio::main]
500 /// async fn run(client: Client) -> Result<(Option<Timestamp>,()), Error>{
501 /// client.read_write_transaction(|tx| {
502 /// Box::pin(async move {
503 /// // The transaction function will be called again if the error code
504 /// // of this error is Aborted. The backend may automatically abort
505 /// // any read/write transaction if it detects a deadlock or other problems.
506 /// let key = all_keys();
507 /// let mut reader = tx.read("UserItem", &["UserId", "ItemId", "Quantity"], key).await?;
508 /// let mut ms = vec![];
509 /// while let Some(row) = reader.next().await? {
510 /// let user_id = row.column_by_name::<String>("UserId")?;
511 /// let item_id = row.column_by_name::<i64>("ItemId")?;
512 /// let quantity = row.column_by_name::<i64>("Quantity")? + 1;
513 /// let m = update("UserItem", &["Quantity"], &[&user_id, &item_id, &quantity]);
514 /// ms.push(m);
515 /// }
516 /// // The buffered mutation will be committed. If the commit
517 /// // fails with an Aborted error, this function will be called again
518 /// tx.buffer_write(ms);
519 /// Ok(())
520 /// })
521 /// }).await
522 /// }
523 pub async fn read_write_transaction<'a, T, E, F>(&self, f: F) -> Result<(CommitResult, T), E>
524 where
525 E: TryAs<Status> + From<SessionError> + From<Status>,
526 F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'tx>>,
527 {
528 self.read_write_transaction_with_option(f, ReadWriteTransactionOption::default())
529 .await
530 }
531
532 /// ReadWriteTransaction executes a read-write transaction, with retries as
533 /// necessary.
534 ///
535 /// The function f will be called one or more times. It must not maintain
536 /// any state between calls.
537 ///
538 /// If the transaction cannot be committed or if f returns an ABORTED error,
539 /// ReadWriteTransaction will call f again. It will continue to call f until the
540 /// transaction can be committed or the Context times out or is cancelled. If f
541 /// returns an error other than ABORTED, ReadWriteTransaction will abort the
542 /// transaction and return the error.
543 ///
544 /// To limit the number of retries, set a deadline on the Context rather than
545 /// using a fixed limit on the number of attempts. ReadWriteTransaction will
546 /// retry as needed until that deadline is met.
547 ///
548 /// See <https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction> for
549 /// more details.
550 pub async fn read_write_transaction_with_option<'a, T, E, F>(
551 &'a self,
552 f: F,
553 options: ReadWriteTransactionOption,
554 ) -> Result<(CommitResult, T), E>
555 where
556 E: TryAs<Status> + From<SessionError> + From<Status>,
557 F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'tx>>,
558 {
559 let (bo, co, tag) = Client::split_read_write_transaction_option(options);
560
561 let ro = TransactionRetrySetting::default();
562 let session = Some(self.get_session().await?);
563 // must reuse session
564 invoke_fn(
565 Some(ro),
566 |session| async {
567 let mut tx = self
568 .create_read_write_transaction::<E>(session, bo.clone(), tag.clone())
569 .await?;
570 let result = f(&mut tx).await;
571 tx.finish(result, Some(co.clone())).await
572 },
573 session,
574 )
575 .await
576 }
577
578 /// begin_read_write_transaction creates new ReadWriteTransaction.
579 /// ```
580 /// use google_cloud_spanner::mutation::update;
581 /// use google_cloud_spanner::key::{Key, all_keys};
582 /// use google_cloud_spanner::value::Timestamp;
583 /// use google_cloud_spanner::client::Error;
584 /// use google_cloud_spanner::client::Client;
585 /// use google_cloud_spanner::transaction_rw::ReadWriteTransaction;
586 /// use google_cloud_googleapis::spanner::v1::execute_batch_dml_request::Statement;
587 /// use google_cloud_spanner::retry::TransactionRetry;
588 ///
589 /// async fn run(client: Client) -> Result<(), Error>{
590 /// let retry = &mut TransactionRetry::new();
591 /// loop {
592 /// let tx = &mut client.begin_read_write_transaction().await?;
593 ///
594 /// let result = run_in_transaction(tx).await;
595 ///
596 /// // try to commit or rollback transaction.
597 /// match tx.end(result, None).await {
598 /// Ok((_commit_timestamp, success)) => return Ok(success),
599 /// Err(err) => retry.next(err).await? // check retry
600 /// }
601 /// }
602 /// }
603 ///
604 /// async fn run_in_transaction(tx: &mut ReadWriteTransaction) -> Result<(), Error> {
605 /// let key = all_keys();
606 /// let mut reader = tx.read("UserItem", &["UserId", "ItemId", "Quantity"], key).await?;
607 /// let mut ms = vec![];
608 /// while let Some(row) = reader.next().await? {
609 /// let user_id = row.column_by_name::<String>("UserId")?;
610 /// let item_id = row.column_by_name::<i64>("ItemId")?;
611 /// let quantity = row.column_by_name::<i64>("Quantity")? + 1;
612 /// let m = update("UserItem", &["UserId", "ItemId", "Quantity"], &[&user_id, &item_id, &quantity]);
613 /// ms.push(m);
614 /// }
615 /// tx.buffer_write(ms);
616 /// Ok(())
617 /// }
618 /// ```
619 pub async fn begin_read_write_transaction(&self) -> Result<ReadWriteTransaction, Error> {
620 let session = self.get_session().await?;
621 ReadWriteTransaction::begin(
622 session,
623 ReadWriteTransactionOption::default().begin_options,
624 ReadWriteTransactionOption::default().transaction_tag,
625 self.disable_route_to_leader,
626 )
627 .await
628 .map_err(|e| e.status.into())
629 }
630
631 /// Get open session count.
632 pub fn session_count(&self) -> usize {
633 self.sessions.num_opened()
634 }
635
636 async fn read_write_transaction_sync_with_option<T, E>(
637 &self,
638 f: impl Fn(&mut ReadWriteTransaction) -> Result<T, E>,
639 options: ReadWriteTransactionOption,
640 ) -> Result<(CommitResult, T), E>
641 where
642 E: TryAs<Status> + From<SessionError> + From<Status>,
643 {
644 let (bo, co, tag) = Client::split_read_write_transaction_option(options);
645
646 let ro = TransactionRetrySetting::default();
647 let session = Some(self.get_session().await?);
648
649 // reuse session
650 invoke_fn(
651 Some(ro),
652 |session| async {
653 let mut tx = self
654 .create_read_write_transaction::<E>(session, bo.clone(), tag.clone())
655 .await?;
656 let result = f(&mut tx);
657 tx.finish(result, Some(co.clone())).await
658 },
659 session,
660 )
661 .await
662 }
663
664 async fn create_read_write_transaction<E>(
665 &self,
666 session: Option<ManagedSession>,
667 bo: CallOptions,
668 transaction_tag: Option<String>,
669 ) -> Result<ReadWriteTransaction, (E, Option<ManagedSession>)>
670 where
671 E: TryAs<Status> + From<SessionError> + From<Status>,
672 {
673 ReadWriteTransaction::begin(session.unwrap(), bo, transaction_tag, self.disable_route_to_leader)
674 .await
675 .map_err(|e| (E::from(e.status), Some(e.session)))
676 }
677
678 async fn get_session(&self) -> Result<ManagedSession, SessionError> {
679 self.sessions.get().await
680 }
681
682 fn split_read_write_transaction_option(
683 options: ReadWriteTransactionOption,
684 ) -> (CallOptions, CommitOptions, Option<String>) {
685 (options.begin_options, options.commit_options, options.transaction_tag)
686 }
687}