1use std::ops::{Deref, DerefMut};
2use std::sync::atomic::AtomicI64;
3use std::time::SystemTime;
4
5use time::OffsetDateTime;
6
7use google_cloud_gax::grpc::Status;
8use google_cloud_googleapis::spanner::v1::{
9 transaction_options, transaction_selector, BeginTransactionRequest, DirectedReadOptions, ExecuteSqlRequest,
10 PartitionOptions, PartitionQueryRequest, PartitionReadRequest, ReadRequest, TransactionOptions,
11 TransactionSelector,
12};
13
14use crate::key::KeySet;
15use crate::reader::{Reader, RowIterator, StatementReader, TableReader};
16use crate::session::ManagedSession;
17use crate::statement::Statement;
18use crate::transaction::{CallOptions, QueryOptions, ReadOptions, Transaction};
19use crate::value::TimestampBound;
20
21pub struct ReadOnlyTransaction {
36 base_tx: Transaction,
37 pub rts: Option<OffsetDateTime>,
38}
39
40impl Deref for ReadOnlyTransaction {
41 type Target = Transaction;
42
43 fn deref(&self) -> &Self::Target {
44 &self.base_tx
45 }
46}
47
48impl DerefMut for ReadOnlyTransaction {
49 fn deref_mut(&mut self) -> &mut Self::Target {
50 &mut self.base_tx
51 }
52}
53
54impl ReadOnlyTransaction {
55 pub async fn single(session: ManagedSession, tb: TimestampBound) -> Result<ReadOnlyTransaction, Status> {
56 Ok(ReadOnlyTransaction {
57 base_tx: Transaction {
58 session: Some(session),
59 sequence_number: AtomicI64::new(0),
60 transaction_selector: TransactionSelector {
61 selector: Some(transaction_selector::Selector::SingleUse(TransactionOptions {
62 exclude_txn_from_change_streams: false,
63 mode: Some(transaction_options::Mode::ReadOnly(tb.into())),
64 })),
65 },
66 },
67 rts: None,
68 })
69 }
70
71 pub async fn begin(
73 mut session: ManagedSession,
74 tb: TimestampBound,
75 options: CallOptions,
76 ) -> Result<ReadOnlyTransaction, Status> {
77 let request = BeginTransactionRequest {
78 session: session.session.name.to_string(),
79 options: Some(TransactionOptions {
80 exclude_txn_from_change_streams: false,
81 mode: Some(transaction_options::Mode::ReadOnly(tb.into())),
82 }),
83 request_options: Transaction::create_request_options(options.priority),
84 };
85
86 let result = session.spanner_client.begin_transaction(request, options.retry).await;
87 match session.invalidate_if_needed(result).await {
88 Ok(response) => {
89 let tx = response.into_inner();
90 let rts = tx.read_timestamp.unwrap();
91 let st: SystemTime = rts.try_into().unwrap();
92 Ok(ReadOnlyTransaction {
93 base_tx: Transaction {
94 session: Some(session),
95 sequence_number: AtomicI64::new(0),
96 transaction_selector: TransactionSelector {
97 selector: Some(transaction_selector::Selector::Id(tx.id)),
98 },
99 },
100 rts: Some(OffsetDateTime::from(st)),
101 })
102 }
103 Err(e) => Err(e),
104 }
105 }
106}
107
108pub struct Partition<T: Reader> {
109 pub reader: T,
110}
111
112pub struct BatchReadOnlyTransaction {
118 base_tx: ReadOnlyTransaction,
119}
120
121impl Deref for BatchReadOnlyTransaction {
122 type Target = ReadOnlyTransaction;
123
124 fn deref(&self) -> &Self::Target {
125 &self.base_tx
126 }
127}
128
129impl DerefMut for BatchReadOnlyTransaction {
130 fn deref_mut(&mut self) -> &mut Self::Target {
131 &mut self.base_tx
132 }
133}
134
135impl BatchReadOnlyTransaction {
136 pub async fn begin(
137 session: ManagedSession,
138 tb: TimestampBound,
139 options: CallOptions,
140 ) -> Result<BatchReadOnlyTransaction, Status> {
141 let tx = ReadOnlyTransaction::begin(session, tb, options).await?;
142 Ok(BatchReadOnlyTransaction { base_tx: tx })
143 }
144
145 pub async fn partition_read(
150 &mut self,
151 table: &str,
152 columns: &[&str],
153 keys: impl Into<KeySet> + Clone,
154 ) -> Result<Vec<Partition<TableReader>>, Status> {
155 self.partition_read_with_option(table, columns, keys, None, ReadOptions::default(), false, None)
156 .await
157 }
158
159 #[allow(clippy::too_many_arguments)]
164 pub async fn partition_read_with_option(
165 &mut self,
166 table: &str,
167 columns: &[&str],
168 keys: impl Into<KeySet> + Clone,
169 po: Option<PartitionOptions>,
170 ro: ReadOptions,
171 data_boost_enabled: bool,
172 directed_read_options: Option<DirectedReadOptions>,
173 ) -> Result<Vec<Partition<TableReader>>, Status> {
174 let columns: Vec<String> = columns.iter().map(|x| x.to_string()).collect();
175 let inner_keyset = keys.into().inner;
176 let request = PartitionReadRequest {
177 session: self.get_session_name(),
178 transaction: Some(self.transaction_selector.clone()),
179 table: table.to_string(),
180 index: ro.index.clone(),
181 columns: columns.clone(),
182 key_set: Some(inner_keyset.clone()),
183 partition_options: po,
184 };
185 let result = match self
186 .as_mut_session()
187 .spanner_client
188 .partition_read(request, ro.call_options.retry)
189 .await
190 {
191 Ok(r) => Ok(r
192 .into_inner()
193 .partitions
194 .into_iter()
195 .map(|x| Partition {
196 reader: TableReader {
197 request: ReadRequest {
198 session: self.get_session_name(),
199 transaction: Some(self.transaction_selector.clone()),
200 table: table.to_string(),
201 index: ro.index.clone(),
202 columns: columns.clone(),
203 key_set: Some(inner_keyset.clone()),
204 limit: ro.limit,
205 resume_token: vec![],
206 partition_token: x.partition_token,
207 request_options: Transaction::create_request_options(ro.call_options.priority),
208 directed_read_options: directed_read_options.clone(),
209 data_boost_enabled,
210 order_by: 0,
211 lock_hint: 0,
212 },
213 },
214 })
215 .collect()),
216 Err(e) => Err(e),
217 };
218 self.as_mut_session().invalidate_if_needed(result).await
219 }
220
221 pub async fn partition_query(&mut self, stmt: Statement) -> Result<Vec<Partition<StatementReader>>, Status> {
223 self.partition_query_with_option(stmt, None, QueryOptions::default(), false, None)
224 .await
225 }
226
227 pub async fn partition_query_with_option(
229 &mut self,
230 stmt: Statement,
231 po: Option<PartitionOptions>,
232 qo: QueryOptions,
233 data_boost_enabled: bool,
234 directed_read_options: Option<DirectedReadOptions>,
235 ) -> Result<Vec<Partition<StatementReader>>, Status> {
236 let request = PartitionQueryRequest {
237 session: self.get_session_name(),
238 transaction: Some(self.transaction_selector.clone()),
239 sql: stmt.sql.clone(),
240 params: Some(prost_types::Struct {
241 fields: stmt.params.clone(),
242 }),
243 param_types: stmt.param_types.clone(),
244 partition_options: po,
245 };
246 let result = match self
247 .as_mut_session()
248 .spanner_client
249 .partition_query(request.clone(), qo.call_options.retry.clone())
250 .await
251 {
252 Ok(r) => Ok(r
253 .into_inner()
254 .partitions
255 .into_iter()
256 .map(|x| Partition {
257 reader: StatementReader {
258 enable_resume: qo.enable_resume,
259 request: ExecuteSqlRequest {
260 session: self.get_session_name(),
261 transaction: Some(self.transaction_selector.clone()),
262 sql: stmt.sql.clone(),
263 params: Some(prost_types::Struct {
264 fields: stmt.params.clone(),
265 }),
266 param_types: stmt.param_types.clone(),
267 resume_token: vec![],
268 query_mode: 0,
269 partition_token: x.partition_token,
270 seqno: 0,
271 query_options: qo.optimizer_options.clone(),
272 request_options: Transaction::create_request_options(qo.call_options.priority),
273 data_boost_enabled,
274 directed_read_options: directed_read_options.clone(),
275 },
276 },
277 })
278 .collect()),
279 Err(e) => Err(e),
280 };
281 self.as_mut_session().invalidate_if_needed(result).await
282 }
283
284 pub async fn execute<T: Reader + Sync + Send + 'static>(
286 &mut self,
287 partition: Partition<T>,
288 option: Option<CallOptions>,
289 ) -> Result<RowIterator<'_, T>, Status> {
290 let session = self.as_mut_session();
291 RowIterator::new(session, partition.reader, option).await
292 }
293}