1use std::ops::DerefMut;
2use std::sync::atomic::AtomicI64;
3
4use prost_types::Struct;
5
6use google_cloud_gax::grpc::Status;
7use google_cloud_gax::retry::RetrySetting;
8use google_cloud_googleapis::spanner::v1::request_options::Priority;
9use google_cloud_googleapis::spanner::v1::{
10 execute_sql_request::QueryMode, execute_sql_request::QueryOptions as ExecuteQueryOptions, ExecuteSqlRequest,
11 ReadRequest, RequestOptions, TransactionSelector,
12};
13
14use crate::key::{Key, KeySet};
15use crate::reader::{Reader, RowIterator, StatementReader, TableReader};
16use crate::row::Row;
17use crate::session::ManagedSession;
18use crate::statement::Statement;
19
20#[derive(Clone, Default)]
21pub struct CallOptions {
22 pub priority: Option<Priority>,
24 pub retry: Option<RetrySetting>,
25}
26
27#[derive(Clone)]
28pub struct ReadOptions {
29 pub index: String,
33 pub limit: i64,
35 pub call_options: CallOptions,
36}
37
38impl Default for ReadOptions {
39 fn default() -> Self {
40 ReadOptions {
41 index: "".to_string(),
42 limit: 0,
43 call_options: CallOptions::default(),
44 }
45 }
46}
47
48#[derive(Clone)]
49pub struct QueryOptions {
50 pub mode: QueryMode,
51 pub optimizer_options: Option<ExecuteQueryOptions>,
52 pub call_options: CallOptions,
53 pub enable_resume: bool,
88}
89
90impl Default for QueryOptions {
91 fn default() -> Self {
92 QueryOptions {
93 mode: QueryMode::Normal,
94 optimizer_options: None,
95 call_options: CallOptions::default(),
96 enable_resume: true,
97 }
98 }
99}
100
101pub struct Transaction {
102 pub(crate) session: Option<ManagedSession>,
103 pub(crate) sequence_number: AtomicI64,
105 pub(crate) transaction_selector: TransactionSelector,
106 pub(crate) transaction_tag: Option<String>,
108 pub(crate) disable_route_to_leader: bool,
111}
112
113impl Transaction {
114 pub(crate) fn create_request_options(
115 priority: Option<Priority>,
116 transaction_tag: Option<String>,
117 ) -> Option<RequestOptions> {
118 if priority.is_none() && transaction_tag.as_ref().map(String::is_empty).unwrap_or(true) {
119 return None;
120 }
121 Some(RequestOptions {
122 priority: priority.unwrap_or_default().into(),
123 request_tag: String::new(),
124 transaction_tag: transaction_tag.unwrap_or_default(),
125 })
126 }
127
128 pub async fn query(&mut self, statement: Statement) -> Result<RowIterator<'_, impl Reader>, Status> {
133 self.query_with_option(statement, QueryOptions::default()).await
134 }
135
136 pub async fn query_with_option(
139 &mut self,
140 statement: Statement,
141 options: QueryOptions,
142 ) -> Result<RowIterator<'_, impl Reader>, Status> {
143 let request = ExecuteSqlRequest {
144 session: self.session.as_ref().unwrap().session.name.to_string(),
145 transaction: Some(self.transaction_selector.clone()),
146 sql: statement.sql,
147 params: Some(Struct {
148 fields: statement.params,
149 }),
150 param_types: statement.param_types,
151 resume_token: vec![],
152 query_mode: options.mode.into(),
153 partition_token: vec![],
154 seqno: 0,
155 query_options: options.optimizer_options,
156 request_options: Transaction::create_request_options(
157 options.call_options.priority,
158 self.transaction_tag.clone(),
159 ),
160 data_boost_enabled: false,
161 directed_read_options: None,
162 last_statement: false,
163 };
164 let session = self.session.as_mut().unwrap().deref_mut();
165 let reader = StatementReader {
166 enable_resume: options.enable_resume,
167 request,
168 };
169 RowIterator::new(session, reader, Some(options.call_options), self.disable_route_to_leader).await
170 }
171
172 pub async fn read(
193 &mut self,
194 table: &str,
195 columns: &[&str],
196 key_set: impl Into<KeySet>,
197 ) -> Result<RowIterator<'_, impl Reader>, Status> {
198 self.read_with_option(table, columns, key_set, ReadOptions::default())
199 .await
200 }
201
202 pub async fn read_with_option(
204 &mut self,
205 table: &str,
206 columns: &[&str],
207 key_set: impl Into<KeySet>,
208 options: ReadOptions,
209 ) -> Result<RowIterator<'_, impl Reader>, Status> {
210 let request = ReadRequest {
211 session: self.get_session_name(),
212 transaction: Some(self.transaction_selector.clone()),
213 table: table.to_string(),
214 index: options.index,
215 columns: columns.iter().map(|x| x.to_string()).collect(),
216 key_set: Some(key_set.into().inner),
217 limit: options.limit,
218 resume_token: vec![],
219 partition_token: vec![],
220 request_options: Transaction::create_request_options(
221 options.call_options.priority,
222 self.transaction_tag.clone(),
223 ),
224 data_boost_enabled: false,
225 order_by: 0,
226 directed_read_options: None,
227 lock_hint: 0,
228 };
229
230 let disable_route_to_leader = self.disable_route_to_leader;
231 let session = self.as_mut_session();
232 let reader = TableReader { request };
233 RowIterator::new(session, reader, Some(options.call_options), disable_route_to_leader).await
234 }
235
236 pub async fn read_row(&mut self, table: &str, columns: &[&str], key: Key) -> Result<Option<Row>, Status> {
249 self.read_row_with_option(table, columns, key, ReadOptions::default())
250 .await
251 }
252
253 pub async fn read_row_with_option(
255 &mut self,
256 table: &str,
257 columns: &[&str],
258 key: Key,
259 options: ReadOptions,
260 ) -> Result<Option<Row>, Status> {
261 let call_options = options.call_options.clone();
262 let mut reader = self
263 .read_with_option(table, columns, KeySet::from(key), options)
264 .await?;
265 reader.set_call_options(call_options);
266 reader.next().await
267 }
268
269 pub(crate) fn get_session_name(&self) -> String {
270 self.session.as_ref().unwrap().session.name.to_string()
271 }
272
273 pub(crate) fn as_mut_session(&mut self) -> &mut ManagedSession {
274 self.session.as_mut().unwrap()
275 }
276
277 pub(crate) fn take_session(&mut self) -> Option<ManagedSession> {
280 self.session.take()
281 }
282}