use rsfbclient_core::{FbError, FirebirdClient, FromRow, IntoParams, TrIsolationLevel, TrOp};
use std::marker;
use std::mem;
use super::{connection::Connection, statement::Statement};
use crate::{
connection::stmt_cache::StmtCache, connection::stmt_cache::StmtCacheData,
statement::StatementData, Execute, Queryable,
};
mod simple;
pub use simple::SimpleTransaction;
pub struct Transaction<'c, C>
where
C: FirebirdClient,
{
pub(crate) data: TransactionData<C>,
pub(crate) conn: &'c mut Connection<C>,
}
impl<'c, C: FirebirdClient> Transaction<'c, C> {
pub fn new(conn: &'c mut Connection<C>) -> Result<Self, FbError> {
let data = TransactionData::new(conn)?;
Ok(Transaction { data, conn })
}
pub fn commit(mut self) -> Result<(), FbError> {
let result = self.data.commit(self.conn);
if result.is_ok() {
mem::forget(self);
} else {
let _ = self.rollback();
}
result
}
pub fn commit_retaining(&mut self) -> Result<(), FbError> {
self.data.commit_retaining(self.conn)
}
pub fn rollback_retaining(&mut self) -> Result<(), FbError> {
self.data.rollback_retaining(self.conn)
}
pub fn rollback(mut self) -> Result<(), FbError> {
let result = self.data.rollback(self.conn);
mem::forget(self);
result
}
pub fn execute_immediate(&mut self, sql: &str) -> Result<(), FbError> {
self.data.execute_immediate(self.conn, sql)
}
pub fn prepare<'t>(
&'t mut self,
sql: &str,
named_params: bool,
) -> Result<Statement<'c, 't, C>, FbError> {
Statement::prepare(self, sql, named_params)
}
}
impl<'c, C: FirebirdClient> Drop for Transaction<'c, C> {
fn drop(&mut self) {
self.data.rollback(self.conn).ok();
}
}
pub struct StmtIter<'c, 'a, R, C>
where
C: FirebirdClient,
{
stmt_cache_data: Option<StmtCacheData<StatementData<C>>>,
tr: &'a mut Transaction<'c, C>,
_marker: marker::PhantomData<R>,
}
impl<R, C> Drop for StmtIter<'_, '_, R, C>
where
C: FirebirdClient,
{
fn drop(&mut self) {
self.stmt_cache_data
.as_mut()
.unwrap()
.stmt
.close_cursor(self.tr.conn)
.ok();
StmtCache::insert_and_close(self.tr.conn, self.stmt_cache_data.take().unwrap()).ok();
}
}
impl<R, C> Iterator for StmtIter<'_, '_, R, C>
where
R: FromRow,
C: FirebirdClient,
{
type Item = Result<R, FbError>;
fn next(&mut self) -> Option<Self::Item> {
self.stmt_cache_data
.as_mut()
.unwrap()
.stmt
.fetch(self.tr.conn, &mut self.tr.data)
.and_then(|row| row.map(FromRow::try_from).transpose())
.transpose()
}
}
impl<'c, C: FirebirdClient> Queryable for Transaction<'c, C> {
fn query_iter<'a, P, R>(
&'a mut self,
sql: &str,
params: P,
) -> Result<Box<dyn Iterator<Item = Result<R, FbError>> + 'a>, FbError>
where
P: IntoParams,
R: FromRow + 'static,
{
let params = params.to_params();
let mut stmt_cache_data = StmtCache::get_or_prepare(self, sql, params.named())?;
match stmt_cache_data
.stmt
.query(self.conn, &mut self.data, params)
{
Ok(_) => {
let iter = StmtIter {
stmt_cache_data: Some(stmt_cache_data),
tr: self,
_marker: Default::default(),
};
Ok(Box::new(iter))
}
Err(e) => {
StmtCache::insert_and_close(self.conn, stmt_cache_data)?;
Err(e)
}
}
}
}
impl<C: FirebirdClient> Execute for Transaction<'_, C> {
fn execute<P>(&mut self, sql: &str, params: P) -> Result<usize, FbError>
where
P: IntoParams,
{
let params = params.to_params();
let mut stmt_cache_data = StmtCache::get_or_prepare(self, sql, params.named())?;
let res = stmt_cache_data
.stmt
.execute(self.conn, &mut self.data, params);
StmtCache::insert_and_close(self.conn, stmt_cache_data)?;
res
}
fn execute_returnable<P, R>(&mut self, sql: &str, params: P) -> Result<R, FbError>
where
P: IntoParams,
R: FromRow + 'static,
{
let params = params.to_params();
let mut stmt_cache_data = StmtCache::get_or_prepare(self, sql, params.named())?;
let res = stmt_cache_data
.stmt
.execute2(self.conn, &mut self.data, params);
StmtCache::insert_and_close(self.conn, stmt_cache_data)?;
FromRow::try_from(res?)
}
}
#[derive(Debug)]
pub struct TransactionData<C: FirebirdClient> {
pub(crate) handle: C::TrHandle,
}
impl<C: FirebirdClient> TransactionData<C>
where
C::TrHandle: Send,
{
fn new(conn: &mut Connection<C>) -> Result<Self, FbError> {
let handle = conn
.cli
.begin_transaction(&mut conn.handle, TrIsolationLevel::ReadCommited)?;
Ok(Self { handle })
}
fn execute_immediate(&mut self, conn: &mut Connection<C>, sql: &str) -> Result<(), FbError> {
conn.cli
.exec_immediate(&mut conn.handle, &mut self.handle, conn.dialect, sql)
}
pub fn commit(&mut self, conn: &mut Connection<C>) -> Result<(), FbError> {
conn.cli
.transaction_operation(&mut self.handle, TrOp::Commit)
}
pub fn commit_retaining(&mut self, conn: &mut Connection<C>) -> Result<(), FbError> {
conn.cli
.transaction_operation(&mut self.handle, TrOp::CommitRetaining)
}
pub fn rollback_retaining(&mut self, conn: &mut Connection<C>) -> Result<(), FbError> {
conn.cli
.transaction_operation(&mut self.handle, TrOp::RollbackRetaining)
}
pub fn rollback(&mut self, conn: &mut Connection<C>) -> Result<(), FbError> {
conn.cli
.transaction_operation(&mut self.handle, TrOp::Rollback)
}
pub fn into_transaction(self, conn: &mut Connection<C>) -> Transaction<C> {
Transaction { data: self, conn }
}
pub fn from_transaction(tr: Transaction<C>) -> Self {
let tr = mem::ManuallyDrop::new(tr);
unsafe { std::ptr::read(&tr.data) }
}
}