oxisql_core/traits.rs
1//! Core database traits: [`Connection`], [`Transaction`], and [`ToSqlValue`].
2
3use std::pin::Pin;
4
5use async_trait::async_trait;
6use futures::Stream;
7
8use crate::prepare::PreparedStatement;
9use crate::schema::{ColumnInfo, ForeignKeyInfo, IndexInfo, TableInfo};
10use crate::{OxiSqlError, Row, SqlWarning, Value};
11
12// ── ToSqlValue ──────────────────────────────────────────────────────────────
13
14/// A value that can be used as a positional SQL parameter (`$1`, `$2`, …).
15pub trait ToSqlValue: Send + Sync {
16 /// Convert `self` into a [`Value`].
17 fn to_value(&self) -> Value;
18}
19
20impl ToSqlValue for i64 {
21 fn to_value(&self) -> Value {
22 Value::I64(*self)
23 }
24}
25
26impl ToSqlValue for i32 {
27 fn to_value(&self) -> Value {
28 Value::I64(i64::from(*self))
29 }
30}
31
32impl ToSqlValue for f64 {
33 fn to_value(&self) -> Value {
34 Value::F64(*self)
35 }
36}
37
38impl ToSqlValue for str {
39 fn to_value(&self) -> Value {
40 Value::Text(self.to_string())
41 }
42}
43
44impl ToSqlValue for String {
45 fn to_value(&self) -> Value {
46 Value::Text(self.clone())
47 }
48}
49
50impl ToSqlValue for bool {
51 fn to_value(&self) -> Value {
52 Value::Bool(*self)
53 }
54}
55
56impl ToSqlValue for Vec<u8> {
57 fn to_value(&self) -> Value {
58 Value::Blob(self.clone())
59 }
60}
61
62impl<T: ToSqlValue> ToSqlValue for Option<T> {
63 fn to_value(&self) -> Value {
64 match self {
65 Some(v) => v.to_value(),
66 None => Value::Null,
67 }
68 }
69}
70
71/// Blanket impl: any shared reference to a [`ToSqlValue`] is also a
72/// [`ToSqlValue`]. This makes `&"hello"` (i.e. `&&str`) and `&&42_i64`
73/// work as parameter values without needing per-reference-depth impls.
74impl<T: ToSqlValue + ?Sized> ToSqlValue for &T {
75 fn to_value(&self) -> Value {
76 (**self).to_value()
77 }
78}
79
80/// Allow a [`Value`] to be used directly as a SQL parameter.
81///
82/// This enables fan-out helpers (e.g. `MultiConnection`) to collect
83/// parameter snapshots as `Vec<Value>` and pass them back to
84/// `Connection::execute` / `Connection::query` without an intermediate
85/// conversion step.
86impl ToSqlValue for Value {
87 fn to_value(&self) -> Value {
88 self.clone()
89 }
90}
91
92// ── Connection trait ────────────────────────────────────────────────────────
93
94/// An async database connection.
95///
96/// Implementations must be `Send + Sync` so they can be shared across async
97/// tasks. Use [`transaction`](Connection::transaction) to obtain a transaction
98/// that provides serialised, rollbackable execution.
99#[async_trait]
100pub trait Connection: Send + Sync {
101 /// Execute a DML/DDL statement and return the number of rows affected.
102 ///
103 /// Positional parameters (`$1`, `$2`, …) are substituted from `params`.
104 async fn execute(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError>;
105
106 /// Execute a `SELECT` statement and return the result rows.
107 ///
108 /// Positional parameters (`$1`, `$2`, …) are substituted from `params`.
109 async fn query(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<Vec<Row>, OxiSqlError>;
110
111 /// Begin a transaction, returning a handle that can be committed or
112 /// rolled back.
113 async fn transaction(&self) -> Result<Box<dyn Transaction + '_>, OxiSqlError>;
114
115 /// Execute multiple semicolon-separated SQL statements in a single call.
116 ///
117 /// The default implementation splits on `;` and calls [`execute`](Connection::execute)
118 /// for each statement. Backends may override with a more efficient
119 /// batch execution path (e.g. `batch_execute` on Postgres).
120 ///
121 /// **Note:** The default implementation uses a naive `;` split and will
122 /// break on SQL containing semicolons inside string literals. Backends
123 /// that override this method (Postgres, MySQL, embedded) handle this
124 /// correctly via their native batch execution APIs.
125 ///
126 /// Returns the total number of rows affected across all statements.
127 async fn execute_batch(&self, sql: &str) -> Result<u64, OxiSqlError> {
128 let mut total = 0u64;
129 for stmt in sql.split(';') {
130 let trimmed = stmt.trim();
131 if !trimmed.is_empty() {
132 total += self.execute(trimmed, &[]).await?;
133 }
134 }
135 Ok(total)
136 }
137
138 /// Lightweight connectivity check.
139 ///
140 /// The default implementation executes `SELECT 1` and discards the result.
141 /// Backends should override with a more efficient probe if available.
142 async fn ping(&self) -> Result<(), OxiSqlError> {
143 self.query("SELECT 1", &[]).await?;
144 Ok(())
145 }
146
147 /// Compile a SQL statement for repeated execution with different parameters.
148 ///
149 /// Returns a [`PreparedStatement`] that avoids re-parsing on each call.
150 /// The default implementation returns an error indicating the backend does
151 /// not support prepared statements; override in backend-specific impls.
152 async fn prepare(&self, sql: &str) -> Result<Box<dyn PreparedStatement + '_>, OxiSqlError> {
153 let _ = sql;
154 Err(OxiSqlError::Other(
155 "prepared statements are not supported by this backend".into(),
156 ))
157 }
158
159 /// List all tables visible to the current connection.
160 ///
161 /// The default implementation returns an unsupported error.
162 /// Backends that support introspection (Postgres, MySQL, embedded) override this.
163 async fn tables(&self) -> Result<Vec<TableInfo>, OxiSqlError> {
164 Err(OxiSqlError::Other(
165 "schema introspection not supported by this backend".into(),
166 ))
167 }
168
169 /// List all columns of the named table.
170 ///
171 /// The default implementation returns an unsupported error.
172 async fn columns(&self, table: &str) -> Result<Vec<ColumnInfo>, OxiSqlError> {
173 let _ = table;
174 Err(OxiSqlError::Other(
175 "schema introspection not supported by this backend".into(),
176 ))
177 }
178
179 /// List all indexes defined on the named table.
180 ///
181 /// The default implementation returns an unsupported error.
182 async fn indexes(&self, table: &str) -> Result<Vec<IndexInfo>, OxiSqlError> {
183 let _ = table;
184 Err(OxiSqlError::Other(
185 "schema introspection not supported by this backend".into(),
186 ))
187 }
188
189 /// List all foreign-key constraints on the named table.
190 ///
191 /// The default implementation returns an unsupported error.
192 async fn foreign_keys(&self, table: &str) -> Result<Vec<ForeignKeyInfo>, OxiSqlError> {
193 let _ = table;
194 Err(OxiSqlError::Other(
195 "schema introspection not supported by this backend".into(),
196 ))
197 }
198
199 /// Execute a SQL statement with named parameters.
200 ///
201 /// Placeholders `:name`, `$name`, and `@name` are translated to positional
202 /// form and forwarded to [`execute`](Connection::execute).
203 ///
204 /// # Errors
205 ///
206 /// Returns [`OxiSqlError::Params`] if a named placeholder has no corresponding
207 /// entry in `params`, or any error from the underlying `execute` call.
208 async fn execute_named(
209 &self,
210 sql: &str,
211 params: &[(&str, &dyn ToSqlValue)],
212 ) -> Result<u64, OxiSqlError> {
213 let (rewritten, names) = crate::params::rewrite_named_params(sql)?;
214 let positional = crate::params::bind_named_params(&names, params)?;
215 self.execute(&rewritten, &positional).await
216 }
217
218 /// Execute a SQL query with named parameters and return the result rows.
219 ///
220 /// Placeholders `:name`, `$name`, and `@name` are translated to positional
221 /// form and forwarded to [`query`](Connection::query).
222 ///
223 /// See [`execute_named`](Connection::execute_named) for placeholder syntax.
224 ///
225 /// # Errors
226 ///
227 /// Returns [`OxiSqlError::Params`] if a named placeholder has no corresponding
228 /// entry in `params`, or any error from the underlying `query` call.
229 async fn query_named(
230 &self,
231 sql: &str,
232 params: &[(&str, &dyn ToSqlValue)],
233 ) -> Result<Vec<Row>, OxiSqlError> {
234 let (rewritten, names) = crate::params::rewrite_named_params(sql)?;
235 let positional = crate::params::bind_named_params(&names, params)?;
236 self.query(&rewritten, &positional).await
237 }
238
239 /// Return any SQL warnings generated by the most recently executed statement.
240 ///
241 /// The list is cleared before each `execute` or `query` call and repopulated
242 /// afterwards with warnings issued by the server (if any). Backends that do
243 /// not support warning retrieval return an empty `Vec` (the default).
244 ///
245 /// MySQL warnings are fetched via `SHOW WARNINGS` — only when the server
246 /// reports `warnings_count > 0` in the `OkPacket`, so there is no extra
247 /// round-trip on the common no-warning path.
248 ///
249 /// # Example
250 ///
251 /// ```rust,no_run
252 /// # use oxisql_core::Connection;
253 /// # async fn example(conn: &dyn Connection) {
254 /// conn.execute("INSERT INTO t (col) VALUES (?)", &[&"too long value"]).await.unwrap();
255 /// for w in conn.last_warnings() {
256 /// eprintln!("Warning {}: {}", w.code, w.message);
257 /// }
258 /// # }
259 /// ```
260 fn last_warnings(&self) -> Vec<SqlWarning> {
261 Vec::new()
262 }
263
264 /// Execute a `SELECT` and return rows as an async stream.
265 ///
266 /// The default implementation materialises the full result via [`query`](Connection::query)
267 /// then streams the rows. Backends that support true server-side cursors may
268 /// override with incremental fetching.
269 ///
270 /// This is a regular (non-`async`) method that returns `Pin<Box<dyn Stream>>` directly.
271 /// The default body wraps the future returned by [`query`](Connection::query) in a
272 /// one-shot stream and flattens the result into per-row items.
273 fn query_stream<'a>(
274 &'a self,
275 sql: &'a str,
276 params: &'a [&'a dyn ToSqlValue],
277 ) -> Pin<Box<dyn Stream<Item = Result<Row, OxiSqlError>> + Send + 'a>> {
278 use futures::StreamExt;
279 let fut = self.query(sql, params);
280 Box::pin(futures::stream::once(fut).flat_map(|result| match result {
281 Ok(rows) => futures::stream::iter(rows.into_iter().map(Ok)).left_stream(),
282 Err(e) => futures::stream::once(async move { Err(e) }).right_stream(),
283 }))
284 }
285}
286
287/// A database transaction obtained via [`Connection::transaction`].
288///
289/// Dropping the transaction without calling [`commit`](Transaction::commit)
290/// will implicitly roll back any pending changes.
291#[async_trait]
292pub trait Transaction: Send + Sync {
293 /// Execute a DML/DDL statement within the transaction.
294 async fn execute(&mut self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError>;
295
296 /// Execute a `SELECT` statement within the transaction.
297 async fn query(
298 &mut self,
299 sql: &str,
300 params: &[&dyn ToSqlValue],
301 ) -> Result<Vec<Row>, OxiSqlError>;
302
303 /// Commit all changes made within this transaction.
304 async fn commit(self: Box<Self>) -> Result<(), OxiSqlError>;
305
306 /// Roll back all changes made within this transaction.
307 async fn rollback(self: Box<Self>) -> Result<(), OxiSqlError>;
308
309 /// Create a named savepoint within the transaction.
310 ///
311 /// Savepoints allow nested rollback to a specific point without aborting
312 /// the entire transaction. The `name` must be a valid SQL identifier
313 /// (alphanumeric and underscores only; no spaces or special characters).
314 ///
315 /// The default implementation returns an error. Backends that support
316 /// savepoints (Postgres, MySQL) should override this.
317 async fn savepoint(&mut self, name: &str) -> Result<(), OxiSqlError> {
318 let _ = name;
319 Err(OxiSqlError::Other(
320 "savepoints are not supported by this backend".into(),
321 ))
322 }
323
324 /// Release (discard) a named savepoint.
325 ///
326 /// The default implementation returns an error.
327 async fn release_savepoint(&mut self, name: &str) -> Result<(), OxiSqlError> {
328 let _ = name;
329 Err(OxiSqlError::Other(
330 "savepoints are not supported by this backend".into(),
331 ))
332 }
333
334 /// Roll back the transaction to the named savepoint, undoing all changes
335 /// made after the savepoint was created without ending the transaction.
336 ///
337 /// The default implementation returns an error.
338 async fn rollback_to_savepoint(&mut self, name: &str) -> Result<(), OxiSqlError> {
339 let _ = name;
340 Err(OxiSqlError::Other(
341 "savepoints are not supported by this backend".into(),
342 ))
343 }
344
345 /// Execute a `SELECT` within the transaction and return rows as an async stream.
346 ///
347 /// Mirrors [`Connection::query_stream`] exactly: the default implementation
348 /// materialises the full result via [`query`](Transaction::query) then
349 /// streams the rows one by one. Backends that support server-side cursors
350 /// may override with incremental fetching.
351 ///
352 /// This is a regular (non-`async`) method that returns `Pin<Box<dyn Stream>>`
353 /// directly. The lifetime `'a` ties both `self` and the SQL/params slices
354 /// to the returned stream so the borrow checker enforces that `self` is not
355 /// moved while the stream is live.
356 fn query_stream<'a>(
357 &'a mut self,
358 sql: &'a str,
359 params: &'a [&'a dyn ToSqlValue],
360 ) -> Pin<Box<dyn Stream<Item = Result<Row, OxiSqlError>> + Send + 'a>> {
361 use futures::StreamExt;
362 let fut = self.query(sql, params);
363 Box::pin(futures::stream::once(fut).flat_map(|result| match result {
364 Ok(rows) => futures::stream::iter(rows.into_iter().map(Ok)).left_stream(),
365 Err(e) => futures::stream::once(async move { Err(e) }).right_stream(),
366 }))
367 }
368}