use rsfbclient_core::{Dialect, FbError, FirebirdClient, FirebirdClientDbOps, FromRow, IntoParams};
use std::{marker, mem};
use crate::{
query::Queryable, statement::StatementData, transaction::TransactionData, Execute, Transaction,
};
use stmt_cache::{StmtCache, StmtCacheData};
pub mod builders {
#![allow(unused_imports)]
use super::{
super::{charset, Charset},
Connection, ConnectionConfiguration, Dialect, FbError, FirebirdClient,
FirebirdClientFactory,
};
#[cfg(feature = "native_client")]
mod builder_native;
#[cfg(feature = "native_client")]
pub use builder_native::*;
#[cfg(feature = "pure_rust")]
mod builder_pure_rust;
#[cfg(feature = "pure_rust")]
pub use builder_pure_rust::*;
}
pub(crate) mod conn_string;
pub(crate) mod stmt_cache;
pub(crate) mod simple;
pub use simple::SimpleConnection;
pub trait FirebirdClientFactory {
type C: FirebirdClient;
fn new_instance(&self) -> Result<Self::C, FbError>;
fn get_conn_conf(
&self,
) -> &ConnectionConfiguration<<Self::C as FirebirdClientDbOps>::AttachmentConfig>;
}
#[derive(Clone)]
pub struct ConnectionConfiguration<A> {
attachment_conf: A,
dialect: Dialect,
stmt_cache_size: usize,
}
impl<A: Default> Default for ConnectionConfiguration<A> {
fn default() -> Self {
Self {
attachment_conf: Default::default(),
dialect: Dialect::D3,
stmt_cache_size: 20,
}
}
}
pub struct Connection<C: FirebirdClient> {
pub(crate) handle: <C as FirebirdClientDbOps>::DbHandle,
pub(crate) dialect: Dialect,
pub(crate) stmt_cache: StmtCache<StatementData<C>>,
pub(crate) def_tr: Option<TransactionData<C>>,
pub(crate) in_transaction: bool,
pub(crate) cli: C,
}
impl<C: FirebirdClient> Connection<C> {
pub fn open(
mut cli: C,
conf: &ConnectionConfiguration<C::AttachmentConfig>,
) -> Result<Connection<C>, FbError> {
let handle = cli.attach_database(&conf.attachment_conf)?;
let stmt_cache = StmtCache::new(conf.stmt_cache_size);
Ok(Connection {
handle,
dialect: conf.dialect,
stmt_cache,
def_tr: None,
in_transaction: false,
cli,
})
}
pub fn drop_database(mut self) -> Result<(), FbError> {
self.cli.drop_database(&mut self.handle)?;
Ok(())
}
pub fn close(mut self) -> Result<(), FbError> {
let res = self.cleanup_and_detach();
mem::forget(self);
res
}
fn cleanup_and_detach(&mut self) -> Result<(), FbError> {
StmtCache::close_all(self);
if let Some(mut tr) = self.def_tr.take() {
tr.rollback(self).ok();
}
self.cli.detach_database(&mut self.handle)?;
Ok(())
}
pub fn with_transaction<T, F>(&mut self, closure: F) -> Result<T, FbError>
where
F: FnOnce(&mut Transaction<C>) -> Result<T, FbError>,
{
let in_transaction = self.in_transaction;
let mut tr = if let Some(tr) = self.def_tr.take() {
tr.into_transaction(self)
} else {
Transaction::new(self)?
};
let res = closure(&mut tr);
if !in_transaction {
if res.is_ok() {
tr.commit_retaining()?;
} else {
tr.rollback_retaining()?;
}
}
let tr = TransactionData::from_transaction(tr);
if let Some(mut tr) = self.def_tr.replace(tr) {
tr.rollback(self).ok();
}
res
}
fn use_transaction<T, F>(&mut self, closure: F) -> Result<T, FbError>
where
F: FnOnce(&mut Transaction<C>) -> Result<T, FbError>,
{
let mut tr = if let Some(tr) = self.def_tr.take() {
tr.into_transaction(self)
} else {
Transaction::new(self)?
};
let res = closure(&mut tr);
let tr = TransactionData::from_transaction(tr);
if let Some(mut tr) = self.def_tr.replace(tr) {
tr.rollback(self).ok();
}
res
}
pub fn begin_transaction(&mut self) -> Result<(), FbError> {
let tr = if let Some(tr) = self.def_tr.take() {
tr.into_transaction(self)
} else {
Transaction::new(self)?
};
let tr = TransactionData::from_transaction(tr);
if let Some(mut tr) = self.def_tr.replace(tr) {
tr.rollback(self).ok();
}
self.in_transaction = true;
Ok(())
}
pub fn commit(&mut self) -> Result<(), FbError> {
self.in_transaction = false;
self.use_transaction(|tr| tr.commit_retaining())
}
pub fn rollback(&mut self) -> Result<(), FbError> {
self.in_transaction = false;
self.use_transaction(|tr| tr.rollback_retaining())
}
}
impl<C: FirebirdClient> Drop for Connection<C> {
fn drop(&mut self) {
let _ = self.cleanup_and_detach();
}
}
pub struct StmtIter<'a, R, C: FirebirdClient> {
stmt_cache_data: Option<StmtCacheData<StatementData<C>>>,
conn: &'a mut Connection<C>,
_marker: marker::PhantomData<R>,
}
impl<R, C> Drop for StmtIter<'_, R, C>
where
C: FirebirdClient,
{
fn drop(&mut self) {
self.stmt_cache_data
.as_mut()
.unwrap()
.stmt
.close_cursor(self.conn)
.ok();
StmtCache::insert_and_close(self.conn, self.stmt_cache_data.take().unwrap()).ok();
if !self.conn.in_transaction {
self.conn.commit().ok();
}
}
}
impl<R, C> Iterator for StmtIter<'_, R, C>
where
R: FromRow,
C: FirebirdClient,
{
type Item = Result<R, FbError>;
fn next(&mut self) -> Option<Self::Item> {
let stmt_cache_data = self.stmt_cache_data.as_mut().unwrap();
self.conn
.use_transaction(move |tr| {
Ok(stmt_cache_data
.stmt
.fetch(tr.conn, &mut tr.data)
.and_then(|row| row.map(FromRow::try_from).transpose())
.transpose())
})
.unwrap_or_default()
}
}
impl<C> Queryable for Connection<C>
where
C: FirebirdClient,
{
fn query_iter<'a, P, R>(
&'a mut self,
sql: &str,
params: P,
) -> Result<Box<dyn Iterator<Item = Result<R, FbError>> + 'a>, FbError>
where
P: IntoParams,
R: FromRow + 'static,
{
let stmt_cache_data = self.use_transaction(|tr| {
let params = params.to_params();
let mut stmt_cache_data = StmtCache::get_or_prepare(tr, sql, params.named())?;
match stmt_cache_data.stmt.query(tr.conn, &mut tr.data, params) {
Ok(_) => Ok(stmt_cache_data),
Err(e) => {
StmtCache::insert_and_close(tr.conn, stmt_cache_data)?;
if !tr.conn.in_transaction {
tr.rollback_retaining().ok();
}
Err(e)
}
}
})?;
let iter = StmtIter {
stmt_cache_data: Some(stmt_cache_data),
conn: self,
_marker: Default::default(),
};
Ok(Box::new(iter))
}
}
impl<C> Execute for Connection<C>
where
C: FirebirdClient,
{
fn execute<P>(&mut self, sql: &str, params: P) -> Result<usize, FbError>
where
P: IntoParams,
{
let params = params.to_params();
self.with_transaction(|tr| {
let mut stmt_cache_data = StmtCache::get_or_prepare(tr, sql, params.named())?;
let res = stmt_cache_data.stmt.execute(tr.conn, &mut tr.data, params);
StmtCache::insert_and_close(tr.conn, stmt_cache_data)?;
res
})
}
fn execute_returnable<P, R>(&mut self, sql: &str, params: P) -> Result<R, FbError>
where
P: IntoParams,
R: FromRow + 'static,
{
let params = params.to_params();
self.with_transaction(|tr| {
let mut stmt_cache_data = StmtCache::get_or_prepare(tr, sql, params.named())?;
let res = stmt_cache_data.stmt.execute2(tr.conn, &mut tr.data, params);
StmtCache::insert_and_close(tr.conn, stmt_cache_data)?;
let f_res = FromRow::try_from(res?)?;
Ok(f_res)
})
}
}
#[cfg(test)]
mk_tests_default! {
use crate::*;
#[test]
fn remote_connection() -> Result<(), FbError> {
let conn = cbuilder().connect()?;
conn.close().expect("error closing the connection");
Ok(())
}
#[test]
fn query_iter() -> Result<(), FbError> {
let mut conn = cbuilder().connect()?;
let mut rows = 0;
for row in conn
.query_iter("SELECT -3 FROM RDB$DATABASE WHERE 1 = ?", (1,))?
{
let (v,): (i32,) = row?;
assert_eq!(v, -3);
rows += 1;
}
assert_eq!(rows, 1);
Ok(())
}
}