Skip to main content

nautilus_connector/
client.rs

1//! Client combining a SQL dialect with a database executor.
2
3use std::future::Future;
4use std::sync::Arc;
5
6use crate::error::{ConnectorError as Error, Result};
7use crate::transaction::TransactionOptions;
8use crate::ConnectorPoolOptions;
9use nautilus_dialect::Dialect;
10
11use crate::Executor;
12
13/// A Client holding both a Dialect (for SQL rendering) and an Executor (for query execution).
14///
15/// The Client uses `Arc` internally, making it cheap to clone and thread-safe.
16/// This allows the same client to be shared across multiple parts of your application.
17///
18/// The Client is generic over the Executor type to work around limitations with
19/// trait objects and Generic Associated Types (GATs).
20///
21/// # Example
22///
23/// ```no_run
24/// # use nautilus_connector::{Client, ConnectorResult};
25/// # async fn example() -> ConnectorResult<()> {
26/// let client = Client::postgres("postgres://user:pass@localhost/db").await?;
27/// // client can now be cloned and passed around cheaply
28/// let clone = client.clone();
29/// # Ok(())
30/// # }
31/// ```
32pub struct Client<E>
33where
34    E: Executor,
35{
36    dialect: Arc<dyn Dialect + Send + Sync>,
37    executor: Arc<E>,
38}
39
40impl<E> Client<E>
41where
42    E: Executor,
43{
44    /// Creates a new Client from a dialect and an executor.
45    ///
46    /// This is the generic constructor that works with any Dialect and Executor implementation.
47    ///
48    /// # Example
49    ///
50    /// ```no_run
51    /// # use nautilus_connector::{Client, PgExecutor, ConnectorResult};
52    /// # use nautilus_dialect::PostgresDialect;
53    /// # async fn example() -> ConnectorResult<()> {
54    /// let executor = PgExecutor::new("postgres://localhost/mydb").await?;
55    /// let dialect = PostgresDialect;
56    /// let client = Client::new(dialect, executor);
57    /// # Ok(())
58    /// # }
59    /// ```
60    pub fn new<D>(dialect: D, executor: E) -> Self
61    where
62        D: Dialect + Send + Sync + 'static,
63    {
64        Self {
65            dialect: Arc::new(dialect),
66            executor: Arc::new(executor),
67        }
68    }
69
70    /// Returns a reference to the underlying Dialect.
71    ///
72    /// Use this to render queries into SQL.
73    pub fn dialect(&self) -> &(dyn Dialect + Send + Sync) {
74        &*self.dialect
75    }
76
77    /// Returns a reference to the underlying Executor.
78    ///
79    /// Use this to execute rendered SQL queries against the database.
80    pub fn executor(&self) -> &E {
81        &self.executor
82    }
83}
84
85async fn set_transaction_isolation(
86    tx_executor: &crate::transaction::TransactionExecutor,
87    isolation_level: Option<crate::IsolationLevel>,
88) -> Result<()> {
89    let Some(isolation_level) = isolation_level else {
90        return Ok(());
91    };
92
93    let sql = nautilus_dialect::Sql {
94        text: format!(
95            "SET TRANSACTION ISOLATION LEVEL {}",
96            isolation_level.as_sql()
97        ),
98        params: vec![],
99    };
100
101    crate::execute_all(tx_executor, &sql).await?;
102    Ok(())
103}
104
105async fn drive_transaction<F, Fut, T, D>(
106    tx_executor: crate::transaction::TransactionExecutor,
107    dialect: D,
108    opts: TransactionOptions,
109    supports_isolation_level: bool,
110    f: F,
111) -> Result<T>
112where
113    F: FnOnce(Client<crate::transaction::TransactionExecutor>) -> Fut,
114    Fut: Future<Output = Result<T>> + Send,
115    T: Send + 'static,
116    D: Dialect + Send + Sync + 'static,
117{
118    let TransactionOptions {
119        timeout,
120        isolation_level,
121    } = opts;
122
123    if supports_isolation_level {
124        set_transaction_isolation(&tx_executor, isolation_level).await?;
125    }
126
127    let tx_client = Client::new(dialect, tx_executor);
128
129    let result = if timeout.is_zero() {
130        f(tx_client.clone()).await
131    } else {
132        match tokio::time::timeout(timeout, f(tx_client.clone())).await {
133            Ok(result) => result,
134            Err(_) => {
135                let _ = tx_client.executor().rollback().await;
136                return Err(Error::database_msg("Transaction timed out"));
137            }
138        }
139    };
140
141    match &result {
142        Ok(_) => tx_client.executor().commit().await?,
143        Err(_) => {
144            let _ = tx_client.executor().rollback().await;
145        }
146    }
147
148    result
149}
150
151/// Convenience constructors for specific database backends.
152impl Client<crate::postgres::PgExecutor> {
153    /// Creates a new PostgreSQL client.
154    ///
155    /// This is a convenience constructor that creates both a PostgresDialect
156    /// and a PgExecutor, then wraps them in a Client.
157    ///
158    /// # Arguments
159    ///
160    /// * `url` - PostgreSQL connection string (e.g., "postgres://user:pass@localhost/db")
161    ///
162    /// # Example
163    ///
164    /// ```no_run
165    /// # use nautilus_connector::{Client, ConnectorResult};
166    /// # async fn example() -> ConnectorResult<()> {
167    /// let client = Client::postgres("postgres://localhost/mydb").await?;
168    /// # Ok(())
169    /// # }
170    /// ```
171    pub async fn postgres(url: &str) -> Result<Self> {
172        Self::postgres_with_options(url, ConnectorPoolOptions::default()).await
173    }
174
175    /// Creates a new PostgreSQL client with explicit pool overrides.
176    pub async fn postgres_with_options(
177        url: &str,
178        pool_options: ConnectorPoolOptions,
179    ) -> Result<Self> {
180        use crate::postgres::PgExecutor;
181        use nautilus_dialect::PostgresDialect;
182
183        let executor = PgExecutor::new_with_options(url, pool_options).await?;
184        let dialect = PostgresDialect;
185        Ok(Self::new(dialect, executor))
186    }
187
188    /// Execute an async closure inside a database transaction.
189    ///
190    /// The closure receives a `Client<TransactionExecutor>` whose queries all
191    /// run on the same underlying connection.  If the closure returns `Ok`,
192    /// the transaction is committed; if it returns `Err` (or panics), the
193    /// transaction is rolled back.
194    ///
195    /// # Example
196    ///
197    /// ```no_run
198    /// # use nautilus_connector::{Client, ConnectorResult};
199    /// # async fn example() -> ConnectorResult<()> {
200    /// let client = Client::postgres("postgres://localhost/mydb").await?;
201    /// let result = client.transaction(Default::default(), |tx| Box::pin(async move {
202    ///     // tx.executor() runs queries inside the transaction
203    ///     Ok(42)
204    /// })).await?;
205    /// # Ok(())
206    /// # }
207    /// ```
208    pub async fn transaction<F, Fut, T>(&self, opts: TransactionOptions, f: F) -> Result<T>
209    where
210        F: FnOnce(Client<crate::transaction::TransactionExecutor>) -> Fut,
211        Fut: Future<Output = Result<T>> + Send,
212        T: Send + 'static,
213    {
214        let sqlx_tx = self
215            .executor()
216            .pool()
217            .begin()
218            .await
219            .map_err(|e| Error::connection(e, "Failed to begin transaction"))?;
220        let tx_executor = crate::transaction::TransactionExecutor::postgres(sqlx_tx);
221
222        drive_transaction(
223            tx_executor,
224            nautilus_dialect::PostgresDialect,
225            opts,
226            true,
227            f,
228        )
229        .await
230    }
231}
232
233/// Convenience constructor for MySQL.
234impl Client<crate::mysql::MysqlExecutor> {
235    /// Creates a new MySQL client.
236    ///
237    /// This is a convenience constructor that creates both a MysqlDialect
238    /// and a MysqlExecutor, then wraps them in a Client.
239    ///
240    /// # Arguments
241    ///
242    /// * `url` - MySQL connection string (e.g., "mysql://user:pass@localhost/db")
243    ///
244    /// # Example
245    ///
246    /// ```no_run
247    /// # use nautilus_connector::{Client, ConnectorResult};
248    /// # async fn example() -> ConnectorResult<()> {
249    /// let client = Client::mysql("mysql://user:pass@localhost/mydb").await?;
250    /// # Ok(())
251    /// # }
252    /// ```
253    pub async fn mysql(url: &str) -> Result<Self> {
254        Self::mysql_with_options(url, ConnectorPoolOptions::default()).await
255    }
256
257    /// Creates a new MySQL client with explicit pool overrides.
258    pub async fn mysql_with_options(url: &str, pool_options: ConnectorPoolOptions) -> Result<Self> {
259        use crate::mysql::MysqlExecutor;
260        use nautilus_dialect::MysqlDialect;
261
262        let executor = MysqlExecutor::new_with_options(url, pool_options).await?;
263        let dialect = MysqlDialect;
264        Ok(Self::new(dialect, executor))
265    }
266
267    /// Execute an async closure inside a MySQL transaction.
268    ///
269    /// See [`Client::<PgExecutor>::transaction`] for full documentation.
270    pub async fn transaction<F, Fut, T>(&self, opts: TransactionOptions, f: F) -> Result<T>
271    where
272        F: FnOnce(Client<crate::transaction::TransactionExecutor>) -> Fut,
273        Fut: Future<Output = Result<T>> + Send,
274        T: Send + 'static,
275    {
276        let sqlx_tx = self
277            .executor()
278            .pool()
279            .begin()
280            .await
281            .map_err(|e| Error::connection(e, "Failed to begin transaction"))?;
282        let tx_executor = crate::transaction::TransactionExecutor::mysql(sqlx_tx);
283
284        drive_transaction(tx_executor, nautilus_dialect::MysqlDialect, opts, true, f).await
285    }
286}
287
288/// Convenience constructor for SQLite.
289impl Client<crate::sqlite::SqliteExecutor> {
290    /// Creates a new SQLite client.
291    ///
292    /// This is a convenience constructor that creates both a SqliteDialect
293    /// and a SqliteExecutor, then wraps them in a Client.
294    ///
295    /// # Arguments
296    ///
297    /// * `url` - SQLite connection URL (e.g., `sqlite:mydb.db` or `sqlite::memory:`)
298    ///
299    /// # Example
300    ///
301    /// ```no_run
302    /// # use nautilus_connector::{Client, ConnectorResult};
303    /// # async fn example() -> ConnectorResult<()> {
304    /// let client = Client::sqlite("sqlite::memory:").await?;
305    /// # Ok(())
306    /// # }
307    /// ```
308    pub async fn sqlite(url: &str) -> Result<Self> {
309        Self::sqlite_with_options(url, ConnectorPoolOptions::default()).await
310    }
311
312    /// Creates a new SQLite client with explicit pool overrides.
313    pub async fn sqlite_with_options(
314        url: &str,
315        pool_options: ConnectorPoolOptions,
316    ) -> Result<Self> {
317        use crate::sqlite::SqliteExecutor;
318        use nautilus_dialect::SqliteDialect;
319
320        let executor = SqliteExecutor::new_with_options(url, pool_options).await?;
321        let dialect = SqliteDialect;
322        Ok(Self::new(dialect, executor))
323    }
324
325    /// Execute an async closure inside a SQLite transaction.
326    ///
327    /// See [`Client::<PgExecutor>::transaction`] for full documentation.
328    /// SQLite ignores `TransactionOptions::isolation_level` because it does not
329    /// support `SET TRANSACTION ISOLATION LEVEL`.
330    pub async fn transaction<F, Fut, T>(&self, opts: TransactionOptions, f: F) -> Result<T>
331    where
332        F: FnOnce(Client<crate::transaction::TransactionExecutor>) -> Fut,
333        Fut: Future<Output = Result<T>> + Send,
334        T: Send + 'static,
335    {
336        let sqlx_tx = self
337            .executor()
338            .pool()
339            .begin()
340            .await
341            .map_err(|e| Error::connection(e, "Failed to begin transaction"))?;
342        let tx_executor = crate::transaction::TransactionExecutor::sqlite(sqlx_tx);
343
344        drive_transaction(tx_executor, nautilus_dialect::SqliteDialect, opts, false, f).await
345    }
346}
347
348impl<E> Clone for Client<E>
349where
350    E: Executor,
351{
352    /// Cloning a Client is cheap - it only clones the Arc pointers, not the underlying data.
353    fn clone(&self) -> Self {
354        Self {
355            dialect: Arc::clone(&self.dialect),
356            executor: Arc::clone(&self.executor),
357        }
358    }
359}
360
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time check that every backend's client is `Send + Sync`.
    #[test]
    fn test_client_is_send_sync() {
        fn require_send_sync<T>()
        where
            T: Send + Sync,
        {
        }

        require_send_sync::<Client<crate::postgres::PgExecutor>>();
        require_send_sync::<Client<crate::sqlite::SqliteExecutor>>();
        require_send_sync::<Client<crate::mysql::MysqlExecutor>>();
    }

    /// Requesting an isolation level on SQLite must not make the transaction fail.
    #[tokio::test]
    async fn sqlite_transaction_ignores_isolation_level() {
        let client = Client::sqlite("sqlite::memory:")
            .await
            .expect("sqlite client should be created");

        let opts = TransactionOptions {
            timeout: std::time::Duration::from_secs(1),
            isolation_level: Some(crate::IsolationLevel::Serializable),
        };

        let result = client
            .transaction(opts, |tx| {
                Box::pin(async move {
                    // A trivial query proves the transaction itself is usable.
                    let sql = nautilus_dialect::Sql {
                        text: "SELECT 1 AS one".to_string(),
                        params: vec![],
                    };
                    let rows = crate::execute_all(tx.executor(), &sql).await?;
                    assert_eq!(rows.len(), 1);
                    Ok(())
                })
            })
            .await;

        assert!(
            result.is_ok(),
            "sqlite transaction should not fail when an isolation level is requested: {result:?}"
        );
    }
}