use std::future::Future;
use std::sync::Arc;
use crate::error::{ConnectorError as Error, Result};
use crate::transaction::TransactionOptions;
use nautilus_dialect::Dialect;
use crate::Executor;
pub struct Client<E>
where
E: Executor,
{
dialect: Arc<dyn Dialect + Send + Sync>,
executor: Arc<E>,
}
impl<E> Client<E>
where
E: Executor,
{
pub fn new<D>(dialect: D, executor: E) -> Self
where
D: Dialect + Send + Sync + 'static,
{
Self {
dialect: Arc::new(dialect),
executor: Arc::new(executor),
}
}
pub fn dialect(&self) -> &(dyn Dialect + Send + Sync) {
&*self.dialect
}
pub fn executor(&self) -> &E {
&self.executor
}
}
async fn set_transaction_isolation(
tx_executor: &crate::transaction::TransactionExecutor,
isolation_level: Option<crate::IsolationLevel>,
) -> Result<()> {
let Some(isolation_level) = isolation_level else {
return Ok(());
};
let sql = nautilus_dialect::Sql {
text: format!(
"SET TRANSACTION ISOLATION LEVEL {}",
isolation_level.as_sql()
),
params: vec![],
};
crate::execute_all(tx_executor, &sql).await?;
Ok(())
}
async fn drive_transaction<F, Fut, T, D>(
tx_executor: crate::transaction::TransactionExecutor,
dialect: D,
opts: TransactionOptions,
supports_isolation_level: bool,
f: F,
) -> Result<T>
where
F: FnOnce(Client<crate::transaction::TransactionExecutor>) -> Fut,
Fut: Future<Output = Result<T>> + Send,
T: Send + 'static,
D: Dialect + Send + Sync + 'static,
{
let TransactionOptions {
timeout,
isolation_level,
} = opts;
if supports_isolation_level {
set_transaction_isolation(&tx_executor, isolation_level).await?;
}
let tx_client = Client::new(dialect, tx_executor);
let result = if timeout.is_zero() {
f(tx_client.clone()).await
} else {
match tokio::time::timeout(timeout, f(tx_client.clone())).await {
Ok(result) => result,
Err(_) => {
let _ = tx_client.executor().rollback().await;
return Err(Error::database_msg("Transaction timed out"));
}
}
};
match &result {
Ok(_) => tx_client.executor().commit().await?,
Err(_) => {
let _ = tx_client.executor().rollback().await;
}
}
result
}
impl Client<crate::postgres::PgExecutor> {
pub async fn postgres(url: &str) -> Result<Self> {
use crate::postgres::PgExecutor;
use nautilus_dialect::PostgresDialect;
let executor = PgExecutor::new(url).await?;
let dialect = PostgresDialect;
Ok(Self::new(dialect, executor))
}
pub async fn transaction<F, Fut, T>(&self, opts: TransactionOptions, f: F) -> Result<T>
where
F: FnOnce(Client<crate::transaction::TransactionExecutor>) -> Fut,
Fut: Future<Output = Result<T>> + Send,
T: Send + 'static,
{
let sqlx_tx = self
.executor()
.pool()
.begin()
.await
.map_err(|e| Error::connection(e, "Failed to begin transaction"))?;
let tx_executor = crate::transaction::TransactionExecutor::postgres(sqlx_tx);
drive_transaction(
tx_executor,
nautilus_dialect::PostgresDialect,
opts,
true,
f,
)
.await
}
}
impl Client<crate::mysql::MysqlExecutor> {
pub async fn mysql(url: &str) -> Result<Self> {
use crate::mysql::MysqlExecutor;
use nautilus_dialect::MysqlDialect;
let executor = MysqlExecutor::new(url).await?;
let dialect = MysqlDialect;
Ok(Self::new(dialect, executor))
}
pub async fn transaction<F, Fut, T>(&self, opts: TransactionOptions, f: F) -> Result<T>
where
F: FnOnce(Client<crate::transaction::TransactionExecutor>) -> Fut,
Fut: Future<Output = Result<T>> + Send,
T: Send + 'static,
{
let sqlx_tx = self
.executor()
.pool()
.begin()
.await
.map_err(|e| Error::connection(e, "Failed to begin transaction"))?;
let tx_executor = crate::transaction::TransactionExecutor::mysql(sqlx_tx);
drive_transaction(tx_executor, nautilus_dialect::MysqlDialect, opts, true, f).await
}
}
impl Client<crate::sqlite::SqliteExecutor> {
pub async fn sqlite(url: &str) -> Result<Self> {
use crate::sqlite::SqliteExecutor;
use nautilus_dialect::SqliteDialect;
let executor = SqliteExecutor::new(url).await?;
let dialect = SqliteDialect;
Ok(Self::new(dialect, executor))
}
pub async fn transaction<F, Fut, T>(&self, opts: TransactionOptions, f: F) -> Result<T>
where
F: FnOnce(Client<crate::transaction::TransactionExecutor>) -> Fut,
Fut: Future<Output = Result<T>> + Send,
T: Send + 'static,
{
let sqlx_tx = self
.executor()
.pool()
.begin()
.await
.map_err(|e| Error::connection(e, "Failed to begin transaction"))?;
let tx_executor = crate::transaction::TransactionExecutor::sqlite(sqlx_tx);
drive_transaction(tx_executor, nautilus_dialect::SqliteDialect, opts, false, f).await
}
}
impl<E> Clone for Client<E>
where
E: Executor,
{
fn clone(&self) -> Self {
Self {
dialect: Arc::clone(&self.dialect),
executor: Arc::clone(&self.executor),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_client_is_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<Client<crate::postgres::PgExecutor>>();
assert_send_sync::<Client<crate::sqlite::SqliteExecutor>>();
assert_send_sync::<Client<crate::mysql::MysqlExecutor>>();
}
#[tokio::test]
async fn sqlite_transaction_ignores_isolation_level() {
let client = Client::sqlite("sqlite::memory:")
.await
.expect("sqlite client should be created");
let result = client
.transaction(
TransactionOptions {
timeout: std::time::Duration::from_secs(1),
isolation_level: Some(crate::IsolationLevel::Serializable),
},
|tx| {
Box::pin(async move {
let sql = nautilus_dialect::Sql {
text: "SELECT 1 AS one".to_string(),
params: vec![],
};
let rows = crate::execute_all(tx.executor(), &sql).await?;
assert_eq!(rows.len(), 1);
Ok(())
})
},
)
.await;
assert!(
result.is_ok(),
"sqlite transaction should not fail when an isolation level is requested: {result:?}"
);
}
}