gcloud_spanner/
transaction_ro.rs

1use std::ops::{Deref, DerefMut};
2use std::sync::atomic::AtomicI64;
3use std::time::SystemTime;
4
5use time::OffsetDateTime;
6
7use crate::key::KeySet;
8use crate::reader::{Reader, RowIterator, StatementReader, TableReader};
9use crate::session::ManagedSession;
10use crate::statement::Statement;
11use crate::transaction::{CallOptions, QueryOptions, ReadOptions, Transaction};
12use crate::value::TimestampBound;
13use google_cloud_gax::grpc::Status;
14use google_cloud_googleapis::spanner::v1::transaction_options::IsolationLevel;
15use google_cloud_googleapis::spanner::v1::{
16    transaction_options, transaction_selector, BeginTransactionRequest, DirectedReadOptions, ExecuteSqlRequest,
17    PartitionOptions, PartitionQueryRequest, PartitionReadRequest, ReadRequest, TransactionOptions,
18    TransactionSelector,
19};
20
21/// ReadOnlyTransaction provides a snapshot transaction with guaranteed
22/// consistency across reads, but does not allow writes.  Read-only transactions
23/// can be configured to read at timestamps in the past.
24///
25/// Read-only transactions do not take locks. Instead, they work by choosing a
26/// Cloud Spanner timestamp, then executing all reads at that timestamp. Since
27/// they do not acquire locks, they do not block concurrent read-write
28/// transactions.
29///
30/// Unlike locking read-write transactions, read-only transactions never abort.
31/// They can fail if the chosen read timestamp is garbage collected; however, the
32/// default garbage collection policy is generous enough that most applications
33/// do not need to worry about this in practice. See the documentation of
34/// TimestampBound for more details.
35pub struct ReadOnlyTransaction {
36    base_tx: Transaction,
37    pub rts: Option<OffsetDateTime>,
38}
39
40impl Deref for ReadOnlyTransaction {
41    type Target = Transaction;
42
43    fn deref(&self) -> &Self::Target {
44        &self.base_tx
45    }
46}
47
48impl DerefMut for ReadOnlyTransaction {
49    fn deref_mut(&mut self) -> &mut Self::Target {
50        &mut self.base_tx
51    }
52}
53
54impl ReadOnlyTransaction {
55    pub async fn single(session: ManagedSession, tb: TimestampBound) -> Result<ReadOnlyTransaction, Status> {
56        Ok(ReadOnlyTransaction {
57            base_tx: Transaction {
58                session: Some(session),
59                sequence_number: AtomicI64::new(0),
60                transaction_selector: TransactionSelector {
61                    selector: Some(transaction_selector::Selector::SingleUse(TransactionOptions {
62                        exclude_txn_from_change_streams: false,
63                        mode: Some(transaction_options::Mode::ReadOnly(tb.into())),
64                        isolation_level: IsolationLevel::Unspecified as i32,
65                    })),
66                },
67                transaction_tag: None,
68            },
69            rts: None,
70        })
71    }
72
73    /// begin starts a snapshot read-only Transaction on Cloud Spanner.
74    pub async fn begin(
75        mut session: ManagedSession,
76        tb: TimestampBound,
77        options: CallOptions,
78    ) -> Result<ReadOnlyTransaction, Status> {
79        let request = BeginTransactionRequest {
80            session: session.session.name.to_string(),
81            options: Some(TransactionOptions {
82                exclude_txn_from_change_streams: false,
83                mode: Some(transaction_options::Mode::ReadOnly(tb.into())),
84                isolation_level: IsolationLevel::Unspecified as i32,
85            }),
86            request_options: Transaction::create_request_options(options.priority, None),
87            mutation_key: None,
88        };
89
90        let result = session.spanner_client.begin_transaction(request, options.retry).await;
91        match session.invalidate_if_needed(result).await {
92            Ok(response) => {
93                let tx = response.into_inner();
94                let rts = tx.read_timestamp.unwrap();
95                let st: SystemTime = rts.try_into().unwrap();
96                Ok(ReadOnlyTransaction {
97                    base_tx: Transaction {
98                        session: Some(session),
99                        sequence_number: AtomicI64::new(0),
100                        transaction_selector: TransactionSelector {
101                            selector: Some(transaction_selector::Selector::Id(tx.id)),
102                        },
103                        transaction_tag: None,
104                    },
105                    rts: Some(OffsetDateTime::from(st)),
106                })
107            }
108            Err(e) => Err(e),
109        }
110    }
111}
112
113pub struct Partition<T: Reader> {
114    pub reader: T,
115}
116
117/// BatchReadOnlyTransaction is a ReadOnlyTransaction that allows for exporting
118/// arbitrarily large amounts of data from Cloud Spanner databases.
119/// BatchReadOnlyTransaction partitions a read/query request. Read/query request
120/// can then be executed independently over each partition while observing the
121/// same snapshot of the database.
122pub struct BatchReadOnlyTransaction {
123    base_tx: ReadOnlyTransaction,
124}
125
126impl Deref for BatchReadOnlyTransaction {
127    type Target = ReadOnlyTransaction;
128
129    fn deref(&self) -> &Self::Target {
130        &self.base_tx
131    }
132}
133
134impl DerefMut for BatchReadOnlyTransaction {
135    fn deref_mut(&mut self) -> &mut Self::Target {
136        &mut self.base_tx
137    }
138}
139
140impl BatchReadOnlyTransaction {
141    pub async fn begin(
142        session: ManagedSession,
143        tb: TimestampBound,
144        options: CallOptions,
145    ) -> Result<BatchReadOnlyTransaction, Status> {
146        let tx = ReadOnlyTransaction::begin(session, tb, options).await?;
147        Ok(BatchReadOnlyTransaction { base_tx: tx })
148    }
149
150    /// partition_read returns a list of Partitions that can be used to read rows from
151    /// the database. These partitions can be executed across multiple processes,
152    /// even across different machines. The partition size and count hints can be
153    /// configured using PartitionOptions.
154    pub async fn partition_read(
155        &mut self,
156        table: &str,
157        columns: &[&str],
158        keys: impl Into<KeySet> + Clone,
159    ) -> Result<Vec<Partition<TableReader>>, Status> {
160        self.partition_read_with_option(table, columns, keys, None, ReadOptions::default(), false, None)
161            .await
162    }
163
164    /// partition_read returns a list of Partitions that can be used to read rows from
165    /// the database. These partitions can be executed across multiple processes,
166    /// even across different machines. The partition size and count hints can be
167    /// configured using PartitionOptions.
168    #[allow(clippy::too_many_arguments)]
169    pub async fn partition_read_with_option(
170        &mut self,
171        table: &str,
172        columns: &[&str],
173        keys: impl Into<KeySet> + Clone,
174        po: Option<PartitionOptions>,
175        ro: ReadOptions,
176        data_boost_enabled: bool,
177        directed_read_options: Option<DirectedReadOptions>,
178    ) -> Result<Vec<Partition<TableReader>>, Status> {
179        let columns: Vec<String> = columns.iter().map(|x| x.to_string()).collect();
180        let inner_keyset = keys.into().inner;
181        let request = PartitionReadRequest {
182            session: self.get_session_name(),
183            transaction: Some(self.transaction_selector.clone()),
184            table: table.to_string(),
185            index: ro.index.clone(),
186            columns: columns.clone(),
187            key_set: Some(inner_keyset.clone()),
188            partition_options: po,
189        };
190        let result = match self
191            .as_mut_session()
192            .spanner_client
193            .partition_read(request, ro.call_options.retry)
194            .await
195        {
196            Ok(r) => Ok(r
197                .into_inner()
198                .partitions
199                .into_iter()
200                .map(|x| Partition {
201                    reader: TableReader {
202                        request: ReadRequest {
203                            session: self.get_session_name(),
204                            transaction: Some(self.transaction_selector.clone()),
205                            table: table.to_string(),
206                            index: ro.index.clone(),
207                            columns: columns.clone(),
208                            key_set: Some(inner_keyset.clone()),
209                            limit: ro.limit,
210                            resume_token: vec![],
211                            partition_token: x.partition_token,
212                            request_options: Transaction::create_request_options(
213                                ro.call_options.priority,
214                                self.base_tx.transaction_tag.clone(),
215                            ),
216                            directed_read_options: directed_read_options.clone(),
217                            data_boost_enabled,
218                            order_by: 0,
219                            lock_hint: 0,
220                        },
221                    },
222                })
223                .collect()),
224            Err(e) => Err(e),
225        };
226        self.as_mut_session().invalidate_if_needed(result).await
227    }
228
229    /// partition_query returns a list of Partitions that can be used to execute a query against the database.
230    pub async fn partition_query(&mut self, stmt: Statement) -> Result<Vec<Partition<StatementReader>>, Status> {
231        self.partition_query_with_option(stmt, None, QueryOptions::default(), false, None)
232            .await
233    }
234
235    /// partition_query returns a list of Partitions that can be used to execute a query against the database.
236    pub async fn partition_query_with_option(
237        &mut self,
238        stmt: Statement,
239        po: Option<PartitionOptions>,
240        qo: QueryOptions,
241        data_boost_enabled: bool,
242        directed_read_options: Option<DirectedReadOptions>,
243    ) -> Result<Vec<Partition<StatementReader>>, Status> {
244        let request = PartitionQueryRequest {
245            session: self.get_session_name(),
246            transaction: Some(self.transaction_selector.clone()),
247            sql: stmt.sql.clone(),
248            params: Some(prost_types::Struct {
249                fields: stmt.params.clone(),
250            }),
251            param_types: stmt.param_types.clone(),
252            partition_options: po,
253        };
254        let result = match self
255            .as_mut_session()
256            .spanner_client
257            .partition_query(request.clone(), qo.call_options.retry.clone())
258            .await
259        {
260            Ok(r) => Ok(r
261                .into_inner()
262                .partitions
263                .into_iter()
264                .map(|x| Partition {
265                    reader: StatementReader {
266                        enable_resume: qo.enable_resume,
267                        request: ExecuteSqlRequest {
268                            session: self.get_session_name(),
269                            transaction: Some(self.transaction_selector.clone()),
270                            sql: stmt.sql.clone(),
271                            params: Some(prost_types::Struct {
272                                fields: stmt.params.clone(),
273                            }),
274                            param_types: stmt.param_types.clone(),
275                            resume_token: vec![],
276                            query_mode: 0,
277                            partition_token: x.partition_token,
278                            seqno: 0,
279                            query_options: qo.optimizer_options.clone(),
280                            request_options: Transaction::create_request_options(
281                                qo.call_options.priority,
282                                self.base_tx.transaction_tag.clone(),
283                            ),
284                            data_boost_enabled,
285                            directed_read_options: directed_read_options.clone(),
286                            last_statement: false,
287                        },
288                    },
289                })
290                .collect()),
291            Err(e) => Err(e),
292        };
293        self.as_mut_session().invalidate_if_needed(result).await
294    }
295
296    /// execute runs a single Partition obtained from partition_read or partition_query.
297    pub async fn execute<T: Reader + Sync + Send + 'static>(
298        &mut self,
299        partition: Partition<T>,
300        option: Option<CallOptions>,
301    ) -> Result<RowIterator<'_, T>, Status> {
302        let session = self.as_mut_session();
303        RowIterator::new(session, partition.reader, option).await
304    }
305}