Skip to main content

oxisql_core/
traits.rs

1//! Core database traits: [`Connection`], [`Transaction`], and [`ToSqlValue`].
2
3use std::pin::Pin;
4
5use async_trait::async_trait;
6use futures::Stream;
7
8use crate::prepare::PreparedStatement;
9use crate::schema::{ColumnInfo, ForeignKeyInfo, IndexInfo, TableInfo};
10use crate::{OxiSqlError, Row, SqlWarning, Value};
11
12// ── ToSqlValue ──────────────────────────────────────────────────────────────
13
14/// A value that can be used as a positional SQL parameter (`$1`, `$2`, …).
15pub trait ToSqlValue: Send + Sync {
16    /// Convert `self` into a [`Value`].
17    fn to_value(&self) -> Value;
18}
19
20impl ToSqlValue for i64 {
21    fn to_value(&self) -> Value {
22        Value::I64(*self)
23    }
24}
25
26impl ToSqlValue for i32 {
27    fn to_value(&self) -> Value {
28        Value::I64(i64::from(*self))
29    }
30}
31
32impl ToSqlValue for f64 {
33    fn to_value(&self) -> Value {
34        Value::F64(*self)
35    }
36}
37
38impl ToSqlValue for str {
39    fn to_value(&self) -> Value {
40        Value::Text(self.to_string())
41    }
42}
43
44impl ToSqlValue for String {
45    fn to_value(&self) -> Value {
46        Value::Text(self.clone())
47    }
48}
49
50impl ToSqlValue for bool {
51    fn to_value(&self) -> Value {
52        Value::Bool(*self)
53    }
54}
55
56impl ToSqlValue for Vec<u8> {
57    fn to_value(&self) -> Value {
58        Value::Blob(self.clone())
59    }
60}
61
62impl<T: ToSqlValue> ToSqlValue for Option<T> {
63    fn to_value(&self) -> Value {
64        match self {
65            Some(v) => v.to_value(),
66            None => Value::Null,
67        }
68    }
69}
70
71/// Blanket impl: any shared reference to a [`ToSqlValue`] is also a
72/// [`ToSqlValue`].  This makes `&"hello"` (i.e. `&&str`) and `&&42_i64`
73/// work as parameter values without needing per-reference-depth impls.
74impl<T: ToSqlValue + ?Sized> ToSqlValue for &T {
75    fn to_value(&self) -> Value {
76        (**self).to_value()
77    }
78}
79
80/// Allow a [`Value`] to be used directly as a SQL parameter.
81///
82/// This enables fan-out helpers (e.g. `MultiConnection`) to collect
83/// parameter snapshots as `Vec<Value>` and pass them back to
84/// `Connection::execute` / `Connection::query` without an intermediate
85/// conversion step.
86impl ToSqlValue for Value {
87    fn to_value(&self) -> Value {
88        self.clone()
89    }
90}
91
92// ── Connection trait ────────────────────────────────────────────────────────
93
94/// An async database connection.
95///
96/// Implementations must be `Send + Sync` so they can be shared across async
97/// tasks.  Use [`transaction`](Connection::transaction) to obtain a transaction
98/// that provides serialised, rollbackable execution.
99#[async_trait]
100pub trait Connection: Send + Sync {
101    /// Execute a DML/DDL statement and return the number of rows affected.
102    ///
103    /// Positional parameters (`$1`, `$2`, …) are substituted from `params`.
104    async fn execute(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError>;
105
106    /// Execute a `SELECT` statement and return the result rows.
107    ///
108    /// Positional parameters (`$1`, `$2`, …) are substituted from `params`.
109    async fn query(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<Vec<Row>, OxiSqlError>;
110
111    /// Begin a transaction, returning a handle that can be committed or
112    /// rolled back.
113    async fn transaction(&self) -> Result<Box<dyn Transaction + '_>, OxiSqlError>;
114
115    /// Execute multiple semicolon-separated SQL statements in a single call.
116    ///
117    /// The default implementation splits on `;` and calls [`execute`](Connection::execute)
118    /// for each statement.  Backends may override with a more efficient
119    /// batch execution path (e.g. `batch_execute` on Postgres).
120    ///
121    /// **Note:** The default implementation uses a naive `;` split and will
122    /// break on SQL containing semicolons inside string literals.  Backends
123    /// that override this method (Postgres, MySQL, embedded) handle this
124    /// correctly via their native batch execution APIs.
125    ///
126    /// Returns the total number of rows affected across all statements.
127    async fn execute_batch(&self, sql: &str) -> Result<u64, OxiSqlError> {
128        let mut total = 0u64;
129        for stmt in sql.split(';') {
130            let trimmed = stmt.trim();
131            if !trimmed.is_empty() {
132                total += self.execute(trimmed, &[]).await?;
133            }
134        }
135        Ok(total)
136    }
137
138    /// Lightweight connectivity check.
139    ///
140    /// The default implementation executes `SELECT 1` and discards the result.
141    /// Backends should override with a more efficient probe if available.
142    async fn ping(&self) -> Result<(), OxiSqlError> {
143        self.query("SELECT 1", &[]).await?;
144        Ok(())
145    }
146
147    /// Compile a SQL statement for repeated execution with different parameters.
148    ///
149    /// Returns a [`PreparedStatement`] that avoids re-parsing on each call.
150    /// The default implementation returns an error indicating the backend does
151    /// not support prepared statements; override in backend-specific impls.
152    async fn prepare(&self, sql: &str) -> Result<Box<dyn PreparedStatement + '_>, OxiSqlError> {
153        let _ = sql;
154        Err(OxiSqlError::Other(
155            "prepared statements are not supported by this backend".into(),
156        ))
157    }
158
159    /// List all tables visible to the current connection.
160    ///
161    /// The default implementation returns an unsupported error.
162    /// Backends that support introspection (Postgres, MySQL, embedded) override this.
163    async fn tables(&self) -> Result<Vec<TableInfo>, OxiSqlError> {
164        Err(OxiSqlError::Other(
165            "schema introspection not supported by this backend".into(),
166        ))
167    }
168
169    /// List all columns of the named table.
170    ///
171    /// The default implementation returns an unsupported error.
172    async fn columns(&self, table: &str) -> Result<Vec<ColumnInfo>, OxiSqlError> {
173        let _ = table;
174        Err(OxiSqlError::Other(
175            "schema introspection not supported by this backend".into(),
176        ))
177    }
178
179    /// List all indexes defined on the named table.
180    ///
181    /// The default implementation returns an unsupported error.
182    async fn indexes(&self, table: &str) -> Result<Vec<IndexInfo>, OxiSqlError> {
183        let _ = table;
184        Err(OxiSqlError::Other(
185            "schema introspection not supported by this backend".into(),
186        ))
187    }
188
189    /// List all foreign-key constraints on the named table.
190    ///
191    /// The default implementation returns an unsupported error.
192    async fn foreign_keys(&self, table: &str) -> Result<Vec<ForeignKeyInfo>, OxiSqlError> {
193        let _ = table;
194        Err(OxiSqlError::Other(
195            "schema introspection not supported by this backend".into(),
196        ))
197    }
198
199    /// Execute a SQL statement with named parameters.
200    ///
201    /// Placeholders `:name`, `$name`, and `@name` are translated to positional
202    /// form and forwarded to [`execute`](Connection::execute).
203    ///
204    /// # Errors
205    ///
206    /// Returns [`OxiSqlError::Params`] if a named placeholder has no corresponding
207    /// entry in `params`, or any error from the underlying `execute` call.
208    async fn execute_named(
209        &self,
210        sql: &str,
211        params: &[(&str, &dyn ToSqlValue)],
212    ) -> Result<u64, OxiSqlError> {
213        let (rewritten, names) = crate::params::rewrite_named_params(sql)?;
214        let positional = crate::params::bind_named_params(&names, params)?;
215        self.execute(&rewritten, &positional).await
216    }
217
218    /// Execute a SQL query with named parameters and return the result rows.
219    ///
220    /// Placeholders `:name`, `$name`, and `@name` are translated to positional
221    /// form and forwarded to [`query`](Connection::query).
222    ///
223    /// See [`execute_named`](Connection::execute_named) for placeholder syntax.
224    ///
225    /// # Errors
226    ///
227    /// Returns [`OxiSqlError::Params`] if a named placeholder has no corresponding
228    /// entry in `params`, or any error from the underlying `query` call.
229    async fn query_named(
230        &self,
231        sql: &str,
232        params: &[(&str, &dyn ToSqlValue)],
233    ) -> Result<Vec<Row>, OxiSqlError> {
234        let (rewritten, names) = crate::params::rewrite_named_params(sql)?;
235        let positional = crate::params::bind_named_params(&names, params)?;
236        self.query(&rewritten, &positional).await
237    }
238
239    /// Return any SQL warnings generated by the most recently executed statement.
240    ///
241    /// The list is cleared before each `execute` or `query` call and repopulated
242    /// afterwards with warnings issued by the server (if any).  Backends that do
243    /// not support warning retrieval return an empty `Vec` (the default).
244    ///
245    /// MySQL warnings are fetched via `SHOW WARNINGS` — only when the server
246    /// reports `warnings_count > 0` in the `OkPacket`, so there is no extra
247    /// round-trip on the common no-warning path.
248    ///
249    /// # Example
250    ///
251    /// ```rust,no_run
252    /// # use oxisql_core::Connection;
253    /// # async fn example(conn: &dyn Connection) {
254    /// conn.execute("INSERT INTO t (col) VALUES (?)", &[&"too long value"]).await.unwrap();
255    /// for w in conn.last_warnings() {
256    ///     eprintln!("Warning {}: {}", w.code, w.message);
257    /// }
258    /// # }
259    /// ```
260    fn last_warnings(&self) -> Vec<SqlWarning> {
261        Vec::new()
262    }
263
264    /// Execute a `SELECT` and return rows as an async stream.
265    ///
266    /// The default implementation materialises the full result via [`query`](Connection::query)
267    /// then streams the rows.  Backends that support true server-side cursors may
268    /// override with incremental fetching.
269    ///
270    /// This is a regular (non-`async`) method that returns `Pin<Box<dyn Stream>>` directly.
271    /// The default body wraps the future returned by [`query`](Connection::query) in a
272    /// one-shot stream and flattens the result into per-row items.
273    fn query_stream<'a>(
274        &'a self,
275        sql: &'a str,
276        params: &'a [&'a dyn ToSqlValue],
277    ) -> Pin<Box<dyn Stream<Item = Result<Row, OxiSqlError>> + Send + 'a>> {
278        use futures::StreamExt;
279        let fut = self.query(sql, params);
280        Box::pin(futures::stream::once(fut).flat_map(|result| match result {
281            Ok(rows) => futures::stream::iter(rows.into_iter().map(Ok)).left_stream(),
282            Err(e) => futures::stream::once(async move { Err(e) }).right_stream(),
283        }))
284    }
285}
286
287/// A database transaction obtained via [`Connection::transaction`].
288///
289/// Dropping the transaction without calling [`commit`](Transaction::commit)
290/// will implicitly roll back any pending changes.
291#[async_trait]
292pub trait Transaction: Send + Sync {
293    /// Execute a DML/DDL statement within the transaction.
294    async fn execute(&mut self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError>;
295
296    /// Execute a `SELECT` statement within the transaction.
297    async fn query(
298        &mut self,
299        sql: &str,
300        params: &[&dyn ToSqlValue],
301    ) -> Result<Vec<Row>, OxiSqlError>;
302
303    /// Commit all changes made within this transaction.
304    async fn commit(self: Box<Self>) -> Result<(), OxiSqlError>;
305
306    /// Roll back all changes made within this transaction.
307    async fn rollback(self: Box<Self>) -> Result<(), OxiSqlError>;
308
309    /// Create a named savepoint within the transaction.
310    ///
311    /// Savepoints allow nested rollback to a specific point without aborting
312    /// the entire transaction.  The `name` must be a valid SQL identifier
313    /// (alphanumeric and underscores only; no spaces or special characters).
314    ///
315    /// The default implementation returns an error.  Backends that support
316    /// savepoints (Postgres, MySQL) should override this.
317    async fn savepoint(&mut self, name: &str) -> Result<(), OxiSqlError> {
318        let _ = name;
319        Err(OxiSqlError::Other(
320            "savepoints are not supported by this backend".into(),
321        ))
322    }
323
324    /// Release (discard) a named savepoint.
325    ///
326    /// The default implementation returns an error.
327    async fn release_savepoint(&mut self, name: &str) -> Result<(), OxiSqlError> {
328        let _ = name;
329        Err(OxiSqlError::Other(
330            "savepoints are not supported by this backend".into(),
331        ))
332    }
333
334    /// Roll back the transaction to the named savepoint, undoing all changes
335    /// made after the savepoint was created without ending the transaction.
336    ///
337    /// The default implementation returns an error.
338    async fn rollback_to_savepoint(&mut self, name: &str) -> Result<(), OxiSqlError> {
339        let _ = name;
340        Err(OxiSqlError::Other(
341            "savepoints are not supported by this backend".into(),
342        ))
343    }
344
345    /// Execute a `SELECT` within the transaction and return rows as an async stream.
346    ///
347    /// Mirrors [`Connection::query_stream`] exactly: the default implementation
348    /// materialises the full result via [`query`](Transaction::query) then
349    /// streams the rows one by one.  Backends that support server-side cursors
350    /// may override with incremental fetching.
351    ///
352    /// This is a regular (non-`async`) method that returns `Pin<Box<dyn Stream>>`
353    /// directly.  The lifetime `'a` ties both `self` and the SQL/params slices
354    /// to the returned stream so the borrow checker enforces that `self` is not
355    /// moved while the stream is live.
356    fn query_stream<'a>(
357        &'a mut self,
358        sql: &'a str,
359        params: &'a [&'a dyn ToSqlValue],
360    ) -> Pin<Box<dyn Stream<Item = Result<Row, OxiSqlError>> + Send + 'a>> {
361        use futures::StreamExt;
362        let fut = self.query(sql, params);
363        Box::pin(futures::stream::once(fut).flat_map(|result| match result {
364            Ok(rows) => futures::stream::iter(rows.into_iter().map(Ok)).left_stream(),
365            Err(e) => futures::stream::once(async move { Err(e) }).right_stream(),
366        }))
367    }
368}