1use std::ops::{Deref, DerefMut};
2use std::sync::atomic::AtomicI64;
3use std::time::SystemTime;
4
5use time::OffsetDateTime;
6
7use crate::key::KeySet;
8use crate::reader::{Reader, RowIterator, StatementReader, TableReader};
9use crate::session::ManagedSession;
10use crate::statement::Statement;
11use crate::transaction::{CallOptions, QueryOptions, ReadOptions, Transaction};
12use crate::value::TimestampBound;
13use google_cloud_gax::grpc::Status;
14use google_cloud_googleapis::spanner::v1::transaction_options::IsolationLevel;
15use google_cloud_googleapis::spanner::v1::{
16 transaction_options, transaction_selector, BeginTransactionRequest, DirectedReadOptions, ExecuteSqlRequest,
17 PartitionOptions, PartitionQueryRequest, PartitionReadRequest, ReadRequest, TransactionOptions,
18 TransactionSelector,
19};
20
21pub struct ReadOnlyTransaction {
36 base_tx: Transaction,
37 pub rts: Option<OffsetDateTime>,
38}
39
40impl Deref for ReadOnlyTransaction {
41 type Target = Transaction;
42
43 fn deref(&self) -> &Self::Target {
44 &self.base_tx
45 }
46}
47
48impl DerefMut for ReadOnlyTransaction {
49 fn deref_mut(&mut self) -> &mut Self::Target {
50 &mut self.base_tx
51 }
52}
53
54impl ReadOnlyTransaction {
55 pub async fn single(session: ManagedSession, tb: TimestampBound) -> Result<ReadOnlyTransaction, Status> {
56 Ok(ReadOnlyTransaction {
57 base_tx: Transaction {
58 session: Some(session),
59 sequence_number: AtomicI64::new(0),
60 transaction_selector: TransactionSelector {
61 selector: Some(transaction_selector::Selector::SingleUse(TransactionOptions {
62 exclude_txn_from_change_streams: false,
63 mode: Some(transaction_options::Mode::ReadOnly(tb.into())),
64 isolation_level: IsolationLevel::Unspecified as i32,
65 })),
66 },
67 transaction_tag: None,
68 },
69 rts: None,
70 })
71 }
72
73 pub async fn begin(
75 mut session: ManagedSession,
76 tb: TimestampBound,
77 options: CallOptions,
78 ) -> Result<ReadOnlyTransaction, Status> {
79 let request = BeginTransactionRequest {
80 session: session.session.name.to_string(),
81 options: Some(TransactionOptions {
82 exclude_txn_from_change_streams: false,
83 mode: Some(transaction_options::Mode::ReadOnly(tb.into())),
84 isolation_level: IsolationLevel::Unspecified as i32,
85 }),
86 request_options: Transaction::create_request_options(options.priority, None),
87 mutation_key: None,
88 };
89
90 let result = session.spanner_client.begin_transaction(request, options.retry).await;
91 match session.invalidate_if_needed(result).await {
92 Ok(response) => {
93 let tx = response.into_inner();
94 let rts = tx.read_timestamp.unwrap();
95 let st: SystemTime = rts.try_into().unwrap();
96 Ok(ReadOnlyTransaction {
97 base_tx: Transaction {
98 session: Some(session),
99 sequence_number: AtomicI64::new(0),
100 transaction_selector: TransactionSelector {
101 selector: Some(transaction_selector::Selector::Id(tx.id)),
102 },
103 transaction_tag: None,
104 },
105 rts: Some(OffsetDateTime::from(st)),
106 })
107 }
108 Err(e) => Err(e),
109 }
110 }
111}
112
113pub struct Partition<T: Reader> {
114 pub reader: T,
115}
116
117pub struct BatchReadOnlyTransaction {
123 base_tx: ReadOnlyTransaction,
124}
125
126impl Deref for BatchReadOnlyTransaction {
127 type Target = ReadOnlyTransaction;
128
129 fn deref(&self) -> &Self::Target {
130 &self.base_tx
131 }
132}
133
134impl DerefMut for BatchReadOnlyTransaction {
135 fn deref_mut(&mut self) -> &mut Self::Target {
136 &mut self.base_tx
137 }
138}
139
140impl BatchReadOnlyTransaction {
141 pub async fn begin(
142 session: ManagedSession,
143 tb: TimestampBound,
144 options: CallOptions,
145 ) -> Result<BatchReadOnlyTransaction, Status> {
146 let tx = ReadOnlyTransaction::begin(session, tb, options).await?;
147 Ok(BatchReadOnlyTransaction { base_tx: tx })
148 }
149
150 pub async fn partition_read(
155 &mut self,
156 table: &str,
157 columns: &[&str],
158 keys: impl Into<KeySet> + Clone,
159 ) -> Result<Vec<Partition<TableReader>>, Status> {
160 self.partition_read_with_option(table, columns, keys, None, ReadOptions::default(), false, None)
161 .await
162 }
163
164 #[allow(clippy::too_many_arguments)]
169 pub async fn partition_read_with_option(
170 &mut self,
171 table: &str,
172 columns: &[&str],
173 keys: impl Into<KeySet> + Clone,
174 po: Option<PartitionOptions>,
175 ro: ReadOptions,
176 data_boost_enabled: bool,
177 directed_read_options: Option<DirectedReadOptions>,
178 ) -> Result<Vec<Partition<TableReader>>, Status> {
179 let columns: Vec<String> = columns.iter().map(|x| x.to_string()).collect();
180 let inner_keyset = keys.into().inner;
181 let request = PartitionReadRequest {
182 session: self.get_session_name(),
183 transaction: Some(self.transaction_selector.clone()),
184 table: table.to_string(),
185 index: ro.index.clone(),
186 columns: columns.clone(),
187 key_set: Some(inner_keyset.clone()),
188 partition_options: po,
189 };
190 let result = match self
191 .as_mut_session()
192 .spanner_client
193 .partition_read(request, ro.call_options.retry)
194 .await
195 {
196 Ok(r) => Ok(r
197 .into_inner()
198 .partitions
199 .into_iter()
200 .map(|x| Partition {
201 reader: TableReader {
202 request: ReadRequest {
203 session: self.get_session_name(),
204 transaction: Some(self.transaction_selector.clone()),
205 table: table.to_string(),
206 index: ro.index.clone(),
207 columns: columns.clone(),
208 key_set: Some(inner_keyset.clone()),
209 limit: ro.limit,
210 resume_token: vec![],
211 partition_token: x.partition_token,
212 request_options: Transaction::create_request_options(
213 ro.call_options.priority,
214 self.base_tx.transaction_tag.clone(),
215 ),
216 directed_read_options: directed_read_options.clone(),
217 data_boost_enabled,
218 order_by: 0,
219 lock_hint: 0,
220 },
221 },
222 })
223 .collect()),
224 Err(e) => Err(e),
225 };
226 self.as_mut_session().invalidate_if_needed(result).await
227 }
228
229 pub async fn partition_query(&mut self, stmt: Statement) -> Result<Vec<Partition<StatementReader>>, Status> {
231 self.partition_query_with_option(stmt, None, QueryOptions::default(), false, None)
232 .await
233 }
234
235 pub async fn partition_query_with_option(
237 &mut self,
238 stmt: Statement,
239 po: Option<PartitionOptions>,
240 qo: QueryOptions,
241 data_boost_enabled: bool,
242 directed_read_options: Option<DirectedReadOptions>,
243 ) -> Result<Vec<Partition<StatementReader>>, Status> {
244 let request = PartitionQueryRequest {
245 session: self.get_session_name(),
246 transaction: Some(self.transaction_selector.clone()),
247 sql: stmt.sql.clone(),
248 params: Some(prost_types::Struct {
249 fields: stmt.params.clone(),
250 }),
251 param_types: stmt.param_types.clone(),
252 partition_options: po,
253 };
254 let result = match self
255 .as_mut_session()
256 .spanner_client
257 .partition_query(request.clone(), qo.call_options.retry.clone())
258 .await
259 {
260 Ok(r) => Ok(r
261 .into_inner()
262 .partitions
263 .into_iter()
264 .map(|x| Partition {
265 reader: StatementReader {
266 enable_resume: qo.enable_resume,
267 request: ExecuteSqlRequest {
268 session: self.get_session_name(),
269 transaction: Some(self.transaction_selector.clone()),
270 sql: stmt.sql.clone(),
271 params: Some(prost_types::Struct {
272 fields: stmt.params.clone(),
273 }),
274 param_types: stmt.param_types.clone(),
275 resume_token: vec![],
276 query_mode: 0,
277 partition_token: x.partition_token,
278 seqno: 0,
279 query_options: qo.optimizer_options.clone(),
280 request_options: Transaction::create_request_options(
281 qo.call_options.priority,
282 self.base_tx.transaction_tag.clone(),
283 ),
284 data_boost_enabled,
285 directed_read_options: directed_read_options.clone(),
286 last_statement: false,
287 },
288 },
289 })
290 .collect()),
291 Err(e) => Err(e),
292 };
293 self.as_mut_session().invalidate_if_needed(result).await
294 }
295
296 pub async fn execute<T: Reader + Sync + Send + 'static>(
298 &mut self,
299 partition: Partition<T>,
300 option: Option<CallOptions>,
301 ) -> Result<RowIterator<'_, T>, Status> {
302 let session = self.as_mut_session();
303 RowIterator::new(session, partition.reader, option).await
304 }
305}