1use std::ops::{Deref, DerefMut};
2use std::sync::atomic::AtomicI64;
3use std::time::SystemTime;
4
5use time::OffsetDateTime;
6
7use crate::key::KeySet;
8use crate::reader::{Reader, RowIterator, StatementReader, TableReader};
9use crate::session::ManagedSession;
10use crate::statement::Statement;
11use crate::transaction::{CallOptions, QueryOptions, ReadOptions, Transaction};
12use crate::value::TimestampBound;
13use google_cloud_gax::grpc::Status;
14use google_cloud_googleapis::spanner::v1::transaction_options::IsolationLevel;
15use google_cloud_googleapis::spanner::v1::{
16 transaction_options, transaction_selector, BeginTransactionRequest, DirectedReadOptions, ExecuteSqlRequest,
17 PartitionOptions, PartitionQueryRequest, PartitionReadRequest, ReadRequest, TransactionOptions,
18 TransactionSelector,
19};
20
21pub struct ReadOnlyTransaction {
36 base_tx: Transaction,
37 pub rts: Option<OffsetDateTime>,
38}
39
40impl Deref for ReadOnlyTransaction {
41 type Target = Transaction;
42
43 fn deref(&self) -> &Self::Target {
44 &self.base_tx
45 }
46}
47
48impl DerefMut for ReadOnlyTransaction {
49 fn deref_mut(&mut self) -> &mut Self::Target {
50 &mut self.base_tx
51 }
52}
53
54impl ReadOnlyTransaction {
55 pub async fn single(session: ManagedSession, tb: TimestampBound) -> Result<ReadOnlyTransaction, Status> {
56 Ok(ReadOnlyTransaction {
57 base_tx: Transaction {
58 session: Some(session),
59 sequence_number: AtomicI64::new(0),
60 transaction_selector: TransactionSelector {
61 selector: Some(transaction_selector::Selector::SingleUse(TransactionOptions {
62 exclude_txn_from_change_streams: false,
63 mode: Some(transaction_options::Mode::ReadOnly(tb.into())),
64 isolation_level: IsolationLevel::Unspecified as i32,
65 })),
66 },
67 transaction_tag: None,
68 disable_route_to_leader: true,
69 },
70 rts: None,
71 })
72 }
73
74 pub async fn begin(
76 mut session: ManagedSession,
77 tb: TimestampBound,
78 options: CallOptions,
79 ) -> Result<ReadOnlyTransaction, Status> {
80 let request = BeginTransactionRequest {
81 session: session.session.name.to_string(),
82 options: Some(TransactionOptions {
83 exclude_txn_from_change_streams: false,
84 mode: Some(transaction_options::Mode::ReadOnly(tb.into())),
85 isolation_level: IsolationLevel::Unspecified as i32,
86 }),
87 request_options: Transaction::create_request_options(options.priority, None),
88 mutation_key: None,
89 };
90
91 let result = session
92 .spanner_client
93 .begin_transaction(request, true, options.retry)
94 .await;
95 match session.invalidate_if_needed(result).await {
96 Ok(response) => {
97 let tx = response.into_inner();
98 let rts = tx.read_timestamp.unwrap();
99 let st: SystemTime = rts.try_into().unwrap();
100 Ok(ReadOnlyTransaction {
101 base_tx: Transaction {
102 session: Some(session),
103 sequence_number: AtomicI64::new(0),
104 transaction_selector: TransactionSelector {
105 selector: Some(transaction_selector::Selector::Id(tx.id)),
106 },
107 transaction_tag: None,
108 disable_route_to_leader: true,
109 },
110 rts: Some(OffsetDateTime::from(st)),
111 })
112 }
113 Err(e) => Err(e),
114 }
115 }
116}
117
118pub struct Partition<T: Reader> {
119 pub reader: T,
120}
121
122pub struct BatchReadOnlyTransaction {
128 base_tx: ReadOnlyTransaction,
129}
130
131impl Deref for BatchReadOnlyTransaction {
132 type Target = ReadOnlyTransaction;
133
134 fn deref(&self) -> &Self::Target {
135 &self.base_tx
136 }
137}
138
139impl DerefMut for BatchReadOnlyTransaction {
140 fn deref_mut(&mut self) -> &mut Self::Target {
141 &mut self.base_tx
142 }
143}
144
145impl BatchReadOnlyTransaction {
146 pub async fn begin(
147 session: ManagedSession,
148 tb: TimestampBound,
149 options: CallOptions,
150 ) -> Result<BatchReadOnlyTransaction, Status> {
151 let tx = ReadOnlyTransaction::begin(session, tb, options).await?;
152 Ok(BatchReadOnlyTransaction { base_tx: tx })
153 }
154
155 pub async fn partition_read(
160 &mut self,
161 table: &str,
162 columns: &[&str],
163 keys: impl Into<KeySet> + Clone,
164 ) -> Result<Vec<Partition<TableReader>>, Status> {
165 self.partition_read_with_option(table, columns, keys, None, ReadOptions::default(), false, None)
166 .await
167 }
168
169 #[allow(clippy::too_many_arguments)]
174 pub async fn partition_read_with_option(
175 &mut self,
176 table: &str,
177 columns: &[&str],
178 keys: impl Into<KeySet> + Clone,
179 po: Option<PartitionOptions>,
180 ro: ReadOptions,
181 data_boost_enabled: bool,
182 directed_read_options: Option<DirectedReadOptions>,
183 ) -> Result<Vec<Partition<TableReader>>, Status> {
184 let columns: Vec<String> = columns.iter().map(|x| x.to_string()).collect();
185 let inner_keyset = keys.into().inner;
186 let request = PartitionReadRequest {
187 session: self.get_session_name(),
188 transaction: Some(self.transaction_selector.clone()),
189 table: table.to_string(),
190 index: ro.index.clone(),
191 columns: columns.clone(),
192 key_set: Some(inner_keyset.clone()),
193 partition_options: po,
194 };
195 let disable_route_to_leader = self.disable_route_to_leader;
196 let result = match self
197 .as_mut_session()
198 .spanner_client
199 .partition_read(request, disable_route_to_leader, ro.call_options.retry)
200 .await
201 {
202 Ok(r) => Ok(r
203 .into_inner()
204 .partitions
205 .into_iter()
206 .map(|x| Partition {
207 reader: TableReader {
208 request: ReadRequest {
209 session: self.get_session_name(),
210 transaction: Some(self.transaction_selector.clone()),
211 table: table.to_string(),
212 index: ro.index.clone(),
213 columns: columns.clone(),
214 key_set: Some(inner_keyset.clone()),
215 limit: ro.limit,
216 resume_token: vec![],
217 partition_token: x.partition_token,
218 request_options: Transaction::create_request_options(
219 ro.call_options.priority,
220 self.base_tx.transaction_tag.clone(),
221 ),
222 directed_read_options: directed_read_options.clone(),
223 data_boost_enabled,
224 order_by: 0,
225 lock_hint: 0,
226 },
227 },
228 })
229 .collect()),
230 Err(e) => Err(e),
231 };
232 self.as_mut_session().invalidate_if_needed(result).await
233 }
234
235 pub async fn partition_query(&mut self, stmt: Statement) -> Result<Vec<Partition<StatementReader>>, Status> {
237 self.partition_query_with_option(stmt, None, QueryOptions::default(), false, None)
238 .await
239 }
240
241 pub async fn partition_query_with_option(
243 &mut self,
244 stmt: Statement,
245 po: Option<PartitionOptions>,
246 qo: QueryOptions,
247 data_boost_enabled: bool,
248 directed_read_options: Option<DirectedReadOptions>,
249 ) -> Result<Vec<Partition<StatementReader>>, Status> {
250 let request = PartitionQueryRequest {
251 session: self.get_session_name(),
252 transaction: Some(self.transaction_selector.clone()),
253 sql: stmt.sql.clone(),
254 params: Some(prost_types::Struct {
255 fields: stmt.params.clone(),
256 }),
257 param_types: stmt.param_types.clone(),
258 partition_options: po,
259 };
260 let disable_route_to_leader = self.disable_route_to_leader;
261 let result = match self
262 .as_mut_session()
263 .spanner_client
264 .partition_query(request.clone(), disable_route_to_leader, qo.call_options.retry.clone())
265 .await
266 {
267 Ok(r) => Ok(r
268 .into_inner()
269 .partitions
270 .into_iter()
271 .map(|x| Partition {
272 reader: StatementReader {
273 enable_resume: qo.enable_resume,
274 request: ExecuteSqlRequest {
275 session: self.get_session_name(),
276 transaction: Some(self.transaction_selector.clone()),
277 sql: stmt.sql.clone(),
278 params: Some(prost_types::Struct {
279 fields: stmt.params.clone(),
280 }),
281 param_types: stmt.param_types.clone(),
282 resume_token: vec![],
283 query_mode: 0,
284 partition_token: x.partition_token,
285 seqno: 0,
286 query_options: qo.optimizer_options.clone(),
287 request_options: Transaction::create_request_options(
288 qo.call_options.priority,
289 self.base_tx.transaction_tag.clone(),
290 ),
291 data_boost_enabled,
292 directed_read_options: directed_read_options.clone(),
293 last_statement: false,
294 },
295 },
296 })
297 .collect()),
298 Err(e) => Err(e),
299 };
300 self.as_mut_session().invalidate_if_needed(result).await
301 }
302
303 pub async fn execute<T: Reader + Sync + Send + 'static>(
305 &mut self,
306 partition: Partition<T>,
307 option: Option<CallOptions>,
308 ) -> Result<RowIterator<'_, T>, Status> {
309 let disable_route_to_leader = self.disable_route_to_leader;
310 let session = self.as_mut_session();
311 RowIterator::new(session, partition.reader, option, disable_route_to_leader).await
312 }
313}