Struct TransactionRunnerBuilder 

pub struct TransactionRunnerBuilder { /* private fields */ }

A builder for a TransactionRunner for a read/write transaction.

§Example

let db_client = client.database_client("projects/p/instances/i/databases/d").build().await?;
let runner = db_client.read_write_transaction().build().await?;

let result = runner.run(async |transaction| {
    let statement = Statement::builder("UPDATE MyTable SET MyColumn = 'MyValue' WHERE Id = 1").build();
    transaction.execute_update(statement).await?;
    Ok(42)
}).await?;

Spanner can abort any read/write transaction at any time. A TransactionRunner automatically retries aborted transactions according to the configured retry policy.
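The retry behaviour can be pictured with a minimal, standard-library-only sketch. It is not the client's implementation: `TxError`, `run_with_retries`, and the fixed attempt limit are hypothetical names chosen for illustration, and backoff between attempts is left out.

```rust
/// Hypothetical error type for this sketch; not the Spanner client's error type.
#[derive(Debug, PartialEq)]
enum TxError {
    /// The transaction was aborted and can be retried from the start.
    Aborted,
    /// Any other failure; this sketch never retries it.
    Other(String),
}

/// Calls `work` until it succeeds, fails with a non-retryable error,
/// or `max_attempts` attempts have been used. `work` receives the
/// 1-based attempt number and is always called at least once.
fn run_with_retries<T, F>(max_attempts: u32, mut work: F) -> Result<T, TxError>
where
    F: FnMut(u32) -> Result<T, TxError>,
{
    let mut attempt = 1;
    loop {
        match work(attempt) {
            Ok(value) => return Ok(value),
            // Only aborted attempts are retried, and only while attempts remain.
            Err(TxError::Aborted) if attempt < max_attempts => attempt += 1,
            Err(err) => return Err(err),
        }
    }
}
```

Because the whole closure is re-run on every attempt, any side effects it performs outside the transaction should be safe to repeat.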

Implementations§

impl TransactionRunnerBuilder

pub fn with_transaction_timeout(self, timeout: StdDuration) -> Self

Sets the timeout for the entire transaction.

§Example
let runner = db_client.read_write_transaction()
    .with_transaction_timeout(Duration::from_secs(5))
    .build()
    .await?;

This timeout applies to the total time spent executing the transaction, including all statements and automatic retries. Each individual RPC within the transaction is automatically assigned a deadline derived from the remaining time of this overall timeout.
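One way to picture how a per-RPC deadline could be derived from the overall timeout is the sketch below. It uses only `std::time::Duration`. The helper names are hypothetical, and capping a per-attempt timeout by the remaining budget is an assumption of this sketch, not documented behaviour of the client.

```rust
use std::time::Duration;

/// Returns the budget left for the next RPC, or `None` once the overall
/// transaction timeout has been used up.
fn remaining_budget(transaction_timeout: Duration, elapsed: Duration) -> Option<Duration> {
    transaction_timeout
        .checked_sub(elapsed)
        .filter(|left| !left.is_zero())
}

/// Derives the deadline for a single RPC. If a per-attempt timeout is
/// configured, this sketch assumes it is capped by the remaining budget.
fn rpc_deadline(
    transaction_timeout: Duration,
    elapsed: Duration,
    attempt_timeout: Option<Duration>,
) -> Option<Duration> {
    let left = remaining_budget(transaction_timeout, elapsed)?;
    Some(match attempt_timeout {
        Some(timeout) => timeout.min(left),
        None => left,
    })
}
```

With a 5 second transaction timeout and 4 seconds already spent, an RPC with a 2 second per-attempt timeout gets a 1 second deadline in this model.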

pub fn with_begin_attempt_timeout(self, timeout: StdDuration) -> Self

Sets the per-attempt timeout for the BeginTransaction RPC.

§Example
let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
let runner = db_client.read_write_transaction()
    .with_begin_attempt_timeout(Duration::from_secs(5))
    .build()
    .await?;

Note: This timeout is only used if the transaction uses the ExplicitBegin transaction option.

pub fn with_begin_retry_policy(self, policy: impl Into<RetryPolicyArg>) -> Self

Sets the retry policy for the BeginTransaction RPC.

§Example
let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
let runner = db_client.read_write_transaction()
    .with_begin_retry_policy(NeverRetry)
    .build()
    .await?;

Note: This policy is only used if the transaction uses the ExplicitBegin transaction option.

pub fn with_begin_backoff_policy(self, policy: impl Into<BackoffPolicyArg>) -> Self

Sets the backoff policy for the BeginTransaction RPC.

§Example
let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
let runner = db_client.read_write_transaction()
    .with_begin_backoff_policy(ExponentialBackoff::default())
    .build()
    .await?;

Note: This policy is only used if the transaction uses the ExplicitBegin transaction option.

pub fn with_commit_attempt_timeout(self, timeout: StdDuration) -> Self

Sets the per-attempt timeout for the Commit RPC.

§Example
let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
let runner = db_client.read_write_transaction()
    .with_commit_attempt_timeout(Duration::from_secs(5))
    .build()
    .await?;

pub fn with_commit_retry_policy(self, policy: impl Into<RetryPolicyArg>) -> Self

Sets the retry policy for the Commit RPC.

§Example
let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
let runner = db_client.read_write_transaction()
    .with_commit_retry_policy(NeverRetry)
    .build()
    .await?;

pub fn with_commit_backoff_policy(self, policy: impl Into<BackoffPolicyArg>) -> Self

Sets the backoff policy for the Commit RPC.

§Example
let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
let runner = db_client.read_write_transaction()
    .with_commit_backoff_policy(ExponentialBackoff::default())
    .build()
    .await?;

pub fn set_isolation_level(self, isolation_level: IsolationLevel) -> Self

Sets the isolation level for the transaction.

§Example
let db_client = client.database_client("projects/p/instances/i/databases/d").build().await?;
let runner = db_client
    .read_write_transaction()
    .set_isolation_level(IsolationLevel::Serializable)
    .build()
    .await?;

See also: https://docs.cloud.google.com/spanner/docs/isolation-levels

pub fn set_read_lock_mode(self, read_lock_mode: ReadLockMode) -> Self

Sets the read lock mode for the transaction.

§Example
let db_client = client.database_client("projects/p/instances/i/databases/d").build().await?;
let runner = db_client
    .read_write_transaction()
    .set_read_lock_mode(ReadLockMode::Pessimistic)
    .build()
    .await?;

See also: https://docs.cloud.google.com/spanner/docs/concurrency-control

pub fn set_transaction_tag(self, tag: impl Into<String>) -> Self

Sets the transaction tag for the transaction.

§Example
let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
let runner = db_client.read_write_transaction()
    .set_transaction_tag("my-tag")
    .build()
    .await?;

The tag is applied to all statements executed within the transaction.

See also: Troubleshooting with tags

pub fn with_begin_transaction_option(self, option: BeginTransactionOption) -> Self

Sets the option for how to start a transaction.

§Example
let db_client = client.database_client("projects/p/instances/i/databases/d").build().await?;
let runner = db_client
    .read_write_transaction()
    .with_begin_transaction_option(BeginTransactionOption::ExplicitBegin)
    .build()
    .await?;

By default, the Spanner client will inline the BeginTransaction call with the first query or DML statement in the transaction. This reduces the number of round-trips to Spanner that are needed for a transaction. Setting this option to ExplicitBegin can be beneficial for specific transaction shapes:

  1. When the transaction executes multiple parallel queries at the start of the transaction. Only one query can include the BeginTransaction option, and all other queries must wait for that query to return its first result before they can execute. A BeginTransaction RPC returns a transaction ID quickly, after which all queries can start executing in parallel.
  2. When the first statement in the transaction could fail. A statement that fails neither starts a transaction nor returns a transaction ID. The transaction then falls back to executing a BeginTransaction RPC and retries the first statement.

Default is BeginTransactionOption::InlineBegin.
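The trade-off for parallel queries can be illustrated with a toy latency model. Everything in it is an assumption made for illustration: the function names are hypothetical, the durations are made up, and real latencies depend on the workload. It only encodes the rule stated above, that with an inlined begin the other queries wait for the first query's first result.

```rust
use std::time::Duration;

/// Inlined begin: the first query carries the BeginTransaction option.
/// `first_result` is the time until that query returns its first result;
/// `queries` holds the full execution time of each query, first query first.
fn inline_begin_latency(first_result: Duration, queries: &[Duration]) -> Duration {
    let Some((first, rest)) = queries.split_first() else {
        return Duration::ZERO;
    };
    let slowest_follower = rest.iter().copied().max().unwrap_or(Duration::ZERO);
    // The other queries can only start once the first result has arrived.
    (*first).max(first_result + slowest_follower)
}

/// Explicit begin: one BeginTransaction round trip, then every query
/// runs in parallel.
fn explicit_begin_latency(begin_rpc: Duration, queries: &[Duration]) -> Duration {
    begin_rpc + queries.iter().copied().max().unwrap_or(Duration::ZERO)
}
```

For three 100 ms queries, a first result after 80 ms, and a 10 ms BeginTransaction RPC, the model gives 180 ms with an inlined begin and 110 ms with an explicit one. For a single 100 ms query the inlined begin is faster (100 ms against 110 ms), which is consistent with InlineBegin being the default.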

pub fn set_commit_priority(self, priority: Priority) -> Self

Sets the RPC priority to use for the commit of this transaction.

§Example
let db_client = client.database_client("projects/p/instances/i/databases/d").build().await?;
let runner = db_client
    .read_write_transaction()
    .set_commit_priority(Priority::Low)
    .build()
    .await?;

pub fn set_max_commit_delay(self, delay: Duration) -> Self

Sets the maximum commit delay for the transaction.

§Example
let db_client = client.database_client("projects/p/instances/i/databases/d").build().await?;
let runner = db_client
    .read_write_transaction()
    .set_max_commit_delay(Duration::try_from("0.2s").unwrap())
    .build()
    .await?;

This option allows you to specify the maximum amount of time Spanner can adjust the commit timestamp of the transaction to allow for commit batching. Increasing this value can increase throughput at the expense of latency. The value must be between 0 and 500 milliseconds. If not set, or set to 0, Spanner does not delay the commit.
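The allowed range can be checked before the value is handed to the builder. The helper below is a hypothetical sketch: it works on `std::time::Duration`, not on the `Duration` type the builder accepts, and nothing here implies that the client performs the same check locally.

```rust
use std::time::Duration;

/// Upper bound stated in the documentation above.
const MAX_COMMIT_DELAY: Duration = Duration::from_millis(500);

/// Accepts delays in the range 0..=500 ms and rejects anything longer.
/// A `std::time::Duration` cannot be negative, so only the upper bound
/// needs checking.
fn validate_max_commit_delay(delay: Duration) -> Result<Duration, String> {
    if delay <= MAX_COMMIT_DELAY {
        Ok(delay)
    } else {
        Err(format!(
            "max commit delay {:?} exceeds the limit of {:?}",
            delay, MAX_COMMIT_DELAY
        ))
    }
}
```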

pub fn set_exclude_txn_from_change_streams(self, exclude: bool) -> Self

Sets whether to exclude the transaction from change streams.

§Example
let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
let runner = db_client.read_write_transaction()
    .set_exclude_txn_from_change_streams(true)
    .build()
    .await?;

When set to true, modifications from this transaction are not tracked in change streams. This only affects change streams that have been created with the DDL option allow_txn_exclusion = true. If allow_txn_exclusion is not set or is set to false for a change stream, updates made within this transaction are recorded in that change stream regardless of this setting.

When set to false or not specified, modifications from this transaction are recorded in all change streams tracking columns modified by this transaction.
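The interaction between the two settings reduces to a small truth table. The function below is a hypothetical restatement of the rules above, assuming the change stream tracks at least one column that the transaction modifies.

```rust
/// Returns true if a modification made by the transaction is recorded in
/// a change stream that tracks the modified column.
///
/// `exclude_txn` is the value passed to set_exclude_txn_from_change_streams;
/// `allow_txn_exclusion` is the DDL option on the change stream.
fn recorded_in_change_stream(exclude_txn: bool, allow_txn_exclusion: bool) -> bool {
    // The modification is skipped only when both sides opt in to exclusion.
    !(exclude_txn && allow_txn_exclusion)
}
```

Only the combination `exclude_txn = true` and `allow_txn_exclusion = true` keeps the modification out of the stream.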

pub fn set_return_commit_stats(self, return_stats: bool) -> Self

Sets whether to return commit stats for the transaction.

§Example
let runner = db_client.read_write_transaction()
    .set_return_commit_stats(true)
    .build()
    .await?;

let result = runner.run(async |transaction| {
    let statement = Statement::builder("UPDATE MyTable SET MyColumn = 'MyValue' WHERE Id = 1").build();
    transaction.execute_update(statement).await?;
    Ok(42)
}).await?;

if let Some(stats) = result.commit_response.commit_stats {
    println!("Mutation count: {}", stats.mutation_count);
}

See also: https://docs.cloud.google.com/spanner/docs/commit-statistics

pub fn with_retry_policy<P: TransactionRetryPolicy + 'static>(self, policy: P) -> Self

Sets the retry policy for the transaction.

§Example
let db_client = client.database_client("projects/p/instances/i/databases/d").build().await?;

let retry_policy = BasicTransactionRetryPolicy::new()
    .with_max_attempts(5)
    .with_total_timeout(Duration::from_secs(60));

let runner = db_client
    .read_write_transaction()
    .with_retry_policy(retry_policy)
    .build()
    .await?;

pub async fn build(self) -> Result<TransactionRunner>

Builds a TransactionRunner for a read/write transaction.

§Example
let db_client = client.database_client("projects/p/instances/i/databases/d").build().await?;
let runner = db_client.read_write_transaction().build().await?;

let result = runner.run(async |transaction| {
    let statement = Statement::builder("UPDATE MyTable SET MyColumn = 'MyValue' WHERE Id = 1").build();
    transaction.execute_update(statement).await?;
    Ok(42)
}).await?;

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<ST, DT> CastableFrom<ST, Initialized, Initialized> for DT
where ST: ?Sized, DT: ?Sized,

impl<ST, DT> CastableFrom<ST, Uninit, Uninit> for DT
where ST: ?Sized, DT: ?Sized,

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> FutureExt for T

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper.

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request

impl<L> LayerExt<L> for L

fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>
where L: Layer<S>,

Applies the layer to a service and wraps it in Layered.

impl<T> PolicyExt for T
where T: ?Sized,

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Sized + Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Sized + Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.

impl<T> Read<Exclusive, BecauseExclusive> for T
where T: ?Sized,

impl<T> Same for T

type Output = T

Should always be Self

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.