1use std::future::Future;
4use std::sync::Arc;
5
6use crate::error::{ConnectorError as Error, Result};
7use crate::transaction::TransactionOptions;
8use crate::ConnectorPoolOptions;
9use nautilus_dialect::Dialect;
10
11use crate::Executor;
12
13pub struct Client<E>
33where
34 E: Executor,
35{
36 dialect: Arc<dyn Dialect + Send + Sync>,
37 executor: Arc<E>,
38}
39
40impl<E> Client<E>
41where
42 E: Executor,
43{
44 pub fn new<D>(dialect: D, executor: E) -> Self
61 where
62 D: Dialect + Send + Sync + 'static,
63 {
64 Self {
65 dialect: Arc::new(dialect),
66 executor: Arc::new(executor),
67 }
68 }
69
70 pub fn dialect(&self) -> &(dyn Dialect + Send + Sync) {
74 &*self.dialect
75 }
76
77 pub fn executor(&self) -> &E {
81 &self.executor
82 }
83}
84
85async fn set_transaction_isolation(
86 tx_executor: &crate::transaction::TransactionExecutor,
87 isolation_level: Option<crate::IsolationLevel>,
88) -> Result<()> {
89 let Some(isolation_level) = isolation_level else {
90 return Ok(());
91 };
92
93 let sql = nautilus_dialect::Sql {
94 text: format!(
95 "SET TRANSACTION ISOLATION LEVEL {}",
96 isolation_level.as_sql()
97 ),
98 params: vec![],
99 };
100
101 crate::execute_all(tx_executor, &sql).await?;
102 Ok(())
103}
104
105async fn drive_transaction<F, Fut, T, D>(
106 tx_executor: crate::transaction::TransactionExecutor,
107 dialect: D,
108 opts: TransactionOptions,
109 supports_isolation_level: bool,
110 f: F,
111) -> Result<T>
112where
113 F: FnOnce(Client<crate::transaction::TransactionExecutor>) -> Fut,
114 Fut: Future<Output = Result<T>> + Send,
115 T: Send + 'static,
116 D: Dialect + Send + Sync + 'static,
117{
118 let TransactionOptions {
119 timeout,
120 isolation_level,
121 } = opts;
122
123 if supports_isolation_level {
124 set_transaction_isolation(&tx_executor, isolation_level).await?;
125 }
126
127 let tx_client = Client::new(dialect, tx_executor);
128
129 let result = if timeout.is_zero() {
130 f(tx_client.clone()).await
131 } else {
132 match tokio::time::timeout(timeout, f(tx_client.clone())).await {
133 Ok(result) => result,
134 Err(_) => {
135 let _ = tx_client.executor().rollback().await;
136 return Err(Error::database_msg("Transaction timed out"));
137 }
138 }
139 };
140
141 match &result {
142 Ok(_) => tx_client.executor().commit().await?,
143 Err(_) => {
144 let _ = tx_client.executor().rollback().await;
145 }
146 }
147
148 result
149}
150
151impl Client<crate::postgres::PgExecutor> {
153 pub async fn postgres(url: &str) -> Result<Self> {
172 Self::postgres_with_options(url, ConnectorPoolOptions::default()).await
173 }
174
175 pub async fn postgres_with_options(
177 url: &str,
178 pool_options: ConnectorPoolOptions,
179 ) -> Result<Self> {
180 use crate::postgres::PgExecutor;
181 use nautilus_dialect::PostgresDialect;
182
183 let executor = PgExecutor::new_with_options(url, pool_options).await?;
184 let dialect = PostgresDialect;
185 Ok(Self::new(dialect, executor))
186 }
187
188 pub async fn transaction<F, Fut, T>(&self, opts: TransactionOptions, f: F) -> Result<T>
209 where
210 F: FnOnce(Client<crate::transaction::TransactionExecutor>) -> Fut,
211 Fut: Future<Output = Result<T>> + Send,
212 T: Send + 'static,
213 {
214 let sqlx_tx = self
215 .executor()
216 .pool()
217 .begin()
218 .await
219 .map_err(|e| Error::connection(e, "Failed to begin transaction"))?;
220 let tx_executor = crate::transaction::TransactionExecutor::postgres(sqlx_tx);
221
222 drive_transaction(
223 tx_executor,
224 nautilus_dialect::PostgresDialect,
225 opts,
226 true,
227 f,
228 )
229 .await
230 }
231}
232
233impl Client<crate::mysql::MysqlExecutor> {
235 pub async fn mysql(url: &str) -> Result<Self> {
254 Self::mysql_with_options(url, ConnectorPoolOptions::default()).await
255 }
256
257 pub async fn mysql_with_options(url: &str, pool_options: ConnectorPoolOptions) -> Result<Self> {
259 use crate::mysql::MysqlExecutor;
260 use nautilus_dialect::MysqlDialect;
261
262 let executor = MysqlExecutor::new_with_options(url, pool_options).await?;
263 let dialect = MysqlDialect;
264 Ok(Self::new(dialect, executor))
265 }
266
267 pub async fn transaction<F, Fut, T>(&self, opts: TransactionOptions, f: F) -> Result<T>
271 where
272 F: FnOnce(Client<crate::transaction::TransactionExecutor>) -> Fut,
273 Fut: Future<Output = Result<T>> + Send,
274 T: Send + 'static,
275 {
276 let sqlx_tx = self
277 .executor()
278 .pool()
279 .begin()
280 .await
281 .map_err(|e| Error::connection(e, "Failed to begin transaction"))?;
282 let tx_executor = crate::transaction::TransactionExecutor::mysql(sqlx_tx);
283
284 drive_transaction(tx_executor, nautilus_dialect::MysqlDialect, opts, true, f).await
285 }
286}
287
288impl Client<crate::sqlite::SqliteExecutor> {
290 pub async fn sqlite(url: &str) -> Result<Self> {
309 Self::sqlite_with_options(url, ConnectorPoolOptions::default()).await
310 }
311
312 pub async fn sqlite_with_options(
314 url: &str,
315 pool_options: ConnectorPoolOptions,
316 ) -> Result<Self> {
317 use crate::sqlite::SqliteExecutor;
318 use nautilus_dialect::SqliteDialect;
319
320 let executor = SqliteExecutor::new_with_options(url, pool_options).await?;
321 let dialect = SqliteDialect;
322 Ok(Self::new(dialect, executor))
323 }
324
325 pub async fn transaction<F, Fut, T>(&self, opts: TransactionOptions, f: F) -> Result<T>
331 where
332 F: FnOnce(Client<crate::transaction::TransactionExecutor>) -> Fut,
333 Fut: Future<Output = Result<T>> + Send,
334 T: Send + 'static,
335 {
336 let sqlx_tx = self
337 .executor()
338 .pool()
339 .begin()
340 .await
341 .map_err(|e| Error::connection(e, "Failed to begin transaction"))?;
342 let tx_executor = crate::transaction::TransactionExecutor::sqlite(sqlx_tx);
343
344 drive_transaction(tx_executor, nautilus_dialect::SqliteDialect, opts, false, f).await
345 }
346}
347
348impl<E> Clone for Client<E>
349where
350 E: Executor,
351{
352 fn clone(&self) -> Self {
354 Self {
355 dialect: Arc::clone(&self.dialect),
356 executor: Arc::clone(&self.executor),
357 }
358 }
359}
360
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time check that each concrete client type is `Send + Sync`.
    #[test]
    fn test_client_is_send_sync() {
        fn require_send_sync<T: Send + Sync>() {}

        require_send_sync::<Client<crate::postgres::PgExecutor>>();
        require_send_sync::<Client<crate::sqlite::SqliteExecutor>>();
        require_send_sync::<Client<crate::mysql::MysqlExecutor>>();
    }

    /// Requesting an isolation level on SQLite must not make the transaction fail.
    #[tokio::test]
    async fn sqlite_transaction_ignores_isolation_level() {
        let client = Client::sqlite("sqlite::memory:")
            .await
            .expect("sqlite client should be created");

        let opts = TransactionOptions {
            timeout: std::time::Duration::from_secs(1),
            isolation_level: Some(crate::IsolationLevel::Serializable),
        };

        let result = client
            .transaction(opts, |tx| {
                Box::pin(async move {
                    let query = nautilus_dialect::Sql {
                        text: String::from("SELECT 1 AS one"),
                        params: Vec::new(),
                    };
                    let rows = crate::execute_all(tx.executor(), &query).await?;
                    assert_eq!(rows.len(), 1);
                    Ok(())
                })
            })
            .await;

        assert!(
            result.is_ok(),
            "sqlite transaction should not fail when an isolation level is requested: {result:?}"
        );
    }
}