use std::future::Future;
use std::sync::Arc;
use crate::error::{ConnectorError as Error, Result};
use crate::transaction::TransactionOptions;
use nautilus_dialect::Dialect;
use crate::Executor;
pub struct Client<E>
where
E: Executor,
{
dialect: Arc<dyn Dialect + Send + Sync>,
executor: Arc<E>,
}
impl<E> Client<E>
where
E: Executor,
{
pub fn new<D>(dialect: D, executor: E) -> Self
where
D: Dialect + Send + Sync + 'static,
{
Self {
dialect: Arc::new(dialect),
executor: Arc::new(executor),
}
}
pub fn dialect(&self) -> &(dyn Dialect + Send + Sync) {
&*self.dialect
}
pub fn executor(&self) -> &E {
&self.executor
}
}
impl Client<crate::postgres::PgExecutor> {
pub async fn postgres(url: &str) -> Result<Self> {
use crate::postgres::PgExecutor;
use nautilus_dialect::PostgresDialect;
let executor = PgExecutor::new(url).await?;
let dialect = PostgresDialect;
Ok(Self::new(dialect, executor))
}
pub async fn transaction<F, Fut, T>(&self, opts: TransactionOptions, f: F) -> Result<T>
where
F: FnOnce(Client<crate::transaction::TransactionExecutor>) -> Fut,
Fut: Future<Output = Result<T>> + Send,
T: Send + 'static,
{
let sqlx_tx = self
.executor()
.pool()
.begin()
.await
.map_err(|e| Error::connection(e, "Failed to begin transaction"))?;
let tx_executor = crate::transaction::TransactionExecutor::postgres(sqlx_tx);
if let Some(iso) = opts.isolation_level {
let iso_sql = format!("SET TRANSACTION ISOLATION LEVEL {}", iso.as_sql());
let sql = nautilus_dialect::Sql {
text: iso_sql,
params: vec![],
};
use crate::executor::execute_all;
let _ = execute_all(&tx_executor, &sql).await?;
}
let tx_client = Client::new(nautilus_dialect::PostgresDialect, tx_executor);
let result = if opts.timeout.as_millis() > 0 {
match tokio::time::timeout(opts.timeout, f(tx_client.clone())).await {
Ok(r) => r,
Err(_) => {
let _ = tx_client.executor().rollback().await;
return Err(Error::database_msg("Transaction timed out"));
}
}
} else {
f(tx_client.clone()).await
};
match &result {
Ok(_) => tx_client.executor().commit().await?,
Err(_) => {
let _ = tx_client.executor().rollback().await;
}
}
result
}
}
impl Client<crate::mysql::MysqlExecutor> {
pub async fn mysql(url: &str) -> Result<Self> {
use crate::mysql::MysqlExecutor;
use nautilus_dialect::MysqlDialect;
let executor = MysqlExecutor::new(url).await?;
let dialect = MysqlDialect;
Ok(Self::new(dialect, executor))
}
pub async fn transaction<F, Fut, T>(&self, opts: TransactionOptions, f: F) -> Result<T>
where
F: FnOnce(Client<crate::transaction::TransactionExecutor>) -> Fut,
Fut: Future<Output = Result<T>> + Send,
T: Send + 'static,
{
let sqlx_tx = self
.executor()
.pool()
.begin()
.await
.map_err(|e| Error::connection(e, "Failed to begin transaction"))?;
let tx_executor = crate::transaction::TransactionExecutor::mysql(sqlx_tx);
if let Some(iso) = opts.isolation_level {
let iso_sql = format!("SET TRANSACTION ISOLATION LEVEL {}", iso.as_sql());
let sql = nautilus_dialect::Sql {
text: iso_sql,
params: vec![],
};
use crate::executor::execute_all;
let _ = execute_all(&tx_executor, &sql).await?;
}
let tx_client = Client::new(nautilus_dialect::MysqlDialect, tx_executor);
let result = if opts.timeout.as_millis() > 0 {
match tokio::time::timeout(opts.timeout, f(tx_client.clone())).await {
Ok(r) => r,
Err(_) => {
let _ = tx_client.executor().rollback().await;
return Err(Error::database_msg("Transaction timed out"));
}
}
} else {
f(tx_client.clone()).await
};
match &result {
Ok(_) => tx_client.executor().commit().await?,
Err(_) => {
let _ = tx_client.executor().rollback().await;
}
}
result
}
}
impl Client<crate::sqlite::SqliteExecutor> {
pub async fn sqlite(url: &str) -> Result<Self> {
use crate::sqlite::SqliteExecutor;
use nautilus_dialect::SqliteDialect;
let executor = SqliteExecutor::new(url).await?;
let dialect = SqliteDialect;
Ok(Self::new(dialect, executor))
}
pub async fn transaction<F, Fut, T>(&self, opts: TransactionOptions, f: F) -> Result<T>
where
F: FnOnce(Client<crate::transaction::TransactionExecutor>) -> Fut,
Fut: Future<Output = Result<T>> + Send,
T: Send + 'static,
{
let sqlx_tx = self
.executor()
.pool()
.begin()
.await
.map_err(|e| Error::connection(e, "Failed to begin transaction"))?;
let tx_executor = crate::transaction::TransactionExecutor::sqlite(sqlx_tx);
if let Some(iso) = opts.isolation_level {
let iso_sql = format!("SET TRANSACTION ISOLATION LEVEL {}", iso.as_sql());
let sql = nautilus_dialect::Sql {
text: iso_sql,
params: vec![],
};
use crate::executor::execute_all;
let _ = execute_all(&tx_executor, &sql).await?;
}
let tx_client = Client::new(nautilus_dialect::SqliteDialect, tx_executor);
let result = if opts.timeout.as_millis() > 0 {
match tokio::time::timeout(opts.timeout, f(tx_client.clone())).await {
Ok(r) => r,
Err(_) => {
let _ = tx_client.executor().rollback().await;
return Err(Error::database_msg("Transaction timed out"));
}
}
} else {
f(tx_client.clone()).await
};
match &result {
Ok(_) => tx_client.executor().commit().await?,
Err(_) => {
let _ = tx_client.executor().rollback().await;
}
}
result
}
}
impl<E> Clone for Client<E>
where
E: Executor,
{
fn clone(&self) -> Self {
Self {
dialect: Arc::clone(&self.dialect),
executor: Arc::clone(&self.executor),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_client_is_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<Client<crate::postgres::PgExecutor>>();
assert_send_sync::<Client<crate::sqlite::SqliteExecutor>>();
assert_send_sync::<Client<crate::mysql::MysqlExecutor>>();
}
}