1use std::ops::DerefMut;
2use std::sync::atomic::AtomicI64;
3
4use prost_types::Struct;
5
6use google_cloud_gax::grpc::Status;
7use google_cloud_gax::retry::RetrySetting;
8use google_cloud_googleapis::spanner::v1::request_options::Priority;
9use google_cloud_googleapis::spanner::v1::{
10 execute_sql_request::QueryMode, execute_sql_request::QueryOptions as ExecuteQueryOptions, ExecuteSqlRequest,
11 ReadRequest, RequestOptions, TransactionSelector,
12};
13
14use crate::key::{Key, KeySet};
15use crate::reader::{Reader, RowIterator, StatementReader, TableReader};
16use crate::row::Row;
17use crate::session::ManagedSession;
18use crate::statement::Statement;
19
/// Per-call settings shared by the read and query entry points in this file.
#[derive(Clone, Default)]
pub struct CallOptions {
    /// Optional request priority. When set, it is copied into the request's
    /// `RequestOptions`; when `None`, the request carries no `RequestOptions`.
    pub priority: Option<Priority>,
    /// Optional retry setting. It is not interpreted in this file: it travels with the
    /// rest of the call options to the row iterator.
    pub retry: Option<RetrySetting>,
}
26
/// Options for table reads issued through a `Transaction`.
#[derive(Clone)]
pub struct ReadOptions {
    /// Index name, forwarded unchanged as `ReadRequest::index`. Defaults to the empty
    /// string (presumably meaning "read the base table" — confirm against the Spanner API).
    pub index: String,
    /// Row limit, forwarded unchanged as `ReadRequest::limit`. Defaults to `0`
    /// (presumably "no limit" — confirm against the Spanner API).
    pub limit: i64,
    /// Priority and retry settings for the call.
    pub call_options: CallOptions,
}
37
38impl Default for ReadOptions {
39 fn default() -> Self {
40 ReadOptions {
41 index: "".to_string(),
42 limit: 0,
43 call_options: CallOptions::default(),
44 }
45 }
46}
47
/// Options for SQL queries issued through a `Transaction`.
#[derive(Clone)]
pub struct QueryOptions {
    /// Query mode, converted and sent as `ExecuteSqlRequest::query_mode`.
    pub mode: QueryMode,
    /// Optional optimizer options, sent unchanged as `ExecuteSqlRequest::query_options`.
    pub optimizer_options: Option<ExecuteQueryOptions>,
    /// Priority and retry settings for the call.
    pub call_options: CallOptions,
    /// Copied into `StatementReader::enable_resume`. Defaults to `true`.
    /// NOTE(review): presumably controls whether an interrupted stream is resumed — the
    /// reader implementation is outside this file; confirm there.
    pub enable_resume: bool,
}
89
90impl Default for QueryOptions {
91 fn default() -> Self {
92 QueryOptions {
93 mode: QueryMode::Normal,
94 optimizer_options: None,
95 call_options: CallOptions::default(),
96 enable_resume: true,
97 }
98 }
99}
100
/// State shared by the read and query operations of a transaction.
pub struct Transaction {
    /// Session the requests are issued on. It is `None` once `take_session` has been
    /// called; the read/query methods panic in that state.
    pub(crate) session: Option<ManagedSession>,
    /// Atomic counter; it is not read or written in this part of the file.
    /// NOTE(review): presumably the per-transaction request sequence number — confirm
    /// against the code that uses it.
    pub(crate) sequence_number: AtomicI64,
    /// Selector cloned into the `transaction` field of every request built here.
    pub(crate) transaction_selector: TransactionSelector,
}
107
108impl Transaction {
109 pub(crate) fn create_request_options(priority: Option<Priority>) -> Option<RequestOptions> {
110 priority.map(|s| RequestOptions {
111 priority: s.into(),
112 request_tag: "".to_string(),
113 transaction_tag: "".to_string(),
114 })
115 }
116
117 pub async fn query(&mut self, statement: Statement) -> Result<RowIterator<'_, impl Reader>, Status> {
122 self.query_with_option(statement, QueryOptions::default()).await
123 }
124
125 pub async fn query_with_option(
130 &mut self,
131 statement: Statement,
132 options: QueryOptions,
133 ) -> Result<RowIterator<'_, impl Reader>, Status> {
134 let request = ExecuteSqlRequest {
135 session: self.session.as_ref().unwrap().session.name.to_string(),
136 transaction: Some(self.transaction_selector.clone()),
137 sql: statement.sql,
138 params: Some(Struct {
139 fields: statement.params,
140 }),
141 param_types: statement.param_types,
142 resume_token: vec![],
143 query_mode: options.mode.into(),
144 partition_token: vec![],
145 seqno: 0,
146 query_options: options.optimizer_options,
147 request_options: Transaction::create_request_options(options.call_options.priority),
148 data_boost_enabled: false,
149 directed_read_options: None,
150 };
151 let session = self.session.as_mut().unwrap().deref_mut();
152 let reader = StatementReader {
153 enable_resume: options.enable_resume,
154 request,
155 };
156 RowIterator::new(session, reader, Some(options.call_options)).await
157 }
158
159 pub async fn read(
180 &mut self,
181 table: &str,
182 columns: &[&str],
183 key_set: impl Into<KeySet>,
184 ) -> Result<RowIterator<'_, impl Reader>, Status> {
185 self.read_with_option(table, columns, key_set, ReadOptions::default())
186 .await
187 }
188
189 pub async fn read_with_option(
191 &mut self,
192 table: &str,
193 columns: &[&str],
194 key_set: impl Into<KeySet>,
195 options: ReadOptions,
196 ) -> Result<RowIterator<'_, impl Reader>, Status> {
197 let request = ReadRequest {
198 session: self.get_session_name(),
199 transaction: Some(self.transaction_selector.clone()),
200 table: table.to_string(),
201 index: options.index,
202 columns: columns.iter().map(|x| x.to_string()).collect(),
203 key_set: Some(key_set.into().inner),
204 limit: options.limit,
205 resume_token: vec![],
206 partition_token: vec![],
207 request_options: Transaction::create_request_options(options.call_options.priority),
208 data_boost_enabled: false,
209 order_by: 0,
210 directed_read_options: None,
211 lock_hint: 0,
212 };
213
214 let session = self.as_mut_session();
215 let reader = TableReader { request };
216 RowIterator::new(session, reader, Some(options.call_options)).await
217 }
218
219 pub async fn read_row(&mut self, table: &str, columns: &[&str], key: Key) -> Result<Option<Row>, Status> {
232 self.read_row_with_option(table, columns, key, ReadOptions::default())
233 .await
234 }
235
236 pub async fn read_row_with_option(
238 &mut self,
239 table: &str,
240 columns: &[&str],
241 key: Key,
242 options: ReadOptions,
243 ) -> Result<Option<Row>, Status> {
244 let call_options = options.call_options.clone();
245 let mut reader = self
246 .read_with_option(table, columns, KeySet::from(key), options)
247 .await?;
248 reader.set_call_options(call_options);
249 reader.next().await
250 }
251
252 pub(crate) fn get_session_name(&self) -> String {
253 self.session.as_ref().unwrap().session.name.to_string()
254 }
255
256 pub(crate) fn as_mut_session(&mut self) -> &mut ManagedSession {
257 self.session.as_mut().unwrap()
258 }
259
260 pub(crate) fn take_session(&mut self) -> Option<ManagedSession> {
263 self.session.take()
264 }
265}