use crate::{
debug_print, error::*, DatabaseConnection, DbBackend, ExecResult, ProxyDatabaseTrait,
QueryResult, Statement,
};
use futures::Stream;
use std::{
fmt::Debug,
pin::Pin,
sync::{Arc, Mutex},
};
use tracing::instrument;
/// Entry point for creating proxy-backed database connections.
///
/// This is a unit struct: it holds no state and only namespaces the
/// associated functions implemented on it.
#[derive(Debug)]
pub struct ProxyDatabaseConnector;
/// A database connection whose operations are delegated to a user-supplied
/// [`ProxyDatabaseTrait`] implementation.
#[derive(Debug)]
pub struct ProxyDatabaseConnection {
/// Backend value handed back by `get_database_backend`.
db_backend: DbBackend,
/// Shared, mutex-guarded proxy object that every forwarded call is made on.
proxy: Arc<Mutex<Box<dyn ProxyDatabaseTrait>>>,
}
impl ProxyDatabaseConnector {
    /// Reports whether this connector can handle the given connection string.
    ///
    /// The argument is never inspected; the answer is unconditionally `true`.
    #[allow(unused_variables)]
    pub fn accepts(string: &str) -> bool {
        true
    }

    /// Builds a [`ProxyDatabaseConnection`] for backend `db_type` around the
    /// proxy object `func` and returns it wrapped in the
    /// `DatabaseConnection::ProxyDatabaseConnection` variant.
    ///
    /// The body has no failing path, so the result is always `Ok`.
    #[allow(unused_variables)]
    #[instrument(level = "trace")]
    pub fn connect(
        db_type: DbBackend,
        func: Arc<Mutex<Box<dyn ProxyDatabaseTrait>>>,
    ) -> Result<DatabaseConnection, DbErr> {
        let connection = ProxyDatabaseConnection::new(db_type, func);
        let shared = Arc::new(connection);
        Ok(DatabaseConnection::ProxyDatabaseConnection(shared))
    }
}
impl ProxyDatabaseConnection {
    /// Creates a connection for `db_backend` that forwards every call to the
    /// proxy object `funcs`.
    pub fn new(db_backend: DbBackend, funcs: Arc<Mutex<Box<dyn ProxyDatabaseTrait>>>) -> Self {
        // `funcs` is already owned by this function, so it is moved into the
        // struct instead of being cloned and then dropped.
        Self {
            db_backend,
            proxy: funcs,
        }
    }

    /// Returns the backend this connection was created with.
    pub fn get_database_backend(&self) -> DbBackend {
        self.db_backend
    }

    /// Forwards `statement` to the proxy's `execute` and converts its result
    /// into an [`ExecResult`].
    ///
    /// # Errors
    ///
    /// Returns the error produced by `exec_err` if the proxy mutex cannot be
    /// locked, or whatever error the proxy's `execute` reports.
    #[instrument(level = "trace")]
    pub fn execute(&self, statement: Statement) -> Result<ExecResult, DbErr> {
        debug_print!("{}", statement);
        let outcome = self
            .proxy
            .lock()
            .map_err(exec_err)?
            .execute(statement)?;
        Ok(outcome.into())
    }

    /// Runs `statement` through the proxy's `query` and returns only the first
    /// row, or `None` when the proxy returned no rows.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `query_err` if the proxy mutex cannot be
    /// locked, or whatever error the proxy's `query` reports.
    #[instrument(level = "trace")]
    pub fn query_one(&self, statement: Statement) -> Result<Option<QueryResult>, DbErr> {
        debug_print!("{}", statement);
        let rows = self.proxy.lock().map_err(query_err)?.query(statement)?;
        // The rows are owned here, so the first one is moved out rather than
        // cloned; any remaining rows are simply dropped.
        Ok(rows.into_iter().next().map(|row| QueryResult {
            row: crate::QueryResultRow::Proxy(row),
        }))
    }

    /// Runs `statement` through the proxy's `query` and returns every row, in
    /// the order the proxy produced them.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `query_err` if the proxy mutex cannot be
    /// locked, or whatever error the proxy's `query` reports.
    #[instrument(level = "trace")]
    pub fn query_all(&self, statement: Statement) -> Result<Vec<QueryResult>, DbErr> {
        debug_print!("{}", statement);
        let rows = self.proxy.lock().map_err(query_err)?.query(statement)?;
        Ok(rows
            .into_iter()
            .map(|row| QueryResult {
                row: crate::QueryResultRow::Proxy(row),
            })
            .collect())
    }

    /// Returns the rows for `statement` as a stream.
    ///
    /// The query is executed eagerly via [`Self::query_all`] before the stream
    /// is built; the stream then replays the collected rows. If the query
    /// failed, the stream yields that single error and ends.
    #[instrument(level = "trace")]
    pub fn fetch(
        &self,
        statement: &Statement,
    ) -> Pin<Box<dyn Stream<Item = Result<QueryResult, DbErr>> + Send>> {
        match self.query_all(statement.clone()) {
            Ok(rows) => Box::pin(futures::stream::iter(rows.into_iter().map(Ok))),
            Err(err) => Box::pin(futures::stream::iter(std::iter::once(Err(err)))),
        }
    }

    /// Forwards a `begin` call to the proxy.
    ///
    /// # Panics
    ///
    /// Panics if the proxy mutex cannot be locked (it is poisoned).
    #[instrument(level = "trace")]
    pub fn begin(&self) {
        self.proxy
            .lock()
            .expect("Failed to acquire proxy")
            .begin()
    }

    /// Forwards a `commit` call to the proxy.
    ///
    /// # Panics
    ///
    /// Panics if the proxy mutex cannot be locked (it is poisoned).
    #[instrument(level = "trace")]
    pub fn commit(&self) {
        self.proxy
            .lock()
            .expect("Failed to acquire proxy")
            .commit()
    }

    /// Forwards a `rollback` call to the proxy.
    ///
    /// # Panics
    ///
    /// Panics if the proxy mutex cannot be locked (it is poisoned).
    #[instrument(level = "trace")]
    pub fn rollback(&self) {
        self.proxy
            .lock()
            .expect("Failed to acquire proxy")
            .rollback()
    }

    /// Forwards a `ping` call to the proxy and returns its result.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `query_err` if the proxy mutex cannot be
    /// locked, or whatever error the proxy's `ping` reports.
    pub fn ping(&self) -> Result<(), DbErr> {
        self.proxy.lock().map_err(query_err)?.ping()
    }
}