pub struct ReadWriteTransaction { /* private fields */ }
Expand description

ReadWriteTransaction provides a locking read-write transaction.

This type of transaction is the only way to write data into Cloud Spanner; Client::apply, Client::apply_at_least_once, Client::partitioned_update use transactions internally. These transactions rely on pessimistic locking and, if necessary, two-phase commit. Locking read-write transactions may abort, requiring the application to retry. However, the interface exposed by Client:run_with_retry eliminates the need for applications to write retry loops explicitly.

Locking transactions may be used to atomically read-modify-write data anywhere in a database. This type of transaction is externally consistent.

Clients should attempt to minimize the amount of time a transaction is active. Faster transactions commit with higher probability and cause less contention. Cloud Spanner attempts to keep read locks active as long as the transaction continues to do reads. Long periods of inactivity at the client may cause Cloud Spanner to release a transaction’s locks and abort it.

Reads performed within a transaction acquire locks on the data being read. Writes can only be done at commit time, after all reads have been completed. Conceptually, a read-write transaction consists of zero or more reads or SQL queries followed by a commit.

See Client::run_with_retry for an example.

Semantics

Cloud Spanner can commit the transaction if all read locks it acquired are still valid at commit time, and it is able to acquire write locks for all writes. Cloud Spanner can abort the transaction for any reason. If a commit attempt returns ABORTED, Cloud Spanner guarantees that the transaction has not modified any user data in Cloud Spanner.

Unless the transaction commits, Cloud Spanner makes no guarantees about how long the transaction’s locks were held for. It is an error to use Cloud Spanner locks for any sort of mutual exclusion other than between Cloud Spanner transactions themselves.

Aborted transactions

Application code does not need to retry explicitly; RunInTransaction will automatically retry a transaction if an attempt results in an abort. The lock priority of a transaction increases after each prior aborted transaction, meaning that the next attempt has a slightly better chance of success than before.

Under some circumstances (e.g., many transactions attempting to modify the same row(s)), a transaction can abort many times in a short period before successfully committing. Thus, it is not a good idea to cap the number of retries a transaction can attempt; instead, it is better to limit the total amount of wall time spent retrying.

Implementations§

Examples found in repository?
src/client.rs (line 617)
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
    pub async fn begin_read_write_transaction(&self) -> Result<ReadWriteTransaction, TxError> {
        let session = self.get_session().await?;
        ReadWriteTransaction::begin(session, ReadWriteTransactionOption::default().begin_options)
            .await
            .map_err(|e| e.status.into())
    }

    /// Get open session count.
    pub fn session_count(&self) -> usize {
        self.sessions.num_opened()
    }

    async fn read_write_transaction_sync_with_option<T, E>(
        &self,
        f: impl Fn(&mut ReadWriteTransaction, Option<CancellationToken>) -> Result<T, E>,
        options: ReadWriteTransactionOption,
    ) -> Result<(Option<Timestamp>, T), E>
    where
        E: TryAs<Status> + From<SessionError> + From<Status>,
    {
        let (bo, co) = Client::split_read_write_transaction_option(options);

        let ro = TransactionRetrySetting::default();
        let session = Some(self.get_session().await?);

        // reuse session
        let cancel = bo.cancel.clone();
        invoke_fn(
            cancel.clone(),
            Some(ro),
            |session| async {
                let cancel = cancel.clone().map(|v| v.child_token());
                let mut tx = self.create_read_write_transaction::<E>(session, bo.clone()).await?;
                let result = f(&mut tx, cancel);
                tx.finish(result, Some(co.clone())).await
            },
            session,
        )
        .await
    }

    async fn create_read_write_transaction<E>(
        &self,
        session: Option<ManagedSession>,
        bo: CallOptions,
    ) -> Result<ReadWriteTransaction, (E, Option<ManagedSession>)>
    where
        E: TryAs<Status> + From<SessionError> + From<Status>,
    {
        ReadWriteTransaction::begin(session.unwrap(), bo)
            .await
            .map_err(|e| (E::from(e.status), Some(e.session)))
    }
Examples found in repository?
src/client.rs (line 347)
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
    pub async fn partitioned_update_with_option(
        &self,
        stmt: Statement,
        options: PartitionedUpdateOption,
    ) -> Result<i64, TxError> {
        let ro = TransactionRetrySetting::new(vec![Code::Aborted, Code::Internal]);
        let session = Some(self.get_session().await?);

        // reuse session
        invoke_fn(
            options.begin_options.cancel.clone(),
            Some(ro),
            |session| async {
                let mut tx =
                    match ReadWriteTransaction::begin_partitioned_dml(session.unwrap(), options.begin_options.clone())
                        .await
                    {
                        Ok(tx) => tx,
                        Err(e) => return Err((TxError::GRPC(e.status), Some(e.session))),
                    };
                let qo = match options.query_options.clone() {
                    Some(o) => o,
                    None => QueryOptions::default(),
                };
                tx.update_with_option(stmt.clone(), qo)
                    .await
                    .map_err(|e| (TxError::GRPC(e), tx.take_session()))
            },
            session,
        )
        .await
    }
Examples found in repository?
src/client.rs (line 445)
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
    pub async fn apply_with_option(
        &self,
        ms: Vec<Mutation>,
        options: ReadWriteTransactionOption,
    ) -> Result<Option<Timestamp>, TxError> {
        let result: Result<(Option<Timestamp>, ()), TxError> = self
            .read_write_transaction_sync_with_option(
                |tx, _cancel| {
                    tx.buffer_write(ms.to_vec());
                    Ok(())
                },
                options,
            )
            .await;
        Ok(result?.0)
    }
Examples found in repository?
src/transaction_rw.rs (line 165)
164
165
166
    pub async fn update(&mut self, stmt: Statement) -> Result<i64, Status> {
        self.update_with_option(stmt, QueryOptions::default()).await
    }
More examples
Hide additional examples
src/client.rs (line 357)
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
    pub async fn partitioned_update_with_option(
        &self,
        stmt: Statement,
        options: PartitionedUpdateOption,
    ) -> Result<i64, TxError> {
        let ro = TransactionRetrySetting::new(vec![Code::Aborted, Code::Internal]);
        let session = Some(self.get_session().await?);

        // reuse session
        invoke_fn(
            options.begin_options.cancel.clone(),
            Some(ro),
            |session| async {
                let mut tx =
                    match ReadWriteTransaction::begin_partitioned_dml(session.unwrap(), options.begin_options.clone())
                        .await
                    {
                        Ok(tx) => tx,
                        Err(e) => return Err((TxError::GRPC(e.status), Some(e.session))),
                    };
                let qo = match options.query_options.clone() {
                    Some(o) => o,
                    None => QueryOptions::default(),
                };
                tx.update_with_option(stmt.clone(), qo)
                    .await
                    .map_err(|e| (TxError::GRPC(e), tx.take_session()))
            },
            session,
        )
        .await
    }
Examples found in repository?
src/transaction_rw.rs (line 193)
192
193
194
    pub async fn batch_update(&mut self, stmt: Vec<Statement>) -> Result<Vec<i64>, Status> {
        self.batch_update_with_option(stmt, QueryOptions::default()).await
    }

Methods from Deref<Target = Transaction>§

query executes a query against the database. It returns a RowIterator for retrieving the resulting rows.

query returns only row data, without a query plan or execution statistics.

query executes a query against the database. It returns a RowIterator for retrieving the resulting rows.

query returns only row data, without a query plan or execution statistics.

Examples found in repository?
src/transaction.rs (line 90)
89
90
91
    pub async fn query(&mut self, statement: Statement) -> Result<RowIterator<'_>, Status> {
        self.query_with_option(statement, QueryOptions::default()).await
    }

read returns a RowIterator for reading multiple rows from the database.

use google_cloud_spanner::key::Key;
use google_cloud_spanner::client::Client;
use google_cloud_spanner::reader::AsyncIterator;

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    const DATABASE: &str = "projects/local-project/instances/test-instance/databases/local-database";
    let client = Client::new(DATABASE).await?;

    let mut tx = client.single().await?;
    let mut iter = tx.read("Guild", &["GuildID", "OwnerUserID"], vec![
        Key::new(&"pk1"),
        Key::new(&"pk2")
    ]).await?;

    while let Some(row) = iter.next().await? {
        let guild_id = row.column_by_name::<String>("GuildID");
        //do something
    };
    Ok(())
}

read returns a RowIterator for reading multiple rows from the database.

Examples found in repository?
src/transaction.rs (line 152)
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
    pub async fn read(
        &mut self,
        table: &str,
        columns: &[&str],
        key_set: impl Into<KeySet>,
    ) -> Result<RowIterator<'_>, Status> {
        self.read_with_option(table, columns, key_set, ReadOptions::default())
            .await
    }

    /// read returns a RowIterator for reading multiple rows from the database.
    pub async fn read_with_option(
        &mut self,
        table: &str,
        columns: &[&str],
        key_set: impl Into<KeySet>,
        options: ReadOptions,
    ) -> Result<RowIterator<'_>, Status> {
        let request = ReadRequest {
            session: self.get_session_name(),
            transaction: Some(self.transaction_selector.clone()),
            table: table.to_string(),
            index: options.index,
            columns: columns.iter().map(|x| x.to_string()).collect(),
            key_set: Some(key_set.into().inner),
            limit: options.limit,
            resume_token: vec![],
            partition_token: vec![],
            request_options: Transaction::create_request_options(options.call_options.priority),
        };

        let session = self.as_mut_session();
        let reader = Box::new(TableReader { request });
        RowIterator::new(session, reader, Some(options.call_options)).await
    }

    /// read returns a RowIterator for reading multiple rows from the database.
    /// ```
    /// use google_cloud_spanner::key::Key;
    /// use google_cloud_spanner::client::Client;
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), anyhow::Error> {
    ///     const DATABASE: &str = "projects/local-project/instances/test-instance/databases/local-database";
    ///     let client = Client::new(DATABASE).await?;
    ///     let mut tx = client.single().await?;
    ///     let row = tx.read_row("Guild", &["GuildID", "OwnerUserID"], Key::new(&"guild1")).await?;
    ///     Ok(())
    /// }
    /// ```
    pub async fn read_row(&mut self, table: &str, columns: &[&str], key: Key) -> Result<Option<Row>, Status> {
        self.read_row_with_option(table, columns, key, ReadOptions::default())
            .await
    }

    /// read returns a RowIterator for reading multiple rows from the database.
    pub async fn read_row_with_option(
        &mut self,
        table: &str,
        columns: &[&str],
        key: Key,
        options: ReadOptions,
    ) -> Result<Option<Row>, Status> {
        let call_options = options.call_options.clone();
        let mut reader = self
            .read_with_option(table, columns, KeySet::from(key), options)
            .await?;
        reader.set_call_options(call_options);
        reader.next().await
    }

read returns a RowIterator for reading multiple rows from the database.

use google_cloud_spanner::key::Key;
use google_cloud_spanner::client::Client;

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    const DATABASE: &str = "projects/local-project/instances/test-instance/databases/local-database";
    let client = Client::new(DATABASE).await?;
    let mut tx = client.single().await?;
    let row = tx.read_row("Guild", &["GuildID", "OwnerUserID"], Key::new(&"guild1")).await?;
    Ok(())
}

read returns a RowIterator for reading multiple rows from the database.

Examples found in repository?
src/transaction.rs (line 197)
196
197
198
199
    pub async fn read_row(&mut self, table: &str, columns: &[&str], key: Key) -> Result<Option<Row>, Status> {
        self.read_row_with_option(table, columns, key, ReadOptions::default())
            .await
    }

Trait Implementations§

The resulting type after dereferencing.
Dereferences the value.
Mutably dereferences the value.

Auto Trait Implementations§

Blanket Implementations§

Gets the TypeId of self. Read more
Immutably borrows from an owned value. Read more
Mutably borrows from an owned value. Read more

Returns the argument unchanged.

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Instruments this type with the current Span, returning an Instrumented wrapper. Read more

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Wrap the input message T in a tonic::Request
The type returned in the event of a conversion error.
Performs the conversion.
The type returned in the event of a conversion error.
Performs the conversion.
Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more