1use std::ops::DerefMut;
2use std::sync::atomic::AtomicI64;
3
4use prost_types::Struct;
5
6use google_cloud_gax::grpc::Status;
7use google_cloud_gax::retry::RetrySetting;
8use google_cloud_googleapis::spanner::v1::request_options::Priority;
9use google_cloud_googleapis::spanner::v1::{
10 execute_sql_request::QueryMode, execute_sql_request::QueryOptions as ExecuteQueryOptions, ExecuteSqlRequest,
11 ReadRequest, RequestOptions, TransactionSelector,
12};
13
14use crate::key::{Key, KeySet};
15use crate::reader::{Reader, RowIterator, StatementReader, TableReader};
16use crate::row::Row;
17use crate::session::ManagedSession;
18use crate::statement::Statement;
19
20#[derive(Clone, Default)]
21pub struct CallOptions {
22 pub priority: Option<Priority>,
24 pub retry: Option<RetrySetting>,
25}
26
27#[derive(Clone)]
28pub struct ReadOptions {
29 pub index: String,
33 pub limit: i64,
35 pub call_options: CallOptions,
36}
37
38impl Default for ReadOptions {
39 fn default() -> Self {
40 ReadOptions {
41 index: "".to_string(),
42 limit: 0,
43 call_options: CallOptions::default(),
44 }
45 }
46}
47
48#[derive(Clone)]
49pub struct QueryOptions {
50 pub mode: QueryMode,
51 pub optimizer_options: Option<ExecuteQueryOptions>,
52 pub call_options: CallOptions,
53 pub enable_resume: bool,
88}
89
90impl Default for QueryOptions {
91 fn default() -> Self {
92 QueryOptions {
93 mode: QueryMode::Normal,
94 optimizer_options: None,
95 call_options: CallOptions::default(),
96 enable_resume: true,
97 }
98 }
99}
100
101pub struct Transaction {
102 pub(crate) session: Option<ManagedSession>,
103 pub(crate) sequence_number: AtomicI64,
105 pub(crate) transaction_selector: TransactionSelector,
106 pub(crate) transaction_tag: Option<String>,
108}
109
110impl Transaction {
111 pub(crate) fn create_request_options(
112 priority: Option<Priority>,
113 transaction_tag: Option<String>,
114 ) -> Option<RequestOptions> {
115 if priority.is_none() && transaction_tag.as_ref().map(String::is_empty).unwrap_or(true) {
116 return None;
117 }
118 Some(RequestOptions {
119 priority: priority.unwrap_or_default().into(),
120 request_tag: String::new(),
121 transaction_tag: transaction_tag.unwrap_or_default(),
122 })
123 }
124
125 pub async fn query(&mut self, statement: Statement) -> Result<RowIterator<'_, impl Reader>, Status> {
130 self.query_with_option(statement, QueryOptions::default()).await
131 }
132
133 pub async fn query_with_option(
138 &mut self,
139 statement: Statement,
140 options: QueryOptions,
141 ) -> Result<RowIterator<'_, impl Reader>, Status> {
142 let request = ExecuteSqlRequest {
143 session: self.session.as_ref().unwrap().session.name.to_string(),
144 transaction: Some(self.transaction_selector.clone()),
145 sql: statement.sql,
146 params: Some(Struct {
147 fields: statement.params,
148 }),
149 param_types: statement.param_types,
150 resume_token: vec![],
151 query_mode: options.mode.into(),
152 partition_token: vec![],
153 seqno: 0,
154 query_options: options.optimizer_options,
155 request_options: Transaction::create_request_options(
156 options.call_options.priority,
157 self.transaction_tag.clone(),
158 ),
159 data_boost_enabled: false,
160 directed_read_options: None,
161 last_statement: false,
162 };
163 let session = self.session.as_mut().unwrap().deref_mut();
164 let reader = StatementReader {
165 enable_resume: options.enable_resume,
166 request,
167 };
168 RowIterator::new(session, reader, Some(options.call_options)).await
169 }
170
171 pub async fn read(
192 &mut self,
193 table: &str,
194 columns: &[&str],
195 key_set: impl Into<KeySet>,
196 ) -> Result<RowIterator<'_, impl Reader>, Status> {
197 self.read_with_option(table, columns, key_set, ReadOptions::default())
198 .await
199 }
200
201 pub async fn read_with_option(
203 &mut self,
204 table: &str,
205 columns: &[&str],
206 key_set: impl Into<KeySet>,
207 options: ReadOptions,
208 ) -> Result<RowIterator<'_, impl Reader>, Status> {
209 let request = ReadRequest {
210 session: self.get_session_name(),
211 transaction: Some(self.transaction_selector.clone()),
212 table: table.to_string(),
213 index: options.index,
214 columns: columns.iter().map(|x| x.to_string()).collect(),
215 key_set: Some(key_set.into().inner),
216 limit: options.limit,
217 resume_token: vec![],
218 partition_token: vec![],
219 request_options: Transaction::create_request_options(
220 options.call_options.priority,
221 self.transaction_tag.clone(),
222 ),
223 data_boost_enabled: false,
224 order_by: 0,
225 directed_read_options: None,
226 lock_hint: 0,
227 };
228
229 let session = self.as_mut_session();
230 let reader = TableReader { request };
231 RowIterator::new(session, reader, Some(options.call_options)).await
232 }
233
234 pub async fn read_row(&mut self, table: &str, columns: &[&str], key: Key) -> Result<Option<Row>, Status> {
247 self.read_row_with_option(table, columns, key, ReadOptions::default())
248 .await
249 }
250
251 pub async fn read_row_with_option(
253 &mut self,
254 table: &str,
255 columns: &[&str],
256 key: Key,
257 options: ReadOptions,
258 ) -> Result<Option<Row>, Status> {
259 let call_options = options.call_options.clone();
260 let mut reader = self
261 .read_with_option(table, columns, KeySet::from(key), options)
262 .await?;
263 reader.set_call_options(call_options);
264 reader.next().await
265 }
266
267 pub(crate) fn get_session_name(&self) -> String {
268 self.session.as_ref().unwrap().session.name.to_string()
269 }
270
271 pub(crate) fn as_mut_session(&mut self) -> &mut ManagedSession {
272 self.session.as_mut().unwrap()
273 }
274
275 pub(crate) fn take_session(&mut self) -> Option<ManagedSession> {
278 self.session.take()
279 }
280}