gcloud_spanner/
transaction_ro.rs

1use std::ops::{Deref, DerefMut};
2use std::sync::atomic::AtomicI64;
3use std::time::SystemTime;
4
5use time::OffsetDateTime;
6
7use crate::key::KeySet;
8use crate::reader::{Reader, RowIterator, StatementReader, TableReader};
9use crate::session::ManagedSession;
10use crate::statement::Statement;
11use crate::transaction::{CallOptions, QueryOptions, ReadOptions, Transaction};
12use crate::value::TimestampBound;
13use google_cloud_gax::grpc::Status;
14use google_cloud_googleapis::spanner::v1::transaction_options::IsolationLevel;
15use google_cloud_googleapis::spanner::v1::{
16    transaction_options, transaction_selector, BeginTransactionRequest, DirectedReadOptions, ExecuteSqlRequest,
17    PartitionOptions, PartitionQueryRequest, PartitionReadRequest, ReadRequest, TransactionOptions,
18    TransactionSelector,
19};
20
21/// ReadOnlyTransaction provides a snapshot transaction with guaranteed
22/// consistency across reads, but does not allow writes.  Read-only transactions
23/// can be configured to read at timestamps in the past.
24///
25/// Read-only transactions do not take locks. Instead, they work by choosing a
26/// Cloud Spanner timestamp, then executing all reads at that timestamp. Since
27/// they do not acquire locks, they do not block concurrent read-write
28/// transactions.
29///
30/// Unlike locking read-write transactions, read-only transactions never abort.
31/// They can fail if the chosen read timestamp is garbage collected; however, the
32/// default garbage collection policy is generous enough that most applications
33/// do not need to worry about this in practice. See the documentation of
34/// TimestampBound for more details.
35pub struct ReadOnlyTransaction {
36    base_tx: Transaction,
37    pub rts: Option<OffsetDateTime>,
38}
39
40impl Deref for ReadOnlyTransaction {
41    type Target = Transaction;
42
43    fn deref(&self) -> &Self::Target {
44        &self.base_tx
45    }
46}
47
48impl DerefMut for ReadOnlyTransaction {
49    fn deref_mut(&mut self) -> &mut Self::Target {
50        &mut self.base_tx
51    }
52}
53
54impl ReadOnlyTransaction {
55    pub async fn single(session: ManagedSession, tb: TimestampBound) -> Result<ReadOnlyTransaction, Status> {
56        Ok(ReadOnlyTransaction {
57            base_tx: Transaction {
58                session: Some(session),
59                sequence_number: AtomicI64::new(0),
60                transaction_selector: TransactionSelector {
61                    selector: Some(transaction_selector::Selector::SingleUse(TransactionOptions {
62                        exclude_txn_from_change_streams: false,
63                        mode: Some(transaction_options::Mode::ReadOnly(tb.into())),
64                        isolation_level: IsolationLevel::Unspecified as i32,
65                    })),
66                },
67                transaction_tag: None,
68                disable_route_to_leader: true,
69            },
70            rts: None,
71        })
72    }
73
74    /// begin starts a snapshot read-only Transaction on Cloud Spanner.
75    pub async fn begin(
76        mut session: ManagedSession,
77        tb: TimestampBound,
78        options: CallOptions,
79    ) -> Result<ReadOnlyTransaction, Status> {
80        let request = BeginTransactionRequest {
81            session: session.session.name.to_string(),
82            options: Some(TransactionOptions {
83                exclude_txn_from_change_streams: false,
84                mode: Some(transaction_options::Mode::ReadOnly(tb.into())),
85                isolation_level: IsolationLevel::Unspecified as i32,
86            }),
87            request_options: Transaction::create_request_options(options.priority, None),
88            mutation_key: None,
89        };
90
91        let result = session
92            .spanner_client
93            .begin_transaction(request, true, options.retry)
94            .await;
95        match session.invalidate_if_needed(result).await {
96            Ok(response) => {
97                let tx = response.into_inner();
98                let rts = tx.read_timestamp.unwrap();
99                let st: SystemTime = rts.try_into().unwrap();
100                Ok(ReadOnlyTransaction {
101                    base_tx: Transaction {
102                        session: Some(session),
103                        sequence_number: AtomicI64::new(0),
104                        transaction_selector: TransactionSelector {
105                            selector: Some(transaction_selector::Selector::Id(tx.id)),
106                        },
107                        transaction_tag: None,
108                        disable_route_to_leader: true,
109                    },
110                    rts: Some(OffsetDateTime::from(st)),
111                })
112            }
113            Err(e) => Err(e),
114        }
115    }
116}
117
118pub struct Partition<T: Reader> {
119    pub reader: T,
120}
121
122/// BatchReadOnlyTransaction is a ReadOnlyTransaction that allows for exporting
123/// arbitrarily large amounts of data from Cloud Spanner databases.
124/// BatchReadOnlyTransaction partitions a read/query request. Read/query request
125/// can then be executed independently over each partition while observing the
126/// same snapshot of the database.
127pub struct BatchReadOnlyTransaction {
128    base_tx: ReadOnlyTransaction,
129}
130
131impl Deref for BatchReadOnlyTransaction {
132    type Target = ReadOnlyTransaction;
133
134    fn deref(&self) -> &Self::Target {
135        &self.base_tx
136    }
137}
138
139impl DerefMut for BatchReadOnlyTransaction {
140    fn deref_mut(&mut self) -> &mut Self::Target {
141        &mut self.base_tx
142    }
143}
144
145impl BatchReadOnlyTransaction {
146    pub async fn begin(
147        session: ManagedSession,
148        tb: TimestampBound,
149        options: CallOptions,
150    ) -> Result<BatchReadOnlyTransaction, Status> {
151        let tx = ReadOnlyTransaction::begin(session, tb, options).await?;
152        Ok(BatchReadOnlyTransaction { base_tx: tx })
153    }
154
155    /// partition_read returns a list of Partitions that can be used to read rows from
156    /// the database. These partitions can be executed across multiple processes,
157    /// even across different machines. The partition size and count hints can be
158    /// configured using PartitionOptions.
159    pub async fn partition_read(
160        &mut self,
161        table: &str,
162        columns: &[&str],
163        keys: impl Into<KeySet> + Clone,
164    ) -> Result<Vec<Partition<TableReader>>, Status> {
165        self.partition_read_with_option(table, columns, keys, None, ReadOptions::default(), false, None)
166            .await
167    }
168
169    /// partition_read returns a list of Partitions that can be used to read rows from
170    /// the database. These partitions can be executed across multiple processes,
171    /// even across different machines. The partition size and count hints can be
172    /// configured using PartitionOptions.
173    #[allow(clippy::too_many_arguments)]
174    pub async fn partition_read_with_option(
175        &mut self,
176        table: &str,
177        columns: &[&str],
178        keys: impl Into<KeySet> + Clone,
179        po: Option<PartitionOptions>,
180        ro: ReadOptions,
181        data_boost_enabled: bool,
182        directed_read_options: Option<DirectedReadOptions>,
183    ) -> Result<Vec<Partition<TableReader>>, Status> {
184        let columns: Vec<String> = columns.iter().map(|x| x.to_string()).collect();
185        let inner_keyset = keys.into().inner;
186        let request = PartitionReadRequest {
187            session: self.get_session_name(),
188            transaction: Some(self.transaction_selector.clone()),
189            table: table.to_string(),
190            index: ro.index.clone(),
191            columns: columns.clone(),
192            key_set: Some(inner_keyset.clone()),
193            partition_options: po,
194        };
195        let disable_route_to_leader = self.disable_route_to_leader;
196        let result = match self
197            .as_mut_session()
198            .spanner_client
199            .partition_read(request, disable_route_to_leader, ro.call_options.retry)
200            .await
201        {
202            Ok(r) => Ok(r
203                .into_inner()
204                .partitions
205                .into_iter()
206                .map(|x| Partition {
207                    reader: TableReader {
208                        request: ReadRequest {
209                            session: self.get_session_name(),
210                            transaction: Some(self.transaction_selector.clone()),
211                            table: table.to_string(),
212                            index: ro.index.clone(),
213                            columns: columns.clone(),
214                            key_set: Some(inner_keyset.clone()),
215                            limit: ro.limit,
216                            resume_token: vec![],
217                            partition_token: x.partition_token,
218                            request_options: Transaction::create_request_options(
219                                ro.call_options.priority,
220                                self.base_tx.transaction_tag.clone(),
221                            ),
222                            directed_read_options: directed_read_options.clone(),
223                            data_boost_enabled,
224                            order_by: 0,
225                            lock_hint: 0,
226                        },
227                    },
228                })
229                .collect()),
230            Err(e) => Err(e),
231        };
232        self.as_mut_session().invalidate_if_needed(result).await
233    }
234
235    /// partition_query returns a list of Partitions that can be used to execute a query against the database.
236    pub async fn partition_query(&mut self, stmt: Statement) -> Result<Vec<Partition<StatementReader>>, Status> {
237        self.partition_query_with_option(stmt, None, QueryOptions::default(), false, None)
238            .await
239    }
240
241    /// partition_query returns a list of Partitions that can be used to execute a query against the database.
242    pub async fn partition_query_with_option(
243        &mut self,
244        stmt: Statement,
245        po: Option<PartitionOptions>,
246        qo: QueryOptions,
247        data_boost_enabled: bool,
248        directed_read_options: Option<DirectedReadOptions>,
249    ) -> Result<Vec<Partition<StatementReader>>, Status> {
250        let request = PartitionQueryRequest {
251            session: self.get_session_name(),
252            transaction: Some(self.transaction_selector.clone()),
253            sql: stmt.sql.clone(),
254            params: Some(prost_types::Struct {
255                fields: stmt.params.clone(),
256            }),
257            param_types: stmt.param_types.clone(),
258            partition_options: po,
259        };
260        let disable_route_to_leader = self.disable_route_to_leader;
261        let result = match self
262            .as_mut_session()
263            .spanner_client
264            .partition_query(request.clone(), disable_route_to_leader, qo.call_options.retry.clone())
265            .await
266        {
267            Ok(r) => Ok(r
268                .into_inner()
269                .partitions
270                .into_iter()
271                .map(|x| Partition {
272                    reader: StatementReader {
273                        enable_resume: qo.enable_resume,
274                        request: ExecuteSqlRequest {
275                            session: self.get_session_name(),
276                            transaction: Some(self.transaction_selector.clone()),
277                            sql: stmt.sql.clone(),
278                            params: Some(prost_types::Struct {
279                                fields: stmt.params.clone(),
280                            }),
281                            param_types: stmt.param_types.clone(),
282                            resume_token: vec![],
283                            query_mode: 0,
284                            partition_token: x.partition_token,
285                            seqno: 0,
286                            query_options: qo.optimizer_options.clone(),
287                            request_options: Transaction::create_request_options(
288                                qo.call_options.priority,
289                                self.base_tx.transaction_tag.clone(),
290                            ),
291                            data_boost_enabled,
292                            directed_read_options: directed_read_options.clone(),
293                            last_statement: false,
294                        },
295                    },
296                })
297                .collect()),
298            Err(e) => Err(e),
299        };
300        self.as_mut_session().invalidate_if_needed(result).await
301    }
302
303    /// execute runs a single Partition obtained from partition_read or partition_query.
304    pub async fn execute<T: Reader + Sync + Send + 'static>(
305        &mut self,
306        partition: Partition<T>,
307        option: Option<CallOptions>,
308    ) -> Result<RowIterator<'_, T>, Status> {
309        let disable_route_to_leader = self.disable_route_to_leader;
310        let session = self.as_mut_session();
311        RowIterator::new(session, partition.reader, option, disable_route_to_leader).await
312    }
313}