Skip to main content

sql_middleware/executor/
dispatch.rs

1use crate::error::SqlMiddlewareDbError;
2use crate::pool::MiddlewarePoolConnection;
3use crate::query_builder::QueryBuilder;
4use crate::results::ResultSet;
5use crate::types::{RowValues, StatementCacheMode};
6
7#[cfg(feature = "mssql")]
8use crate::mssql;
9#[cfg(feature = "postgres")]
10use crate::postgres;
11#[cfg(feature = "sqlite")]
12use crate::sqlite;
13#[cfg(feature = "turso")]
14use crate::turso;
15
16use super::targets::{BatchTarget, QueryTarget};
17
18/// Execute a batch against either a connection or a transaction.
19///
20/// # Errors
21/// Returns an error propagated from the underlying backend execution or transaction context.
22pub async fn execute_batch(
23    target: impl Into<BatchTarget<'_>>,
24    query: &str,
25) -> Result<(), SqlMiddlewareDbError> {
26    match target.into() {
27        BatchTarget::Connection(conn) => conn.execute_batch(query).await,
28        #[cfg(feature = "postgres")]
29        BatchTarget::PostgresTx(tx) => tx.execute_batch(query).await,
30        #[cfg(feature = "mssql")]
31        BatchTarget::MssqlTx(tx) => tx.execute_batch(query).await,
32        #[cfg(feature = "turso")]
33        BatchTarget::TursoTx(tx) => tx.execute_batch(query).await,
34        #[cfg(feature = "turso")]
35        BatchTarget::TypedTurso { conn } => {
36            crate::typed_turso::dml(conn, query, &[]).await?;
37            Ok(())
38        }
39        #[cfg(feature = "turso")]
40        BatchTarget::TypedTursoTx { conn } => {
41            crate::typed_turso::dml(conn, query, &[]).await?;
42            Ok(())
43        }
44    }
45}
46
47/// Start a fluent builder for either a connection or a transaction.
48pub fn query<'a>(target: impl Into<QueryTarget<'a>>, sql: &'a str) -> QueryBuilder<'a, 'a> {
49    QueryBuilder::new_target(target.into(), sql)
50}
51
52pub(crate) async fn execute_select_dispatch(
53    conn: &mut MiddlewarePoolConnection,
54    query: &str,
55    params: &[RowValues],
56    statement_cache_mode: StatementCacheMode,
57) -> Result<ResultSet, SqlMiddlewareDbError> {
58    #[cfg(not(any(feature = "sqlite", feature = "turso")))]
59    let _ = statement_cache_mode;
60
61    match conn {
62        #[cfg(feature = "postgres")]
63        MiddlewarePoolConnection::Postgres {
64            client: pg_client, ..
65        } => postgres::execute_query_on_client(pg_client, query, params).await,
66        #[cfg(feature = "sqlite")]
67        MiddlewarePoolConnection::Sqlite { .. } => {
68            let sqlite_client = conn.sqlite_conn_mut()?;
69            sqlite::execute_select(sqlite_client, query, params, statement_cache_mode).await
70        }
71        #[cfg(feature = "mssql")]
72        MiddlewarePoolConnection::Mssql {
73            conn: mssql_client, ..
74        } => mssql::execute_select(mssql_client, query, params).await,
75        #[cfg(feature = "turso")]
76        MiddlewarePoolConnection::Turso {
77            conn: turso_conn, ..
78        } => turso::execute_select(turso_conn, query, params, statement_cache_mode).await,
79        #[allow(unreachable_patterns)]
80        _ => Err(SqlMiddlewareDbError::Unimplemented(
81            "This database type is not enabled in the current build".to_string(),
82        )),
83    }
84}
85
86pub(crate) async fn execute_select_prepared_dispatch(
87    conn: &mut MiddlewarePoolConnection,
88    query: &str,
89    params: &[RowValues],
90    statement_cache_mode: StatementCacheMode,
91) -> Result<ResultSet, SqlMiddlewareDbError> {
92    #[cfg(not(any(feature = "sqlite", feature = "turso")))]
93    let _ = statement_cache_mode;
94
95    match conn {
96        #[cfg(feature = "postgres")]
97        MiddlewarePoolConnection::Postgres {
98            prepared_statements,
99            client: pg_client,
100            ..
101        } => {
102            let stmt = if let Some(stmt) = prepared_statements.get(query) {
103                stmt.clone()
104            } else {
105                let stmt = pg_client.prepare(query).await.map_err(|e| {
106                    SqlMiddlewareDbError::ExecutionError(format!("postgres prepare error: {e}"))
107                })?;
108                prepared_statements.insert(query.to_owned(), stmt.clone());
109                stmt
110            };
111            postgres::query::execute_query_prepared_statement_on_client(pg_client, &stmt, params)
112                .await
113        }
114        #[cfg(feature = "sqlite")]
115        MiddlewarePoolConnection::Sqlite { .. } => {
116            let sqlite_client = conn.sqlite_conn_mut()?;
117            sqlite::execute_select(sqlite_client, query, params, statement_cache_mode).await
118        }
119        #[cfg(feature = "mssql")]
120        MiddlewarePoolConnection::Mssql {
121            conn: mssql_client, ..
122        } => mssql::execute_select(mssql_client, query, params).await,
123        #[cfg(feature = "turso")]
124        MiddlewarePoolConnection::Turso {
125            conn: turso_conn, ..
126        } => turso::execute_select(turso_conn, query, params, statement_cache_mode).await,
127        #[allow(unreachable_patterns)]
128        _ => Err(SqlMiddlewareDbError::Unimplemented(
129            "This database type is not enabled in the current build".to_string(),
130        )),
131    }
132}
133
134pub(crate) async fn execute_dml_dispatch(
135    conn: &mut MiddlewarePoolConnection,
136    query: &str,
137    params: &[RowValues],
138    statement_cache_mode: StatementCacheMode,
139) -> Result<usize, SqlMiddlewareDbError> {
140    #[cfg(not(feature = "sqlite"))]
141    let _ = statement_cache_mode;
142
143    match conn {
144        #[cfg(feature = "postgres")]
145        MiddlewarePoolConnection::Postgres {
146            client: pg_client, ..
147        } => {
148            postgres::execute_dml_on_client(pg_client, query, params, "postgres execute error")
149                .await
150        }
151        #[cfg(feature = "sqlite")]
152        MiddlewarePoolConnection::Sqlite { .. } => {
153            let sqlite_client = conn.sqlite_conn_mut()?;
154            sqlite::execute_dml(sqlite_client, query, params, statement_cache_mode).await
155        }
156        #[cfg(feature = "mssql")]
157        MiddlewarePoolConnection::Mssql {
158            conn: mssql_client, ..
159        } => mssql::execute_dml(mssql_client, query, params).await,
160        #[cfg(feature = "turso")]
161        MiddlewarePoolConnection::Turso {
162            conn: turso_conn, ..
163        } => turso::execute_dml(turso_conn, query, params).await,
164        #[allow(unreachable_patterns)]
165        _ => Err(SqlMiddlewareDbError::Unimplemented(
166            "This database type is not enabled in the current build".to_string(),
167        )),
168    }
169}
170
171pub(crate) async fn execute_dml_prepared_dispatch(
172    conn: &mut MiddlewarePoolConnection,
173    query: &str,
174    params: &[RowValues],
175    statement_cache_mode: StatementCacheMode,
176) -> Result<usize, SqlMiddlewareDbError> {
177    #[cfg(not(feature = "sqlite"))]
178    let _ = statement_cache_mode;
179
180    match conn {
181        #[cfg(feature = "postgres")]
182        MiddlewarePoolConnection::Postgres {
183            prepared_statements,
184            client: pg_client,
185            ..
186        } => {
187            let stmt = if let Some(stmt) = prepared_statements.get(query) {
188                stmt.clone()
189            } else {
190                let stmt = pg_client.prepare(query).await.map_err(|e| {
191                    SqlMiddlewareDbError::ExecutionError(format!("postgres prepare error: {e}"))
192                })?;
193                prepared_statements.insert(query.to_owned(), stmt.clone());
194                stmt
195            };
196            postgres::query::execute_dml_prepared_statement_on_client(pg_client, &stmt, params)
197                .await
198        }
199        #[cfg(feature = "sqlite")]
200        MiddlewarePoolConnection::Sqlite { .. } => {
201            let sqlite_client = conn.sqlite_conn_mut()?;
202            sqlite::execute_dml(sqlite_client, query, params, statement_cache_mode).await
203        }
204        #[cfg(feature = "mssql")]
205        MiddlewarePoolConnection::Mssql {
206            conn: mssql_client, ..
207        } => mssql::execute_dml(mssql_client, query, params).await,
208        #[cfg(feature = "turso")]
209        MiddlewarePoolConnection::Turso {
210            conn: turso_conn, ..
211        } => turso::execute_dml(turso_conn, query, params).await,
212        #[allow(unreachable_patterns)]
213        _ => Err(SqlMiddlewareDbError::Unimplemented(
214            "This database type is not enabled in the current build".to_string(),
215        )),
216    }
217}