use std::ops::DerefMut;
use std::sync::atomic::AtomicI64;
use prost_types::Struct;
use google_cloud_gax::grpc::Status;
use google_cloud_gax::retry::RetrySetting;
use google_cloud_googleapis::spanner::v1::request_options::Priority;
use google_cloud_googleapis::spanner::v1::{
execute_sql_request::QueryMode, execute_sql_request::QueryOptions as ExecuteQueryOptions, ExecuteSqlRequest,
ReadRequest, RequestOptions, TransactionSelector,
};
use crate::key::{Key, KeySet};
use crate::reader::{AsyncIterator, RowIterator, StatementReader, TableReader};
use crate::row::Row;
use crate::session::ManagedSession;
use crate::statement::Statement;
/// Per-call settings embedded in both `ReadOptions` and `QueryOptions`.
#[derive(Clone, Default)]
pub struct CallOptions {
/// Optional request priority; when set it is copied into the request's `RequestOptions`.
pub priority: Option<Priority>,
/// Optional retry configuration.
/// NOTE(review): not consumed in this file; presumably applied by the row iterator — confirm.
pub retry: Option<RetrySetting>,
}
#[derive(Clone)]
pub struct ReadOptions {
pub index: String,
pub limit: i64,
pub call_options: CallOptions,
}
impl Default for ReadOptions {
fn default() -> Self {
ReadOptions {
index: "".to_string(),
limit: 0,
call_options: CallOptions::default(),
}
}
}
/// Options for SQL queries issued through `Transaction::query_with_option`.
#[derive(Clone)]
pub struct QueryOptions {
    /// Query mode forwarded as `ExecuteSqlRequest::query_mode`.
    pub mode: QueryMode,
    /// Optional optimizer settings forwarded as `ExecuteSqlRequest::query_options`.
    pub optimizer_options: Option<ExecuteQueryOptions>,
    /// Per-call settings (priority, retry) for the query.
    pub call_options: CallOptions,
}

impl Default for QueryOptions {
    /// Normal query mode, no optimizer options, default call options.
    fn default() -> Self {
        Self {
            call_options: Default::default(),
            optimizer_options: None,
            mode: QueryMode::Normal,
        }
    }
}
/// State used to issue reads and queries on a session under a given transaction selector.
pub struct Transaction {
/// Session the requests run on; `None` once `take_session` has removed it.
pub(crate) session: Option<ManagedSession>,
/// NOTE(review): not read or written in the visible code; presumably a per-transaction
/// request sequence counter used elsewhere — confirm.
pub(crate) sequence_number: AtomicI64,
/// Transaction selector cloned into every read and query request.
pub(crate) transaction_selector: TransactionSelector,
}
impl Transaction {
    /// Builds the `RequestOptions` for a request from an optional priority.
    ///
    /// Returns `None` when no priority is given. Otherwise the priority is set
    /// and both the request tag and the transaction tag are left empty.
    pub(crate) fn create_request_options(priority: Option<Priority>) -> Option<RequestOptions> {
        priority.map(|s| RequestOptions {
            priority: s.into(),
            request_tag: String::new(),
            transaction_tag: String::new(),
        })
    }

    /// Runs `statement` with the default [`QueryOptions`].
    ///
    /// See [`Transaction::query_with_option`] for errors and panics.
    pub async fn query(&mut self, statement: Statement) -> Result<RowIterator<'_>, Status> {
        self.query_with_option(statement, QueryOptions::default()).await
    }

    /// Runs `statement` on this transaction's session and returns an iterator over the rows.
    ///
    /// The request carries the statement's SQL, parameters and parameter types,
    /// the query mode and optimizer options from `options`, and request options
    /// derived from `options.call_options.priority`. Resume and partition tokens
    /// are empty and `seqno` is `0`.
    ///
    /// # Errors
    ///
    /// Returns the `Status` reported by `RowIterator::new`.
    ///
    /// # Panics
    ///
    /// Panics if the transaction no longer holds a session.
    pub async fn query_with_option(
        &mut self,
        statement: Statement,
        options: QueryOptions,
    ) -> Result<RowIterator<'_>, Status> {
        let request = ExecuteSqlRequest {
            session: self.get_session_name(),
            transaction: Some(self.transaction_selector.clone()),
            sql: statement.sql,
            params: Some(Struct {
                fields: statement.params,
            }),
            param_types: statement.param_types,
            resume_token: vec![],
            query_mode: options.mode.into(),
            partition_token: vec![],
            seqno: 0,
            query_options: options.optimizer_options,
            request_options: Transaction::create_request_options(options.call_options.priority),
        };
        // Same session access as the read path; the explicit `deref_mut` is
        // kept so the iterator receives the same reference type as before.
        let session = self.as_mut_session().deref_mut();
        let reader = Box::new(StatementReader { request });
        RowIterator::new(session, reader, Some(options.call_options)).await
    }

    /// Reads `columns` of the rows of `table` selected by `key_set`, using the
    /// default [`ReadOptions`].
    ///
    /// See [`Transaction::read_with_option`] for errors and panics.
    pub async fn read(
        &mut self,
        table: &str,
        columns: &[&str],
        key_set: impl Into<KeySet>,
    ) -> Result<RowIterator<'_>, Status> {
        self.read_with_option(table, columns, key_set, ReadOptions::default())
            .await
    }

    /// Reads `columns` of the rows of `table` selected by `key_set` and returns
    /// an iterator over the rows.
    ///
    /// `options.index` and `options.limit` are forwarded unchanged to the read
    /// request, and request options are derived from
    /// `options.call_options.priority`. Resume and partition tokens are empty.
    ///
    /// # Errors
    ///
    /// Returns the `Status` reported by `RowIterator::new`.
    ///
    /// # Panics
    ///
    /// Panics if the transaction no longer holds a session.
    pub async fn read_with_option(
        &mut self,
        table: &str,
        columns: &[&str],
        key_set: impl Into<KeySet>,
        options: ReadOptions,
    ) -> Result<RowIterator<'_>, Status> {
        let request = ReadRequest {
            session: self.get_session_name(),
            transaction: Some(self.transaction_selector.clone()),
            table: table.to_string(),
            index: options.index,
            columns: columns.iter().map(|x| x.to_string()).collect(),
            key_set: Some(key_set.into().inner),
            limit: options.limit,
            resume_token: vec![],
            partition_token: vec![],
            request_options: Transaction::create_request_options(options.call_options.priority),
        };
        let session = self.as_mut_session();
        let reader = Box::new(TableReader { request });
        RowIterator::new(session, reader, Some(options.call_options)).await
    }

    /// Reads the row of `table` identified by `key`, using the default [`ReadOptions`].
    ///
    /// See [`Transaction::read_row_with_option`] for the return value, errors and panics.
    pub async fn read_row(&mut self, table: &str, columns: &[&str], key: Key) -> Result<Option<Row>, Status> {
        self.read_row_with_option(table, columns, key, ReadOptions::default())
            .await
    }

    /// Reads the row of `table` identified by `key`.
    ///
    /// Returns whatever the first call to `next` on the underlying row iterator
    /// yields.
    ///
    /// # Errors
    ///
    /// Returns the `Status` from creating the iterator or from fetching the row.
    ///
    /// # Panics
    ///
    /// Panics if the transaction no longer holds a session.
    pub async fn read_row_with_option(
        &mut self,
        table: &str,
        columns: &[&str],
        key: Key,
        options: ReadOptions,
    ) -> Result<Option<Row>, Status> {
        // `options` is moved into `read_with_option`, so keep a copy of the
        // call options to hand to the iterator afterwards.
        let call_options = options.call_options.clone();
        let mut reader = self
            .read_with_option(table, columns, KeySet::from(key), options)
            .await?;
        reader.set_call_options(call_options);
        reader.next().await
    }

    /// Returns the name of the session this transaction runs on.
    ///
    /// # Panics
    ///
    /// Panics if the transaction no longer holds a session.
    pub(crate) fn get_session_name(&self) -> String {
        self.session
            .as_ref()
            .expect("transaction has no session (never set or already taken)")
            .session
            .name
            .to_string()
    }

    /// Returns a mutable reference to the session this transaction runs on.
    ///
    /// # Panics
    ///
    /// Panics if the transaction no longer holds a session.
    pub(crate) fn as_mut_session(&mut self) -> &mut ManagedSession {
        self.session
            .as_mut()
            .expect("transaction has no session (never set or already taken)")
    }

    /// Removes the session from the transaction and returns it, leaving `None` behind.
    ///
    /// After this call the read and query methods panic, because they require a session.
    pub(crate) fn take_session(&mut self) -> Option<ManagedSession> {
        self.session.take()
    }
}