google_cloud_spanner/
transaction_ro.rs

1use std::ops::{Deref, DerefMut};
2use std::sync::atomic::AtomicI64;
3use std::time::SystemTime;
4
5use time::OffsetDateTime;
6
7use google_cloud_gax::grpc::Status;
8use google_cloud_googleapis::spanner::v1::{
9    transaction_options, transaction_selector, BeginTransactionRequest, DirectedReadOptions, ExecuteSqlRequest,
10    PartitionOptions, PartitionQueryRequest, PartitionReadRequest, ReadRequest, TransactionOptions,
11    TransactionSelector,
12};
13
14use crate::key::KeySet;
15use crate::reader::{Reader, RowIterator, StatementReader, TableReader};
16use crate::session::ManagedSession;
17use crate::statement::Statement;
18use crate::transaction::{CallOptions, QueryOptions, ReadOptions, Transaction};
19use crate::value::TimestampBound;
20
21/// ReadOnlyTransaction provides a snapshot transaction with guaranteed
22/// consistency across reads, but does not allow writes.  Read-only transactions
23/// can be configured to read at timestamps in the past.
24///
25/// Read-only transactions do not take locks. Instead, they work by choosing a
26/// Cloud Spanner timestamp, then executing all reads at that timestamp. Since
27/// they do not acquire locks, they do not block concurrent read-write
28/// transactions.
29///
30/// Unlike locking read-write transactions, read-only transactions never abort.
31/// They can fail if the chosen read timestamp is garbage collected; however, the
32/// default garbage collection policy is generous enough that most applications
33/// do not need to worry about this in practice. See the documentation of
34/// TimestampBound for more details.
35pub struct ReadOnlyTransaction {
36    base_tx: Transaction,
37    pub rts: Option<OffsetDateTime>,
38}
39
40impl Deref for ReadOnlyTransaction {
41    type Target = Transaction;
42
43    fn deref(&self) -> &Self::Target {
44        &self.base_tx
45    }
46}
47
48impl DerefMut for ReadOnlyTransaction {
49    fn deref_mut(&mut self) -> &mut Self::Target {
50        &mut self.base_tx
51    }
52}
53
54impl ReadOnlyTransaction {
55    pub async fn single(session: ManagedSession, tb: TimestampBound) -> Result<ReadOnlyTransaction, Status> {
56        Ok(ReadOnlyTransaction {
57            base_tx: Transaction {
58                session: Some(session),
59                sequence_number: AtomicI64::new(0),
60                transaction_selector: TransactionSelector {
61                    selector: Some(transaction_selector::Selector::SingleUse(TransactionOptions {
62                        exclude_txn_from_change_streams: false,
63                        mode: Some(transaction_options::Mode::ReadOnly(tb.into())),
64                    })),
65                },
66            },
67            rts: None,
68        })
69    }
70
71    /// begin starts a snapshot read-only Transaction on Cloud Spanner.
72    pub async fn begin(
73        mut session: ManagedSession,
74        tb: TimestampBound,
75        options: CallOptions,
76    ) -> Result<ReadOnlyTransaction, Status> {
77        let request = BeginTransactionRequest {
78            session: session.session.name.to_string(),
79            options: Some(TransactionOptions {
80                exclude_txn_from_change_streams: false,
81                mode: Some(transaction_options::Mode::ReadOnly(tb.into())),
82            }),
83            request_options: Transaction::create_request_options(options.priority),
84        };
85
86        let result = session.spanner_client.begin_transaction(request, options.retry).await;
87        match session.invalidate_if_needed(result).await {
88            Ok(response) => {
89                let tx = response.into_inner();
90                let rts = tx.read_timestamp.unwrap();
91                let st: SystemTime = rts.try_into().unwrap();
92                Ok(ReadOnlyTransaction {
93                    base_tx: Transaction {
94                        session: Some(session),
95                        sequence_number: AtomicI64::new(0),
96                        transaction_selector: TransactionSelector {
97                            selector: Some(transaction_selector::Selector::Id(tx.id)),
98                        },
99                    },
100                    rts: Some(OffsetDateTime::from(st)),
101                })
102            }
103            Err(e) => Err(e),
104        }
105    }
106}
107
108pub struct Partition<T: Reader> {
109    pub reader: T,
110}
111
112/// BatchReadOnlyTransaction is a ReadOnlyTransaction that allows for exporting
113/// arbitrarily large amounts of data from Cloud Spanner databases.
114/// BatchReadOnlyTransaction partitions a read/query request. Read/query request
115/// can then be executed independently over each partition while observing the
116/// same snapshot of the database.
117pub struct BatchReadOnlyTransaction {
118    base_tx: ReadOnlyTransaction,
119}
120
121impl Deref for BatchReadOnlyTransaction {
122    type Target = ReadOnlyTransaction;
123
124    fn deref(&self) -> &Self::Target {
125        &self.base_tx
126    }
127}
128
129impl DerefMut for BatchReadOnlyTransaction {
130    fn deref_mut(&mut self) -> &mut Self::Target {
131        &mut self.base_tx
132    }
133}
134
135impl BatchReadOnlyTransaction {
136    pub async fn begin(
137        session: ManagedSession,
138        tb: TimestampBound,
139        options: CallOptions,
140    ) -> Result<BatchReadOnlyTransaction, Status> {
141        let tx = ReadOnlyTransaction::begin(session, tb, options).await?;
142        Ok(BatchReadOnlyTransaction { base_tx: tx })
143    }
144
145    /// partition_read returns a list of Partitions that can be used to read rows from
146    /// the database. These partitions can be executed across multiple processes,
147    /// even across different machines. The partition size and count hints can be
148    /// configured using PartitionOptions.
149    pub async fn partition_read(
150        &mut self,
151        table: &str,
152        columns: &[&str],
153        keys: impl Into<KeySet> + Clone,
154    ) -> Result<Vec<Partition<TableReader>>, Status> {
155        self.partition_read_with_option(table, columns, keys, None, ReadOptions::default(), false, None)
156            .await
157    }
158
159    /// partition_read returns a list of Partitions that can be used to read rows from
160    /// the database. These partitions can be executed across multiple processes,
161    /// even across different machines. The partition size and count hints can be
162    /// configured using PartitionOptions.
163    #[allow(clippy::too_many_arguments)]
164    pub async fn partition_read_with_option(
165        &mut self,
166        table: &str,
167        columns: &[&str],
168        keys: impl Into<KeySet> + Clone,
169        po: Option<PartitionOptions>,
170        ro: ReadOptions,
171        data_boost_enabled: bool,
172        directed_read_options: Option<DirectedReadOptions>,
173    ) -> Result<Vec<Partition<TableReader>>, Status> {
174        let columns: Vec<String> = columns.iter().map(|x| x.to_string()).collect();
175        let inner_keyset = keys.into().inner;
176        let request = PartitionReadRequest {
177            session: self.get_session_name(),
178            transaction: Some(self.transaction_selector.clone()),
179            table: table.to_string(),
180            index: ro.index.clone(),
181            columns: columns.clone(),
182            key_set: Some(inner_keyset.clone()),
183            partition_options: po,
184        };
185        let result = match self
186            .as_mut_session()
187            .spanner_client
188            .partition_read(request, ro.call_options.retry)
189            .await
190        {
191            Ok(r) => Ok(r
192                .into_inner()
193                .partitions
194                .into_iter()
195                .map(|x| Partition {
196                    reader: TableReader {
197                        request: ReadRequest {
198                            session: self.get_session_name(),
199                            transaction: Some(self.transaction_selector.clone()),
200                            table: table.to_string(),
201                            index: ro.index.clone(),
202                            columns: columns.clone(),
203                            key_set: Some(inner_keyset.clone()),
204                            limit: ro.limit,
205                            resume_token: vec![],
206                            partition_token: x.partition_token,
207                            request_options: Transaction::create_request_options(ro.call_options.priority),
208                            directed_read_options: directed_read_options.clone(),
209                            data_boost_enabled,
210                            order_by: 0,
211                            lock_hint: 0,
212                        },
213                    },
214                })
215                .collect()),
216            Err(e) => Err(e),
217        };
218        self.as_mut_session().invalidate_if_needed(result).await
219    }
220
221    /// partition_query returns a list of Partitions that can be used to execute a query against the database.
222    pub async fn partition_query(&mut self, stmt: Statement) -> Result<Vec<Partition<StatementReader>>, Status> {
223        self.partition_query_with_option(stmt, None, QueryOptions::default(), false, None)
224            .await
225    }
226
227    /// partition_query returns a list of Partitions that can be used to execute a query against the database.
228    pub async fn partition_query_with_option(
229        &mut self,
230        stmt: Statement,
231        po: Option<PartitionOptions>,
232        qo: QueryOptions,
233        data_boost_enabled: bool,
234        directed_read_options: Option<DirectedReadOptions>,
235    ) -> Result<Vec<Partition<StatementReader>>, Status> {
236        let request = PartitionQueryRequest {
237            session: self.get_session_name(),
238            transaction: Some(self.transaction_selector.clone()),
239            sql: stmt.sql.clone(),
240            params: Some(prost_types::Struct {
241                fields: stmt.params.clone(),
242            }),
243            param_types: stmt.param_types.clone(),
244            partition_options: po,
245        };
246        let result = match self
247            .as_mut_session()
248            .spanner_client
249            .partition_query(request.clone(), qo.call_options.retry.clone())
250            .await
251        {
252            Ok(r) => Ok(r
253                .into_inner()
254                .partitions
255                .into_iter()
256                .map(|x| Partition {
257                    reader: StatementReader {
258                        enable_resume: qo.enable_resume,
259                        request: ExecuteSqlRequest {
260                            session: self.get_session_name(),
261                            transaction: Some(self.transaction_selector.clone()),
262                            sql: stmt.sql.clone(),
263                            params: Some(prost_types::Struct {
264                                fields: stmt.params.clone(),
265                            }),
266                            param_types: stmt.param_types.clone(),
267                            resume_token: vec![],
268                            query_mode: 0,
269                            partition_token: x.partition_token,
270                            seqno: 0,
271                            query_options: qo.optimizer_options.clone(),
272                            request_options: Transaction::create_request_options(qo.call_options.priority),
273                            data_boost_enabled,
274                            directed_read_options: directed_read_options.clone(),
275                        },
276                    },
277                })
278                .collect()),
279            Err(e) => Err(e),
280        };
281        self.as_mut_session().invalidate_if_needed(result).await
282    }
283
284    /// execute runs a single Partition obtained from partition_read or partition_query.
285    pub async fn execute<T: Reader + Sync + Send + 'static>(
286        &mut self,
287        partition: Partition<T>,
288        option: Option<CallOptions>,
289    ) -> Result<RowIterator<'_, T>, Status> {
290        let session = self.as_mut_session();
291        RowIterator::new(session, partition.reader, option).await
292    }
293}