use crate::{Cursor, DocumentCollection, QldbError, QldbResult, Transaction};
use ion_binary_rs::{IonEncoder, IonParser, IonValue};
use rusoto_qldb_session::{
ExecuteStatementRequest, FetchPageRequest, QldbSession, QldbSessionClient, SendCommandRequest, ValueHolder,
};
use std::fmt::Debug;
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
use std::sync::Arc;
pub struct QueryBuilder {
tx: Transaction,
client: Arc<QldbSessionClient>,
statement: Arc<String>,
params: Vec<IonValue>,
auto_rollback: bool,
is_executed: Arc<AtomicBool>,
}
impl QueryBuilder {
pub(crate) fn new(
client: Arc<QldbSessionClient>,
tx: Transaction,
statement: &str,
auto_rollback: bool,
) -> QueryBuilder {
QueryBuilder {
client,
tx,
statement: Arc::new(statement.to_string()),
params: vec![],
auto_rollback,
is_executed: Arc::new(AtomicBool::from(false)),
}
}
pub fn param<P: Into<IonValue> + Clone>(mut self, param: P) -> Self {
self.params.push(param.into());
self
}
pub async fn execute(self) -> QldbResult<DocumentCollection> {
let auto_rollback = self.auto_rollback;
let tx = self.tx.clone();
let result = self.get_cursor()?.load_all().await?;
if auto_rollback {
tx.rollback().await?;
}
Ok(result)
}
pub(crate) async fn execute_get_page(&mut self, page_token: &str) -> QldbResult<(Vec<IonValue>, Option<String>)> {
let result = self
.client
.send_command(create_next_page_command(
self.tx.session.get_session_id(),
&self.tx.transaction_id,
page_token,
))
.await?;
let (values, next_page_token) = result
.fetch_page
.and_then(|page| page.page)
.map(|page| {
let values = page.values.unwrap_or_default();
(values, page.next_page_token)
})
.unwrap_or((vec![], None));
let values = valueholders_to_ionvalues(values)?;
Ok((values, next_page_token))
}
pub(crate) async fn execute_statement(&mut self) -> QldbResult<(Vec<IonValue>, Option<String>)> {
if self.tx.is_completed().await {
return Err(QldbError::TransactionCompleted);
}
if self.is_executed.load(Relaxed) {
return Err(QldbError::QueryAlreadyExecuted);
}
self.tx.hash_query(&self.statement, &self.params).await;
let params = std::mem::take(&mut self.params);
self.is_executed.store(true, Relaxed);
let result = self
.client
.send_command(create_send_command(
self.tx.session.get_session_id(),
&self.tx.transaction_id,
&self.statement,
params,
))
.await?;
let (values, next_page_token) = result
.execute_statement
.and_then(|result| result.first_page)
.map(|result| {
let values = result.values.unwrap_or_default();
(values, result.next_page_token)
})
.unwrap_or((vec![], None));
let values = valueholders_to_ionvalues(values)?;
Ok((values, next_page_token))
}
pub fn get_cursor(self) -> QldbResult<Cursor> {
if self.is_executed.load(Relaxed) {
return Err(QldbError::QueryAlreadyExecuted);
}
Ok(Cursor::new(self))
}
pub async fn count(self) -> QldbResult<i64> {
let result = self.execute().await?;
match result.into_inner().last() {
Some(doc) => match doc.get("_1") {
Some(IonValue::Integer(count)) => Ok(*count),
_ => Err(QldbError::NonValidCountStatementResult),
},
_ => Err(QldbError::NonValidCountStatementResult),
}
}
}
impl Debug for QueryBuilder {
fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter
.debug_struct("Transaction")
.field("tx", &self.tx)
.field("statement", &self.statement)
.field("params", &self.params)
.field("auto_rollback", &self.auto_rollback)
.finish()
}
}
fn valueholders_to_ionvalues(values: Vec<ValueHolder>) -> QldbResult<Vec<IonValue>> {
let mut decoded_values = vec![];
for value in values {
let bytes = if let Some(bytes) = value.ion_binary {
bytes
} else {
continue;
};
let parsed_values = IonParser::new(&bytes[..])
.consume_all()
.map_err(QldbError::IonParserError)?;
for value in parsed_values {
decoded_values.push(value);
}
}
Ok(decoded_values)
}
fn create_send_command(
session: &str,
transaction_id: &str,
statement: &str,
params: Vec<IonValue>,
) -> SendCommandRequest {
SendCommandRequest {
session_token: Some(session.to_string()),
execute_statement: Some(ExecuteStatementRequest {
statement: statement.to_string(),
parameters: Some(params.into_iter().map(ionvalue_to_valueholder).collect()),
transaction_id: transaction_id.to_string(),
}),
..Default::default()
}
}
fn create_next_page_command(session: &str, transaction_id: &str, next_page_token: &str) -> SendCommandRequest {
SendCommandRequest {
session_token: Some(session.to_string()),
fetch_page: Some(FetchPageRequest {
transaction_id: transaction_id.to_string(),
next_page_token: next_page_token.to_string(),
}),
..Default::default()
}
}
fn ionvalue_to_valueholder(value: IonValue) -> ValueHolder {
let mut encoder = IonEncoder::new();
encoder.add(value);
let bytes = encoder.encode();
ValueHolder {
ion_text: None,
ion_binary: Some(bytes.into()),
}
}