Skip to main content

oxisql_sqlite_compat/
connection.rs

1//! [`SqliteConnection`] — Limbo-backed implementation of [`oxisql_core::Connection`].
2//!
3//! # Concurrency model
4//!
5//! `limbo::Connection` is internally `Arc<Mutex<Arc<limbo_core::Connection>>>` with
6//! `unsafe impl Send + Sync`, so it is safe to clone and share across async tasks.
7//! `SqliteConnection` is a thin newtype that holds:
8//!
9//! - `conn: limbo::Connection` — the Limbo connection handle.
10//! - `txn_lock: Arc<tokio::sync::Mutex<()>>` — a guard that prevents two async tasks
11//!   from issuing `BEGIN` concurrently on the same logical connection.  SQLite does
12//!   not support nested transactions, so only one task at a time may hold a
13//!   transaction.
14//! - `path: String` — the path supplied to [`Builder::new_local`], retained for
15//!   diagnostics.
16//!
17//! # Affected-row count
18//!
19//! After each DML statement we call `conn.changes()` to read the row count that
20//! was committed by the most-recent write transaction.  DDL statements and
21//! `BEGIN`/`COMMIT`/`ROLLBACK` leave the counter at 0, which is the correct
22//! contract per OxiSQL and `sqlite3_changes()` semantics.
23//!
24//! # Parameter binding
25//!
26//! OxiSQL passes `$1`, `$2`, … positional parameters.  SQLite / Limbo expects
27//! `?` placeholders.  `types::rewrite_params` performs a quote-aware
28//! translation before each statement is prepared.
29//!
30//! # Schema introspection
31//!
32//! [`Connection::tables`] queries `sqlite_master`.
33//! [`Connection::columns`] uses `PRAGMA table_info`.
34//! [`Connection::indexes`] parses `sqlite_master` DDL (PRAGMA index_list/index_info are not
35//! yet implemented in Limbo 0.0.22).
36//! [`Connection::foreign_keys`] uses `PRAGMA foreign_key_list` — the engine now
37//! surfaces FK metadata from its in-memory schema.
38//!
39//! # Transactions
40//!
41//! [`Connection::transaction`] issues `BEGIN` and returns a [`SqliteTransaction`]
42//! that wraps the same `limbo::Connection`.  The transaction holds a guard on
43//! `txn_lock` so that no other task can start a concurrent `BEGIN`.
44//! Dropping `SqliteTransaction` without calling `commit` or `rollback` will
45//! execute `ROLLBACK` (best-effort, via `Drop`).
46//!
47//! # Prepared-statement cache
48//!
49//! All DML and DDL statements pass through an LRU cache keyed by the
50//! **rewritten SQL** (after `$N`→`?` translation).  The cache holds up to
51//! `STMT_CACHE_CAPACITY` (128) compiled `limbo::Statement` entries per connection
52//! (shared across clones of the same connection via `Arc<StdMutex<…>>`).
53//!
54//! On a cache hit the existing `limbo::Statement` is taken out of the cache,
55//! executed via `Statement::execute()` (which calls `reset()` before binding),
56//! and returned to the cache after execution.  `Statement::reset()` now also
57//! zeroes `Program::n_change` (fixed in oxisqlite-core), so cached statement
58//! reuse produces correct per-execution change counts.
59//!
60//! # ROLLBACK
61//!
62//! `SqliteTransaction::rollback()` executes the SQL string `"ROLLBACK"` against
63//! the engine, exactly mirroring how `commit()` executes `"COMMIT"`.  The engine
64//! emits an `AutoCommit { auto_commit: true, rollback: true }` VDBE instruction
65//! that discards all pending changes.  The `Drop` impl also fires a best-effort
66//! ROLLBACK when the transaction is dropped without an explicit `commit()` or
67//! `rollback()`.
68//!
69//! # Prepared-statement reuse (via SqlitePrepared)
70//!
71//! Limbo's `Statement` is consumed after a single `execute`/`query` cycle.
72//! Our [`PreparedStatement`] wrapper therefore re-prepares on every call.  The
73//! API contract (parse-once, bind-many) is satisfied at the OxiSQL trait level
74//! even though Limbo does not yet expose a stable compiled-statement cache.
75
76use std::num::NonZeroUsize;
77use std::sync::{Arc, Mutex as StdMutex};
78
79use async_trait::async_trait;
80use limbo::params::Params as LimboParams;
81use limbo::Builder;
82use tokio::sync::Mutex as TokioMutex;
83
84// ── statement-cache capacity ───────────────────────────────────────────────────
85
86/// Maximum number of compiled statements retained in the per-connection LRU
87/// cache.  Statements are keyed by their rewritten SQL (`?`-placeholder form).
88const STMT_CACHE_CAPACITY: usize = 128;
89
90use oxisql_core::{
91    ColumnInfo, Connection, ForeignKeyInfo, IndexInfo, OxiSqlError, PreparedStatement, Row,
92    TableInfo, TableType, ToSqlValue, Transaction, Value,
93};
94
95use crate::error::SqliteCompatError;
96use crate::types::{limbo_to_core_typed, rewrite_params, split_statements};
97
98// ── helpers ───────────────────────────────────────────────────────────────────
99
100/// A per-connection LRU cache from rewritten SQL → compiled `limbo::Statement`.
101///
102/// Wrapped in `Arc<StdMutex<…>>` so it can be cheaply shared when the
103/// `SqliteConnection` is cloned.  The std `Mutex` is deliberately chosen over
104/// `tokio::sync::Mutex`: the critical section is very short (single hash-lookup
105/// or insertion) and never held across an `.await` point.
106type StmtCache = Arc<StdMutex<lru::LruCache<String, limbo::Statement>>>;
107
108/// Construct a new, empty [`StmtCache`] with [`STMT_CACHE_CAPACITY`] slots.
109fn new_stmt_cache() -> StmtCache {
110    // SAFETY: STMT_CACHE_CAPACITY is a positive compile-time constant (128).
111    //         `NonZeroUsize::new` returns `None` only for 0, which this is not.
112    let cap = NonZeroUsize::new(STMT_CACHE_CAPACITY).unwrap_or(NonZeroUsize::MIN);
113    Arc::new(StdMutex::new(lru::LruCache::new(cap)))
114}
115
116/// Execute a SQL statement that has already been rewritten to `?` placeholders.
117///
118/// On a cache miss, compiles the statement via `conn.prepare()`, inserts it into
119/// the cache, then executes it via `stmt.execute()`.  On a cache hit, retrieves
120/// the cached `limbo::Statement` and calls `stmt.execute()` directly —
121/// `Statement::execute()` calls `reset()` internally, which (after the engine
122/// fix in oxisqlite-core) also clears `n_change`, making reuse correct.
123///
124/// The affected-row count is read from `conn.changes()` after execution,
125/// which reflects the count committed by the most recent write transaction on
126/// this connection.  DDL and `BEGIN`/`COMMIT`/`ROLLBACK` return 0, which is the
127/// correct value per the OxiSQL contract.
128///
129/// When no `cache` is provided (e.g., in unit tests that bypass the cache) the
130/// function falls back to `conn.execute()` followed by `conn.changes()`.
131async fn exec_rewritten(
132    conn: &limbo::Connection,
133    sql: &str,
134    limbo_params: Vec<limbo::Value>,
135    cache: Option<&StmtCache>,
136) -> Result<u64, SqliteCompatError> {
137    let lp = if limbo_params.is_empty() {
138        LimboParams::None
139    } else {
140        LimboParams::Positional(limbo_params)
141    };
142
143    match cache {
144        Some(c) => {
145            // ── cache path ─────────────────────────────────────────────────────
146            //
147            // Retrieve a mutable reference to the cached statement, or compile a
148            // fresh one and insert it.  We hold the lock only for the duration of
149            // the lookup/insert, not across the `.await` in `stmt.execute()`.
150            //
151            // Because `StmtCache` holds the actual `Statement` value (not a
152            // reference-counted pointer to it), we need to take ownership on a
153            // cache miss and then put it back after execution.
154
155            // Try to remove a hit from the cache so we have owned access during
156            // the async execute call.
157            let cached_stmt: Option<limbo::Statement> = c
158                .lock()
159                .map_err(|e| SqliteCompatError::Other(format!("stmt_cache lock poisoned: {e}")))?
160                .pop(sql);
161
162            let mut stmt = match cached_stmt {
163                Some(s) => s,
164                None => {
165                    // Cache miss — compile a new statement.
166                    conn.prepare(sql).await.map_err(SqliteCompatError::from)?
167                }
168            };
169
170            // Execute the (possibly retrieved-from-cache) statement.
171            // `Statement::execute()` calls `reset()` before binding parameters,
172            // and `reset()` now also zeroes `n_change`, so reuse is correct.
173            stmt.execute(lp).await.map_err(SqliteCompatError::from)?;
174
175            // Return the statement to the cache for future reuse.
176            c.lock()
177                .map_err(|e| SqliteCompatError::Other(format!("stmt_cache lock poisoned: {e}")))?
178                .put(sql.to_owned(), stmt);
179
180            // Read the affected-row count from the connection's native counter.
181            // DDL and TCL statements leave this at 0.
182            let n = conn
183                .changes()
184                .map_err(|e| SqliteCompatError::Other(format!("changes() failed: {e}")))?;
185            Ok(n.max(0) as u64)
186        }
187        None => {
188            // ── no-cache path (uncommon; bypasses the cache entirely) ──────────
189            conn.execute(sql, lp)
190                .await
191                .map_err(SqliteCompatError::from)?;
192            let n = conn
193                .changes()
194                .map_err(|e| SqliteCompatError::Other(format!("changes() failed: {e}")))?;
195            Ok(n.max(0) as u64)
196        }
197    }
198}
199
200/// Execute a query that has already been rewritten to `?` placeholders and
201/// collect all result rows.
202///
203/// Column declared types (e.g. `"DATE"`, `"TIMESTAMP"`, `"UUID"`) are
204/// collected from the prepared statement and forwarded to [`limbo_to_core_typed`]
205/// so that richer [`Value`] variants are produced when appropriate.
206async fn query_rewritten(
207    conn: &limbo::Connection,
208    sql: &str,
209    limbo_params: Vec<limbo::Value>,
210) -> Result<Vec<Row>, SqliteCompatError> {
211    let lp = if limbo_params.is_empty() {
212        LimboParams::None
213    } else {
214        LimboParams::Positional(limbo_params)
215    };
216
217    let mut stmt = conn.prepare(sql).await.map_err(SqliteCompatError::from)?;
218
219    // Collect column names and declared types together.
220    let col_info: Vec<(String, Option<String>)> = stmt
221        .columns()
222        .iter()
223        .map(|c| (c.name().to_owned(), c.decl_type().map(str::to_owned)))
224        .collect();
225
226    let col_names: Vec<String> = col_info.iter().map(|(name, _)| name.clone()).collect();
227
228    let mut rows_iter = stmt.query(lp).await.map_err(SqliteCompatError::from)?;
229
230    let mut rows: Vec<Row> = Vec::new();
231    while let Some(limbo_row) = rows_iter.next().await.map_err(SqliteCompatError::from)? {
232        let mut values: Vec<Value> = Vec::with_capacity(col_info.len());
233        for idx in 0..limbo_row.column_count() {
234            let raw = limbo_row.get_value(idx).map_err(SqliteCompatError::from)?;
235            let decl = col_info.get(idx).and_then(|(_, dt)| dt.as_deref());
236            values.push(limbo_to_core_typed(raw, decl)?);
237        }
238        rows.push(Row::new(col_names.clone(), values));
239    }
240    Ok(rows)
241}
242
243// ── SqliteConnection ──────────────────────────────────────────────────────────
244
245/// A Limbo-backed SQLite connection implementing [`Connection`].
246///
247/// Create via [`SqliteConnection::open`] (file path) or
248/// [`SqliteConnection::open_memory`] (`:memory:`).
249///
250/// # Statement cache
251///
252/// Each `SqliteConnection` maintains an LRU cache of compiled `limbo::Statement`
253/// objects (capacity: `STMT_CACHE_CAPACITY` = 128).  The cache is shared across
254/// clones of the same connection (the clones share the underlying
255/// `limbo::Connection`) and is updated on every DML/DDL execution.  Cache hits
256/// save the per-statement parse-and-compile round-trip inside Limbo.
257#[derive(Clone)]
258pub struct SqliteConnection {
259    conn: limbo::Connection,
260    txn_lock: Arc<TokioMutex<()>>,
261    stmt_cache: StmtCache,
262    path: String,
263}
264
265impl std::fmt::Debug for SqliteConnection {
266    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267        let cache_len = self.stmt_cache.lock().map(|g| g.len()).unwrap_or(0);
268        f.debug_struct("SqliteConnection")
269            .field("path", &self.path)
270            .field("stmt_cache_len", &cache_len)
271            .finish_non_exhaustive()
272    }
273}
274
275impl SqliteConnection {
276    /// Open a Limbo database at the given file path.
277    ///
278    /// Pass `":memory:"` for an in-memory database, or use
279    /// [`open_memory`][Self::open_memory] for clarity.
280    ///
281    /// # Errors
282    ///
283    /// Returns [`OxiSqlError`] if the file cannot be opened or created.
284    pub async fn open(path: &str) -> Result<Self, OxiSqlError> {
285        let db = Builder::new_local(path)
286            .build()
287            .await
288            .map_err(|e| OxiSqlError::Other(format!("limbo open error: {e}")))?;
289        let conn = db
290            .connect()
291            .map_err(|e| OxiSqlError::Other(format!("limbo connect error: {e}")))?;
292        Ok(Self {
293            conn,
294            txn_lock: Arc::new(TokioMutex::new(())),
295            stmt_cache: new_stmt_cache(),
296            path: path.to_owned(),
297        })
298    }
299
300    /// Open a fresh in-memory Limbo database.
301    ///
302    /// # Errors
303    ///
304    /// Returns [`OxiSqlError`] if the engine cannot be initialised.
305    pub async fn open_memory() -> Result<Self, OxiSqlError> {
306        Self::open(":memory:").await
307    }
308
309    /// Return the path this connection was opened with.
310    pub fn path(&self) -> &str {
311        &self.path
312    }
313}
314
315// ── Connection impl ───────────────────────────────────────────────────────────
316
317#[async_trait]
318impl Connection for SqliteConnection {
319    async fn execute(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
320        let (rewritten, limbo_params) = rewrite_params(sql, params).map_err(OxiSqlError::from)?;
321        exec_rewritten(&self.conn, &rewritten, limbo_params, Some(&self.stmt_cache))
322            .await
323            .map_err(OxiSqlError::from)
324    }
325
326    async fn query(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<Vec<Row>, OxiSqlError> {
327        let (rewritten, limbo_params) = rewrite_params(sql, params).map_err(OxiSqlError::from)?;
328        query_rewritten(&self.conn, &rewritten, limbo_params)
329            .await
330            .map_err(OxiSqlError::from)
331    }
332
333    async fn transaction(&self) -> Result<Box<dyn Transaction + '_>, OxiSqlError> {
334        // Acquire the exclusive transaction lock before issuing BEGIN.
335        // This prevents a second task from starting a concurrent transaction
336        // on the same SqliteConnection clone.
337        let guard = self.txn_lock.lock().await;
338        self.conn
339            .execute("BEGIN", LimboParams::None)
340            .await
341            .map_err(|e| OxiSqlError::Other(format!("BEGIN failed: {e}")))?;
342        Ok(Box::new(SqliteTransaction {
343            conn: self.conn.clone(),
344            // Share the connection-level stmt_cache so that DML executed inside
345            // a transaction also benefits from cached compiled statements.
346            stmt_cache: Arc::clone(&self.stmt_cache),
347            // Transfer ownership of the mutex guard into the transaction.
348            // The guard is released when SqliteTransaction is dropped.
349            _guard: guard,
350            done: false,
351        }))
352    }
353
354    async fn execute_batch(&self, sql: &str) -> Result<u64, OxiSqlError> {
355        // Token-aware split: honours `;` inside string literals, quoted
356        // identifiers, block comments, and line comments.
357        let stmts = split_statements(sql);
358        let mut total = 0u64;
359        for stmt in stmts {
360            total += self.execute(stmt, &[]).await?;
361        }
362        Ok(total)
363    }
364
365    async fn ping(&self) -> Result<(), OxiSqlError> {
366        self.query("SELECT 1", &[]).await?;
367        Ok(())
368    }
369
370    async fn prepare(&self, sql: &str) -> Result<Box<dyn PreparedStatement + '_>, OxiSqlError> {
371        Ok(Box::new(SqlitePrepared {
372            conn: &self.conn,
373            stmt_cache: Arc::clone(&self.stmt_cache),
374            sql: sql.to_owned(),
375        }))
376    }
377
378    // ── Schema introspection ──────────────────────────────────────────────────
379
380    async fn tables(&self) -> Result<Vec<TableInfo>, OxiSqlError> {
381        let rows = self
382            .query(
383                "SELECT name, type FROM sqlite_master \
384                 WHERE type IN ('table','view') AND name NOT LIKE 'sqlite_%' \
385                 ORDER BY name",
386                &[],
387            )
388            .await?;
389
390        let infos = rows
391            .into_iter()
392            .map(|row| {
393                let name = row
394                    .get_by_index(0)
395                    .and_then(|v| {
396                        if let Value::Text(s) = v {
397                            Some(s.clone())
398                        } else {
399                            None
400                        }
401                    })
402                    .unwrap_or_default();
403                let ttype_str = row
404                    .get_by_index(1)
405                    .and_then(|v| {
406                        if let Value::Text(s) = v {
407                            Some(s.as_str())
408                        } else {
409                            None
410                        }
411                    })
412                    .unwrap_or("table");
413                let table_type = match ttype_str {
414                    "view" => TableType::View,
415                    _ => TableType::Base,
416                };
417                TableInfo {
418                    name,
419                    schema: None,
420                    table_type,
421                }
422            })
423            .collect();
424        Ok(infos)
425    }
426
427    async fn columns(&self, table: &str) -> Result<Vec<ColumnInfo>, OxiSqlError> {
428        // PRAGMA table_info returns: cid, name, type, notnull, dflt_value, pk
429        let sql = format!("PRAGMA table_info(\"{table}\")");
430        let rows = self.query(&sql, &[]).await?;
431
432        let infos = rows
433            .into_iter()
434            .map(|row| {
435                // Helper: get column by index as string or empty string.
436                let text_at = |r: &Row, idx: usize| -> String {
437                    r.get_by_index(idx)
438                        .and_then(|v| match v {
439                            Value::Text(s) => Some(s.clone()),
440                            Value::I64(n) => Some(n.to_string()),
441                            Value::Null => Some(String::new()),
442                            _ => None,
443                        })
444                        .unwrap_or_default()
445                };
446                let i64_at = |r: &Row, idx: usize| -> i64 {
447                    r.get_by_index(idx)
448                        .and_then(|v| {
449                            if let Value::I64(n) = v {
450                                Some(*n)
451                            } else {
452                                None
453                            }
454                        })
455                        .unwrap_or(0)
456                };
457
458                let ordinal = i64_at(&row, 0) as u32 + 1; // cid is 0-based
459                let name = text_at(&row, 1);
460                let data_type = text_at(&row, 2);
461                let notnull = i64_at(&row, 3) != 0;
462                let default_val = row.get_by_index(4).and_then(|v| match v {
463                    Value::Text(s) => Some(s.clone()),
464                    Value::Null => None,
465                    other => Some(format!("{other:?}")),
466                });
467
468                ColumnInfo {
469                    name,
470                    ordinal_position: ordinal,
471                    data_type,
472                    nullable: !notnull,
473                    default: default_val,
474                    max_length: None,
475                    numeric_precision: None,
476                    numeric_scale: None,
477                }
478            })
479            .collect();
480        Ok(infos)
481    }
482
483    async fn indexes(&self, table: &str) -> Result<Vec<IndexInfo>, OxiSqlError> {
484        // PRAGMA index_list and PRAGMA index_info are not yet implemented in limbo 0.0.22.
485        // Fall back to sqlite_master for index names and uniqueness, then parse
486        // the index SQL to extract column names.  This is best-effort: multi-column
487        // indexes and expression indexes may not parse perfectly.
488        let sql = "SELECT name, sql FROM sqlite_master \
489                   WHERE type='index' AND tbl_name=$1 AND name NOT LIKE 'sqlite_%'";
490        let rows = self.query(sql, &[&table]).await?;
491
492        let mut infos: Vec<IndexInfo> = Vec::new();
493        for row in rows {
494            let name = row
495                .get_by_index(0)
496                .and_then(|v| {
497                    if let Value::Text(s) = v {
498                        Some(s.clone())
499                    } else {
500                        None
501                    }
502                })
503                .unwrap_or_default();
504            let idx_sql = row
505                .get_by_index(1)
506                .and_then(|v| {
507                    if let Value::Text(s) = v {
508                        Some(s.clone())
509                    } else {
510                        None
511                    }
512                })
513                .unwrap_or_default();
514
515            // Detect UNIQUE from the CREATE INDEX / CREATE UNIQUE INDEX statement.
516            let upper = idx_sql.to_ascii_uppercase();
517            let unique = upper.contains("UNIQUE");
518
519            // Extract column list between the last `(` and `)`.
520            let columns: Vec<String> =
521                if let (Some(open), Some(close)) = (idx_sql.rfind('('), idx_sql.rfind(')')) {
522                    idx_sql[open + 1..close]
523                        .split(',')
524                        .map(|c| c.trim().to_string())
525                        .filter(|c| !c.is_empty())
526                        .collect()
527                } else {
528                    vec![]
529                };
530
531            infos.push(IndexInfo {
532                name,
533                columns,
534                unique,
535                primary: false,
536            });
537        }
538        Ok(infos)
539    }
540
541    async fn foreign_keys(&self, table: &str) -> Result<Vec<ForeignKeyInfo>, OxiSqlError> {
542        // Use PRAGMA foreign_key_list — the engine now surfaces FK metadata
543        // directly from the in-memory schema, avoiding brittle DDL text parsing.
544        let escaped = table.replace('"', "\"\"");
545        let sql = format!("PRAGMA foreign_key_list(\"{}\")", escaped);
546        let rows = query_rewritten(&self.conn, &sql, vec![])
547            .await
548            .map_err(OxiSqlError::from)?;
549
550        // PRAGMA foreign_key_list columns (by index):
551        //  0: id INTEGER   — FK index within the table
552        //  1: seq INTEGER  — column position within a composite FK
553        //  2: table TEXT   — parent table name
554        //  3: from TEXT    — child column name
555        //  4: to TEXT/NULL — parent column name (NULL = implicit PK ref)
556        //  5: on_update TEXT
557        //  6: on_delete TEXT
558        //  7: match TEXT
559        let mut infos: Vec<ForeignKeyInfo> = Vec::with_capacity(rows.len());
560        for row in &rows {
561            let id = match row.get_by_index(0) {
562                Some(Value::I64(v)) => *v,
563                _ => 0,
564            };
565            let from_col = match row.get_by_index(3) {
566                Some(Value::Text(s)) => s.clone(),
567                _ => continue,
568            };
569            let foreign_table = match row.get_by_index(2) {
570                Some(Value::Text(s)) => s.clone(),
571                _ => continue,
572            };
573            let foreign_column = match row.get_by_index(4) {
574                Some(Value::Text(s)) => s.clone(),
575                _ => String::new(),
576            };
577            let on_update = match row.get_by_index(5) {
578                Some(Value::Text(s)) => Some(s.clone()),
579                _ => None,
580            };
581            let on_delete = match row.get_by_index(6) {
582                Some(Value::Text(s)) => Some(s.clone()),
583                _ => None,
584            };
585            let constraint_name = format!("fk_{table}_{id}");
586            infos.push(ForeignKeyInfo {
587                constraint_name,
588                column: from_col,
589                foreign_table,
590                foreign_column,
591                on_update,
592                on_delete,
593            });
594        }
595        Ok(infos)
596    }
597}
598
599// ── SqliteTransaction ─────────────────────────────────────────────────────────
600
601/// A SQLite transaction backed by raw `BEGIN`/`COMMIT`/`ROLLBACK` statements.
602///
603/// Holds a guard on the connection-level transaction mutex so that no other
604/// async task can start a concurrent `BEGIN` on the same `SqliteConnection`.
605/// When dropped without an explicit `commit` or `rollback`, the transaction
606/// attempts a best-effort `ROLLBACK` via a background task.
607pub struct SqliteTransaction<'a> {
608    conn: limbo::Connection,
609    stmt_cache: StmtCache,
610    _guard: tokio::sync::MutexGuard<'a, ()>,
611    done: bool,
612}
613
614impl<'a> Drop for SqliteTransaction<'a> {
615    fn drop(&mut self) {
616        if !self.done {
617            // Best-effort rollback on implicit drop.  We cannot `.await` inside
618            // `drop`, so we spawn a fire-and-forget task.  The mutex guard is
619            // released when `SqliteTransaction` is fully dropped (after this
620            // function body returns).
621            let conn = self.conn.clone();
622            tokio::spawn(async move {
623                if let Err(e) = conn.execute("ROLLBACK", LimboParams::None).await {
624                    log::warn!("SqliteTransaction drop: ROLLBACK failed: {e}");
625                }
626            });
627        }
628    }
629}
630
631#[async_trait]
632impl<'a> Transaction for SqliteTransaction<'a> {
633    async fn execute(&mut self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
634        let (rewritten, limbo_params) = rewrite_params(sql, params).map_err(OxiSqlError::from)?;
635        exec_rewritten(&self.conn, &rewritten, limbo_params, Some(&self.stmt_cache))
636            .await
637            .map_err(OxiSqlError::from)
638    }
639
640    async fn query(
641        &mut self,
642        sql: &str,
643        params: &[&dyn ToSqlValue],
644    ) -> Result<Vec<Row>, OxiSqlError> {
645        let (rewritten, limbo_params) = rewrite_params(sql, params).map_err(OxiSqlError::from)?;
646        query_rewritten(&self.conn, &rewritten, limbo_params)
647            .await
648            .map_err(OxiSqlError::from)
649    }
650
651    async fn commit(mut self: Box<Self>) -> Result<(), OxiSqlError> {
652        self.done = true;
653        self.conn
654            .execute("COMMIT", LimboParams::None)
655            .await
656            .map_err(|e| OxiSqlError::Other(format!("COMMIT failed: {e}")))?;
657        Ok(())
658    }
659
660    async fn rollback(mut self: Box<Self>) -> Result<(), OxiSqlError> {
661        // Mark done so that Drop does not attempt a second ROLLBACK.
662        self.done = true;
663        self.conn
664            .execute("ROLLBACK", LimboParams::None)
665            .await
666            .map_err(|e| OxiSqlError::Other(format!("ROLLBACK failed: {e}")))?;
667        Ok(())
668    }
669}
670
671// ── SqlitePrepared ────────────────────────────────────────────────────────────
672
673/// A prepared statement backed by the connection-level LRU cache.
674///
675/// On each `execute()` call the cached `limbo::Statement` is retrieved (or
676/// compiled fresh on a miss), executed, and returned to the cache.  Because
677/// `Statement::reset()` now zeroes `n_change`, every execution sees a correct
678/// change count without re-parsing the SQL.
679pub struct SqlitePrepared<'a> {
680    conn: &'a limbo::Connection,
681    stmt_cache: StmtCache,
682    sql: String,
683}
684
685#[async_trait]
686impl<'a> PreparedStatement for SqlitePrepared<'a> {
687    async fn execute(&mut self, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
688        let (rewritten, limbo_params) =
689            rewrite_params(&self.sql, params).map_err(OxiSqlError::from)?;
690        exec_rewritten(self.conn, &rewritten, limbo_params, Some(&self.stmt_cache))
691            .await
692            .map_err(OxiSqlError::from)
693    }
694
695    async fn query(&mut self, params: &[&dyn ToSqlValue]) -> Result<Vec<Row>, OxiSqlError> {
696        let (rewritten, limbo_params) =
697            rewrite_params(&self.sql, params).map_err(OxiSqlError::from)?;
698        query_rewritten(self.conn, &rewritten, limbo_params)
699            .await
700            .map_err(OxiSqlError::from)
701    }
702
703    fn sql(&self) -> &str {
704        &self.sql
705    }
706}