nautilus-orm-connector 0.1.4

Database executors and connection management for Nautilus ORM
Documentation
//! Client combining a SQL dialect with a database executor.

use std::future::Future;
use std::sync::Arc;

use crate::error::{ConnectorError as Error, Result};
use crate::transaction::TransactionOptions;
use nautilus_dialect::Dialect;

use crate::Executor;

/// A Client holding both a Dialect (for SQL rendering) and an Executor (for query execution).
///
/// The Client uses `Arc` internally, making it cheap to clone and thread-safe.
/// This allows the same client to be shared across multiple parts of your application.
///
/// The Client is generic over the Executor type to work around limitations with
/// trait objects and Generic Associated Types (GATs).
///
/// # Example
///
/// ```no_run
/// # use nautilus_connector::{Client, ConnectorResult};
/// # async fn example() -> ConnectorResult<()> {
/// let client = Client::postgres("postgres://user:pass@localhost/db").await?;
/// // client can now be cloned and passed around cheaply
/// let clone = client.clone();
/// # Ok(())
/// # }
/// ```
pub struct Client<E>
where
    E: Executor,
{
    dialect: Arc<dyn Dialect + Send + Sync>,
    executor: Arc<E>,
}

impl<E> Client<E>
where
    E: Executor,
{
    /// Creates a new Client from a dialect and an executor.
    ///
    /// This is the generic constructor that works with any Dialect and Executor implementation.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use nautilus_connector::{Client, PgExecutor, ConnectorResult};
    /// # use nautilus_dialect::PostgresDialect;
    /// # async fn example() -> ConnectorResult<()> {
    /// let executor = PgExecutor::new("postgres://localhost/mydb").await?;
    /// let dialect = PostgresDialect;
    /// let client = Client::new(dialect, executor);
    /// # Ok(())
    /// # }
    /// ```
    pub fn new<D>(dialect: D, executor: E) -> Self
    where
        D: Dialect + Send + Sync + 'static,
    {
        Self {
            dialect: Arc::new(dialect),
            executor: Arc::new(executor),
        }
    }

    /// Returns a reference to the underlying Dialect.
    ///
    /// Use this to render queries into SQL.
    pub fn dialect(&self) -> &(dyn Dialect + Send + Sync) {
        &*self.dialect
    }

    /// Returns a reference to the underlying Executor.
    ///
    /// Use this to execute rendered SQL queries against the database.
    pub fn executor(&self) -> &E {
        &self.executor
    }
}

/// Convenience constructors for specific database backends.
impl Client<crate::postgres::PgExecutor> {
    /// Creates a new PostgreSQL client.
    ///
    /// This is a convenience constructor that creates both a PostgresDialect
    /// and a PgExecutor, then wraps them in a Client.
    ///
    /// # Arguments
    ///
    /// * `url` - PostgreSQL connection string (e.g., "postgres://user:pass@localhost/db")
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use nautilus_connector::{Client, ConnectorResult};
    /// # async fn example() -> ConnectorResult<()> {
    /// let client = Client::postgres("postgres://localhost/mydb").await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn postgres(url: &str) -> Result<Self> {
        use crate::postgres::PgExecutor;
        use nautilus_dialect::PostgresDialect;

        let executor = PgExecutor::new(url).await?;
        let dialect = PostgresDialect;
        Ok(Self::new(dialect, executor))
    }

    /// Execute an async closure inside a database transaction.
    ///
    /// The closure receives a `Client<TransactionExecutor>` whose queries all
    /// run on the same underlying connection.  If the closure returns `Ok`,
    /// the transaction is committed; if it returns `Err` (or panics), the
    /// transaction is rolled back.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use nautilus_connector::{Client, ConnectorResult};
    /// # async fn example() -> ConnectorResult<()> {
    /// let client = Client::postgres("postgres://localhost/mydb").await?;
    /// let result = client.transaction(Default::default(), |tx| Box::pin(async move {
    ///     // tx.executor() runs queries inside the transaction
    ///     Ok(42)
    /// })).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn transaction<F, Fut, T>(&self, opts: TransactionOptions, f: F) -> Result<T>
    where
        F: FnOnce(Client<crate::transaction::TransactionExecutor>) -> Fut,
        Fut: Future<Output = Result<T>> + Send,
        T: Send + 'static,
    {
        let sqlx_tx = self
            .executor()
            .pool()
            .begin()
            .await
            .map_err(|e| Error::connection(e, "Failed to begin transaction"))?;
        let tx_executor = crate::transaction::TransactionExecutor::postgres(sqlx_tx);

        // Set isolation level if requested
        if let Some(iso) = opts.isolation_level {
            let iso_sql = format!("SET TRANSACTION ISOLATION LEVEL {}", iso.as_sql());
            let sql = nautilus_dialect::Sql {
                text: iso_sql,
                params: vec![],
            };
            use crate::executor::execute_all;
            let _ = execute_all(&tx_executor, &sql).await?;
        }

        let tx_client = Client::new(nautilus_dialect::PostgresDialect, tx_executor);

        let result = if opts.timeout.as_millis() > 0 {
            match tokio::time::timeout(opts.timeout, f(tx_client.clone())).await {
                Ok(r) => r,
                Err(_) => {
                    let _ = tx_client.executor().rollback().await;
                    return Err(Error::database_msg("Transaction timed out"));
                }
            }
        } else {
            f(tx_client.clone()).await
        };

        match &result {
            Ok(_) => tx_client.executor().commit().await?,
            Err(_) => {
                let _ = tx_client.executor().rollback().await;
            }
        }

        result
    }
}

/// Convenience constructor for MySQL.
impl Client<crate::mysql::MysqlExecutor> {
    /// Creates a new MySQL client.
    ///
    /// This is a convenience constructor that creates both a MysqlDialect
    /// and a MysqlExecutor, then wraps them in a Client.
    ///
    /// # Arguments
    ///
    /// * `url` - MySQL connection string (e.g., "mysql://user:pass@localhost/db")
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use nautilus_connector::{Client, ConnectorResult};
    /// # async fn example() -> ConnectorResult<()> {
    /// let client = Client::mysql("mysql://user:pass@localhost/mydb").await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn mysql(url: &str) -> Result<Self> {
        use crate::mysql::MysqlExecutor;
        use nautilus_dialect::MysqlDialect;

        let executor = MysqlExecutor::new(url).await?;
        let dialect = MysqlDialect;
        Ok(Self::new(dialect, executor))
    }

    /// Execute an async closure inside a MySQL transaction.
    ///
    /// See [`Client::<PgExecutor>::transaction`] for full documentation.
    pub async fn transaction<F, Fut, T>(&self, opts: TransactionOptions, f: F) -> Result<T>
    where
        F: FnOnce(Client<crate::transaction::TransactionExecutor>) -> Fut,
        Fut: Future<Output = Result<T>> + Send,
        T: Send + 'static,
    {
        let sqlx_tx = self
            .executor()
            .pool()
            .begin()
            .await
            .map_err(|e| Error::connection(e, "Failed to begin transaction"))?;
        let tx_executor = crate::transaction::TransactionExecutor::mysql(sqlx_tx);

        if let Some(iso) = opts.isolation_level {
            let iso_sql = format!("SET TRANSACTION ISOLATION LEVEL {}", iso.as_sql());
            let sql = nautilus_dialect::Sql {
                text: iso_sql,
                params: vec![],
            };
            use crate::executor::execute_all;
            let _ = execute_all(&tx_executor, &sql).await?;
        }

        let tx_client = Client::new(nautilus_dialect::MysqlDialect, tx_executor);

        let result = if opts.timeout.as_millis() > 0 {
            match tokio::time::timeout(opts.timeout, f(tx_client.clone())).await {
                Ok(r) => r,
                Err(_) => {
                    let _ = tx_client.executor().rollback().await;
                    return Err(Error::database_msg("Transaction timed out"));
                }
            }
        } else {
            f(tx_client.clone()).await
        };

        match &result {
            Ok(_) => tx_client.executor().commit().await?,
            Err(_) => {
                let _ = tx_client.executor().rollback().await;
            }
        }

        result
    }
}

/// Convenience constructor for SQLite.
impl Client<crate::sqlite::SqliteExecutor> {
    /// Creates a new SQLite client.
    ///
    /// This is a convenience constructor that creates both a SqliteDialect
    /// and a SqliteExecutor, then wraps them in a Client.
    ///
    /// # Arguments
    ///
    /// * `url` - SQLite connection URL (e.g., `sqlite:mydb.db` or `sqlite::memory:`)
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use nautilus_connector::{Client, ConnectorResult};
    /// # async fn example() -> ConnectorResult<()> {
    /// let client = Client::sqlite("sqlite::memory:").await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn sqlite(url: &str) -> Result<Self> {
        use crate::sqlite::SqliteExecutor;
        use nautilus_dialect::SqliteDialect;

        let executor = SqliteExecutor::new(url).await?;
        let dialect = SqliteDialect;
        Ok(Self::new(dialect, executor))
    }

    /// Execute an async closure inside a SQLite transaction.
    ///
    /// See [`Client::<PgExecutor>::transaction`] for full documentation.
    pub async fn transaction<F, Fut, T>(&self, opts: TransactionOptions, f: F) -> Result<T>
    where
        F: FnOnce(Client<crate::transaction::TransactionExecutor>) -> Fut,
        Fut: Future<Output = Result<T>> + Send,
        T: Send + 'static,
    {
        let sqlx_tx = self
            .executor()
            .pool()
            .begin()
            .await
            .map_err(|e| Error::connection(e, "Failed to begin transaction"))?;
        let tx_executor = crate::transaction::TransactionExecutor::sqlite(sqlx_tx);

        if let Some(iso) = opts.isolation_level {
            let iso_sql = format!("SET TRANSACTION ISOLATION LEVEL {}", iso.as_sql());
            let sql = nautilus_dialect::Sql {
                text: iso_sql,
                params: vec![],
            };
            use crate::executor::execute_all;
            let _ = execute_all(&tx_executor, &sql).await?;
        }

        let tx_client = Client::new(nautilus_dialect::SqliteDialect, tx_executor);

        let result = if opts.timeout.as_millis() > 0 {
            match tokio::time::timeout(opts.timeout, f(tx_client.clone())).await {
                Ok(r) => r,
                Err(_) => {
                    let _ = tx_client.executor().rollback().await;
                    return Err(Error::database_msg("Transaction timed out"));
                }
            }
        } else {
            f(tx_client.clone()).await
        };

        match &result {
            Ok(_) => tx_client.executor().commit().await?,
            Err(_) => {
                let _ = tx_client.executor().rollback().await;
            }
        }

        result
    }
}

impl<E> Clone for Client<E>
where
    E: Executor,
{
    /// Cloning a Client is cheap - it only clones the Arc pointers, not the underlying data.
    fn clone(&self) -> Self {
        Self {
            dialect: Arc::clone(&self.dialect),
            executor: Arc::clone(&self.executor),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_client_is_send_sync() {
        // This is a compile-time test that Client implements Send + Sync
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<Client<crate::postgres::PgExecutor>>();
        assert_send_sync::<Client<crate::sqlite::SqliteExecutor>>();
        assert_send_sync::<Client<crate::mysql::MysqlExecutor>>();
    }
}