Skip to main content

oxisql_sqlite_compat/
connection.rs

1//! [`SqliteConnection`] — Limbo-backed implementation of [`oxisql_core::Connection`].
2//!
3//! # Concurrency model
4//!
5//! `limbo::Connection` is internally `Arc<Mutex<Arc<limbo_core::Connection>>>` with
6//! `unsafe impl Send + Sync`, so it is safe to clone and share across async tasks.
7//! `SqliteConnection` is a thin newtype that holds:
8//!
9//! - `conn: limbo::Connection` — the Limbo connection handle.
10//! - `txn_lock: Arc<tokio::sync::Mutex<()>>` — a guard that prevents two async tasks
11//!   from issuing `BEGIN` concurrently on the same logical connection.  SQLite does
12//!   not support nested transactions, so only one task at a time may hold a
13//!   transaction.
14//! - `path: String` — the path supplied to [`Builder::new_local`], retained for
15//!   diagnostics.
16//!
17//! # Affected-row count
18//!
19//! After each DML statement we call `conn.changes()` to read the row count that
20//! was committed by the most-recent write transaction.  DDL statements and
21//! `BEGIN`/`COMMIT`/`ROLLBACK` leave the counter at 0, which is the correct
22//! contract per OxiSQL and `sqlite3_changes()` semantics.
23//!
24//! # Parameter binding
25//!
26//! OxiSQL passes `$1`, `$2`, … positional parameters.  SQLite / Limbo expects
27//! `?` placeholders.  `types::rewrite_params` performs a quote-aware
28//! translation before each statement is prepared.
29//!
30//! # Schema introspection
31//!
32//! [`Connection::tables`] queries `sqlite_master`.
33//! [`Connection::columns`] uses `PRAGMA table_info`.
34//! [`Connection::indexes`] parses `sqlite_master` DDL (PRAGMA index_list/index_info are not
35//! yet implemented in Limbo 0.0.22).
36//! [`Connection::foreign_keys`] uses `PRAGMA foreign_key_list` — the engine now
37//! surfaces FK metadata from its in-memory schema.
38//!
39//! # Transactions
40//!
41//! [`Connection::transaction`] issues `BEGIN` and returns a [`SqliteTransaction`]
42//! that wraps the same `limbo::Connection`.  The transaction holds a guard on
43//! `txn_lock` so that no other task can start a concurrent `BEGIN`.
44//! Dropping `SqliteTransaction` without calling `commit` or `rollback` will
45//! execute `ROLLBACK` (best-effort, via `Drop`).
46//!
47//! # Prepared-statement cache
48//!
49//! All DML and DDL statements pass through an LRU cache keyed by the
50//! **rewritten SQL** (after `$N`→`?` translation).  The cache holds up to
51//! `STMT_CACHE_CAPACITY` (128) compiled `limbo::Statement` entries per connection
52//! (shared across clones of the same connection via `Arc<StdMutex<…>>`).
53//!
54//! On a cache hit the existing `limbo::Statement` is taken out of the cache,
55//! executed via `Statement::execute()` (which calls `reset()` before binding),
56//! and returned to the cache after execution.  `Statement::reset()` now also
57//! zeroes `Program::n_change` (fixed in oxisqlite-core), so cached statement
58//! reuse produces correct per-execution change counts.
59//!
60//! # ROLLBACK
61//!
62//! `SqliteTransaction::rollback()` executes the SQL string `"ROLLBACK"` against
63//! the engine, exactly mirroring how `commit()` executes `"COMMIT"`.  The engine
64//! emits an `AutoCommit { auto_commit: true, rollback: true }` VDBE instruction
65//! that discards all pending changes.  The `Drop` impl also fires a best-effort
66//! ROLLBACK when the transaction is dropped without an explicit `commit()` or
67//! `rollback()`.
68//!
69//! # Prepared-statement reuse (via SqlitePrepared)
70//!
71//! Limbo's `Statement` is consumed after a single `execute`/`query` cycle.
72//! Our [`PreparedStatement`] wrapper therefore re-prepares on every call.  The
73//! API contract (parse-once, bind-many) is satisfied at the OxiSQL trait level
74//! even though Limbo does not yet expose a stable compiled-statement cache.
75
76use std::num::NonZeroUsize;
77use std::sync::{Arc, Mutex as StdMutex};
78
79use async_trait::async_trait;
80use limbo::params::Params as LimboParams;
81use limbo::Builder;
82use tokio::sync::Mutex as TokioMutex;
83
84// ── statement-cache capacity ───────────────────────────────────────────────────
85
86/// Maximum number of compiled statements retained in the per-connection LRU
87/// cache.  Statements are keyed by their rewritten SQL (`?`-placeholder form).
88const STMT_CACHE_CAPACITY: usize = 128;
89
90use oxisql_core::{
91    ColumnInfo, Connection, ForeignKeyInfo, IndexInfo, OxiSqlError, PreparedStatement, Row,
92    TableInfo, TableType, ToSqlValue, Transaction, Value,
93};
94
95use crate::error::SqliteCompatError;
96use crate::types::{limbo_to_core_typed, rewrite_params, split_statements};
97
98// ── helpers ───────────────────────────────────────────────────────────────────
99
100/// A per-connection LRU cache from rewritten SQL → compiled `limbo::Statement`.
101///
102/// Wrapped in `Arc<StdMutex<…>>` so it can be cheaply shared when the
103/// `SqliteConnection` is cloned.  The std `Mutex` is deliberately chosen over
104/// `tokio::sync::Mutex`: the critical section is very short (single hash-lookup
105/// or insertion) and never held across an `.await` point.
106type StmtCache = Arc<StdMutex<lru::LruCache<String, limbo::Statement>>>;
107
108/// Construct a new, empty [`StmtCache`] with [`STMT_CACHE_CAPACITY`] slots.
109fn new_stmt_cache() -> StmtCache {
110    // SAFETY: STMT_CACHE_CAPACITY is a positive compile-time constant (128).
111    //         `NonZeroUsize::new` returns `None` only for 0, which this is not.
112    let cap = NonZeroUsize::new(STMT_CACHE_CAPACITY).unwrap_or(NonZeroUsize::MIN);
113    Arc::new(StdMutex::new(lru::LruCache::new(cap)))
114}
115
116/// Execute a SQL statement that has already been rewritten to `?` placeholders.
117///
118/// On a cache miss, compiles the statement via `conn.prepare()`, inserts it into
119/// the cache, then executes it via `stmt.execute()`.  On a cache hit, retrieves
120/// the cached `limbo::Statement` and calls `stmt.execute()` directly —
121/// `Statement::execute()` calls `reset()` internally, which (after the engine
122/// fix in oxisqlite-core) also clears `n_change`, making reuse correct.
123///
124/// The affected-row count is read from `conn.changes()` after execution,
125/// which reflects the count committed by the most recent write transaction on
126/// this connection.  DDL and `BEGIN`/`COMMIT`/`ROLLBACK` return 0, which is the
127/// correct value per the OxiSQL contract.
128///
129/// When no `cache` is provided (e.g., in unit tests that bypass the cache) the
130/// function falls back to `conn.execute()` followed by `conn.changes()`.
131async fn exec_rewritten(
132    conn: &limbo::Connection,
133    sql: &str,
134    limbo_params: Vec<limbo::Value>,
135    cache: Option<&StmtCache>,
136) -> Result<u64, SqliteCompatError> {
137    let lp = if limbo_params.is_empty() {
138        LimboParams::None
139    } else {
140        LimboParams::Positional(limbo_params)
141    };
142
143    match cache {
144        Some(c) => {
145            // ── cache path ─────────────────────────────────────────────────────
146            //
147            // DDL statements (CREATE, DROP, ALTER, PRAGMA, VACUUM) must bypass
148            // the statement cache entirely.  Their compiled programs embed
149            // schema-state decisions (e.g., IF NOT EXISTS checks, ParseSchema
150            // opcodes) that are only valid at compile time.  Re-executing a
151            // stale DDL program against a changed schema causes internal
152            // assertion failures inside the engine.
153            let is_ddl = {
154                let upper = sql.trim_start().to_ascii_uppercase();
155                upper.starts_with("CREATE")
156                    || upper.starts_with("DROP")
157                    || upper.starts_with("ALTER")
158                    || upper.starts_with("PRAGMA")
159                    || upper.starts_with("VACUUM")
160            };
161
162            if is_ddl {
163                // DDL: always compile fresh, never read from / write to cache.
164                let mut stmt = conn.prepare(sql).await.map_err(SqliteCompatError::from)?;
165                stmt.execute(lp).await.map_err(SqliteCompatError::from)?;
166                let n = conn
167                    .changes()
168                    .map_err(|e| SqliteCompatError::Other(format!("changes() failed: {e}")))?;
169                return Ok(n.max(0) as u64);
170            }
171
172            // ── DML / query cache path ─────────────────────────────────────────
173            //
174            // Retrieve a mutable reference to the cached statement, or compile a
175            // fresh one and insert it.  We hold the lock only for the duration of
176            // the lookup/insert, not across the `.await` in `stmt.execute()`.
177            //
178            // Because `StmtCache` holds the actual `Statement` value (not a
179            // reference-counted pointer to it), we need to take ownership on a
180            // cache miss and then put it back after execution.
181
182            // Try to remove a hit from the cache so we have owned access during
183            // the async execute call.
184            let cached_stmt: Option<limbo::Statement> = c
185                .lock()
186                .map_err(|e| SqliteCompatError::Other(format!("stmt_cache lock poisoned: {e}")))?
187                .pop(sql);
188
189            let mut stmt = match cached_stmt {
190                Some(s) => s,
191                None => {
192                    // Cache miss — compile a new statement.
193                    conn.prepare(sql).await.map_err(SqliteCompatError::from)?
194                }
195            };
196
197            // Execute the (possibly retrieved-from-cache) statement.
198            // `Statement::execute()` calls `reset()` before binding parameters,
199            // and `reset()` now also zeroes `n_change`, so reuse is correct.
200            stmt.execute(lp).await.map_err(SqliteCompatError::from)?;
201
202            // Return the statement to the cache for future reuse (DML only).
203            c.lock()
204                .map_err(|e| SqliteCompatError::Other(format!("stmt_cache lock poisoned: {e}")))?
205                .put(sql.to_owned(), stmt);
206
207            // Read the affected-row count from the connection's native counter.
208            let n = conn
209                .changes()
210                .map_err(|e| SqliteCompatError::Other(format!("changes() failed: {e}")))?;
211            Ok(n.max(0) as u64)
212        }
213        None => {
214            // ── no-cache path (uncommon; bypasses the cache entirely) ──────────
215            conn.execute(sql, lp)
216                .await
217                .map_err(SqliteCompatError::from)?;
218            let n = conn
219                .changes()
220                .map_err(|e| SqliteCompatError::Other(format!("changes() failed: {e}")))?;
221            Ok(n.max(0) as u64)
222        }
223    }
224}
225
226/// Execute a query that has already been rewritten to `?` placeholders and
227/// collect all result rows.
228///
229/// Column declared types (e.g. `"DATE"`, `"TIMESTAMP"`, `"UUID"`) are
230/// collected from the prepared statement and forwarded to [`limbo_to_core_typed`]
231/// so that richer [`Value`] variants are produced when appropriate.
232async fn query_rewritten(
233    conn: &limbo::Connection,
234    sql: &str,
235    limbo_params: Vec<limbo::Value>,
236) -> Result<Vec<Row>, SqliteCompatError> {
237    let lp = if limbo_params.is_empty() {
238        LimboParams::None
239    } else {
240        LimboParams::Positional(limbo_params)
241    };
242
243    let mut stmt = conn.prepare(sql).await.map_err(SqliteCompatError::from)?;
244
245    // Collect column names and declared types together.
246    let col_info: Vec<(String, Option<String>)> = stmt
247        .columns()
248        .iter()
249        .map(|c| (c.name().to_owned(), c.decl_type().map(str::to_owned)))
250        .collect();
251
252    let col_names: Vec<String> = col_info.iter().map(|(name, _)| name.clone()).collect();
253
254    let mut rows_iter = stmt.query(lp).await.map_err(SqliteCompatError::from)?;
255
256    let mut rows: Vec<Row> = Vec::new();
257    while let Some(limbo_row) = rows_iter.next().await.map_err(SqliteCompatError::from)? {
258        let mut values: Vec<Value> = Vec::with_capacity(col_info.len());
259        for idx in 0..limbo_row.column_count() {
260            let raw = limbo_row.get_value(idx).map_err(SqliteCompatError::from)?;
261            let decl = col_info.get(idx).and_then(|(_, dt)| dt.as_deref());
262            values.push(limbo_to_core_typed(raw, decl)?);
263        }
264        rows.push(Row::new(col_names.clone(), values));
265    }
266    Ok(rows)
267}
268
269// ── SqliteConnection ──────────────────────────────────────────────────────────
270
271/// A Limbo-backed SQLite connection implementing [`Connection`].
272///
273/// Create via [`SqliteConnection::open`] (file path) or
274/// [`SqliteConnection::open_memory`] (`:memory:`).
275///
276/// # Statement cache
277///
278/// Each `SqliteConnection` maintains an LRU cache of compiled `limbo::Statement`
279/// objects (capacity: `STMT_CACHE_CAPACITY` = 128).  The cache is shared across
280/// clones of the same connection (the clones share the underlying
281/// `limbo::Connection`) and is updated on every DML/DDL execution.  Cache hits
282/// save the per-statement parse-and-compile round-trip inside Limbo.
283#[derive(Clone)]
284pub struct SqliteConnection {
285    conn: limbo::Connection,
286    txn_lock: Arc<TokioMutex<()>>,
287    stmt_cache: StmtCache,
288    path: String,
289}
290
291impl std::fmt::Debug for SqliteConnection {
292    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
293        let cache_len = self.stmt_cache.lock().map(|g| g.len()).unwrap_or(0);
294        f.debug_struct("SqliteConnection")
295            .field("path", &self.path)
296            .field("stmt_cache_len", &cache_len)
297            .finish_non_exhaustive()
298    }
299}
300
301impl SqliteConnection {
302    /// Open a Limbo database at the given file path.
303    ///
304    /// Pass `":memory:"` for an in-memory database, or use
305    /// [`open_memory`][Self::open_memory] for clarity.
306    ///
307    /// # Errors
308    ///
309    /// Returns [`OxiSqlError`] if the file cannot be opened or created.
310    pub async fn open(path: &str) -> Result<Self, OxiSqlError> {
311        let db = Builder::new_local(path)
312            .build()
313            .await
314            .map_err(|e| OxiSqlError::Other(format!("limbo open error: {e}")))?;
315        let conn = db
316            .connect()
317            .map_err(|e| OxiSqlError::Other(format!("limbo connect error: {e}")))?;
318        Ok(Self {
319            conn,
320            txn_lock: Arc::new(TokioMutex::new(())),
321            stmt_cache: new_stmt_cache(),
322            path: path.to_owned(),
323        })
324    }
325
326    /// Open a fresh in-memory Limbo database.
327    ///
328    /// # Errors
329    ///
330    /// Returns [`OxiSqlError`] if the engine cannot be initialised.
331    pub async fn open_memory() -> Result<Self, OxiSqlError> {
332        Self::open(":memory:").await
333    }
334
335    /// Return the path this connection was opened with.
336    pub fn path(&self) -> &str {
337        &self.path
338    }
339}
340
341// ── Connection impl ───────────────────────────────────────────────────────────
342
343#[async_trait]
344impl Connection for SqliteConnection {
345    async fn execute(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
346        let (rewritten, limbo_params) = rewrite_params(sql, params).map_err(OxiSqlError::from)?;
347        exec_rewritten(&self.conn, &rewritten, limbo_params, Some(&self.stmt_cache))
348            .await
349            .map_err(OxiSqlError::from)
350    }
351
352    async fn query(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<Vec<Row>, OxiSqlError> {
353        let (rewritten, limbo_params) = rewrite_params(sql, params).map_err(OxiSqlError::from)?;
354        query_rewritten(&self.conn, &rewritten, limbo_params)
355            .await
356            .map_err(OxiSqlError::from)
357    }
358
359    async fn transaction(&self) -> Result<Box<dyn Transaction + '_>, OxiSqlError> {
360        // Acquire the exclusive transaction lock before issuing BEGIN.
361        // This prevents a second task from starting a concurrent transaction
362        // on the same SqliteConnection clone.
363        let guard = self.txn_lock.lock().await;
364        self.conn
365            .execute("BEGIN", LimboParams::None)
366            .await
367            .map_err(|e| OxiSqlError::Other(format!("BEGIN failed: {e}")))?;
368        Ok(Box::new(SqliteTransaction {
369            conn: self.conn.clone(),
370            // Share the connection-level stmt_cache so that DML executed inside
371            // a transaction also benefits from cached compiled statements.
372            stmt_cache: Arc::clone(&self.stmt_cache),
373            // Transfer ownership of the mutex guard into the transaction.
374            // The guard is released when SqliteTransaction is dropped.
375            _guard: guard,
376            done: false,
377        }))
378    }
379
380    async fn execute_batch(&self, sql: &str) -> Result<u64, OxiSqlError> {
381        // Token-aware split: honours `;` inside string literals, quoted
382        // identifiers, block comments, and line comments.
383        let stmts = split_statements(sql);
384        let mut total = 0u64;
385        for stmt in stmts {
386            total += self.execute(stmt, &[]).await?;
387        }
388        Ok(total)
389    }
390
391    async fn ping(&self) -> Result<(), OxiSqlError> {
392        self.query("SELECT 1", &[]).await?;
393        Ok(())
394    }
395
396    async fn prepare(&self, sql: &str) -> Result<Box<dyn PreparedStatement + '_>, OxiSqlError> {
397        Ok(Box::new(SqlitePrepared {
398            conn: &self.conn,
399            stmt_cache: Arc::clone(&self.stmt_cache),
400            sql: sql.to_owned(),
401        }))
402    }
403
404    // ── Schema introspection ──────────────────────────────────────────────────
405
406    async fn tables(&self) -> Result<Vec<TableInfo>, OxiSqlError> {
407        let rows = self
408            .query(
409                "SELECT name, type FROM sqlite_master \
410                 WHERE type IN ('table','view') AND name NOT LIKE 'sqlite_%' \
411                 ORDER BY name",
412                &[],
413            )
414            .await?;
415
416        let infos = rows
417            .into_iter()
418            .map(|row| {
419                let name = row
420                    .get_by_index(0)
421                    .and_then(|v| {
422                        if let Value::Text(s) = v {
423                            Some(s.clone())
424                        } else {
425                            None
426                        }
427                    })
428                    .unwrap_or_default();
429                let ttype_str = row
430                    .get_by_index(1)
431                    .and_then(|v| {
432                        if let Value::Text(s) = v {
433                            Some(s.as_str())
434                        } else {
435                            None
436                        }
437                    })
438                    .unwrap_or("table");
439                let table_type = match ttype_str {
440                    "view" => TableType::View,
441                    _ => TableType::Base,
442                };
443                TableInfo {
444                    name,
445                    schema: None,
446                    table_type,
447                }
448            })
449            .collect();
450        Ok(infos)
451    }
452
453    async fn columns(&self, table: &str) -> Result<Vec<ColumnInfo>, OxiSqlError> {
454        // PRAGMA table_info returns: cid, name, type, notnull, dflt_value, pk
455        let sql = format!("PRAGMA table_info(\"{table}\")");
456        let rows = self.query(&sql, &[]).await?;
457
458        let infos = rows
459            .into_iter()
460            .map(|row| {
461                // Helper: get column by index as string or empty string.
462                let text_at = |r: &Row, idx: usize| -> String {
463                    r.get_by_index(idx)
464                        .and_then(|v| match v {
465                            Value::Text(s) => Some(s.clone()),
466                            Value::I64(n) => Some(n.to_string()),
467                            Value::Null => Some(String::new()),
468                            _ => None,
469                        })
470                        .unwrap_or_default()
471                };
472                let i64_at = |r: &Row, idx: usize| -> i64 {
473                    r.get_by_index(idx)
474                        .and_then(|v| {
475                            if let Value::I64(n) = v {
476                                Some(*n)
477                            } else {
478                                None
479                            }
480                        })
481                        .unwrap_or(0)
482                };
483
484                let ordinal = i64_at(&row, 0) as u32 + 1; // cid is 0-based
485                let name = text_at(&row, 1);
486                let data_type = text_at(&row, 2);
487                let notnull = i64_at(&row, 3) != 0;
488                let default_val = row.get_by_index(4).and_then(|v| match v {
489                    Value::Text(s) => Some(s.clone()),
490                    Value::Null => None,
491                    other => Some(format!("{other:?}")),
492                });
493
494                ColumnInfo {
495                    name,
496                    ordinal_position: ordinal,
497                    data_type,
498                    nullable: !notnull,
499                    default: default_val,
500                    max_length: None,
501                    numeric_precision: None,
502                    numeric_scale: None,
503                }
504            })
505            .collect();
506        Ok(infos)
507    }
508
509    async fn indexes(&self, table: &str) -> Result<Vec<IndexInfo>, OxiSqlError> {
510        // PRAGMA index_list and PRAGMA index_info are not yet implemented in limbo 0.0.22.
511        // Fall back to sqlite_master for index names and uniqueness, then parse
512        // the index SQL to extract column names.  This is best-effort: multi-column
513        // indexes and expression indexes may not parse perfectly.
514        let sql = "SELECT name, sql FROM sqlite_master \
515                   WHERE type='index' AND tbl_name=$1 AND name NOT LIKE 'sqlite_%'";
516        let rows = self.query(sql, &[&table]).await?;
517
518        let mut infos: Vec<IndexInfo> = Vec::new();
519        for row in rows {
520            let name = row
521                .get_by_index(0)
522                .and_then(|v| {
523                    if let Value::Text(s) = v {
524                        Some(s.clone())
525                    } else {
526                        None
527                    }
528                })
529                .unwrap_or_default();
530            let idx_sql = row
531                .get_by_index(1)
532                .and_then(|v| {
533                    if let Value::Text(s) = v {
534                        Some(s.clone())
535                    } else {
536                        None
537                    }
538                })
539                .unwrap_or_default();
540
541            // Detect UNIQUE from the CREATE INDEX / CREATE UNIQUE INDEX statement.
542            let upper = idx_sql.to_ascii_uppercase();
543            let unique = upper.contains("UNIQUE");
544
545            // Extract column list between the last `(` and `)`.
546            let columns: Vec<String> =
547                if let (Some(open), Some(close)) = (idx_sql.rfind('('), idx_sql.rfind(')')) {
548                    idx_sql[open + 1..close]
549                        .split(',')
550                        .map(|c| c.trim().to_string())
551                        .filter(|c| !c.is_empty())
552                        .collect()
553                } else {
554                    vec![]
555                };
556
557            infos.push(IndexInfo {
558                name,
559                columns,
560                unique,
561                primary: false,
562            });
563        }
564        Ok(infos)
565    }
566
567    async fn foreign_keys(&self, table: &str) -> Result<Vec<ForeignKeyInfo>, OxiSqlError> {
568        // Use PRAGMA foreign_key_list — the engine now surfaces FK metadata
569        // directly from the in-memory schema, avoiding brittle DDL text parsing.
570        let escaped = table.replace('"', "\"\"");
571        let sql = format!("PRAGMA foreign_key_list(\"{}\")", escaped);
572        let rows = query_rewritten(&self.conn, &sql, vec![])
573            .await
574            .map_err(OxiSqlError::from)?;
575
576        // PRAGMA foreign_key_list columns (by index):
577        //  0: id INTEGER   — FK index within the table
578        //  1: seq INTEGER  — column position within a composite FK
579        //  2: table TEXT   — parent table name
580        //  3: from TEXT    — child column name
581        //  4: to TEXT/NULL — parent column name (NULL = implicit PK ref)
582        //  5: on_update TEXT
583        //  6: on_delete TEXT
584        //  7: match TEXT
585        let mut infos: Vec<ForeignKeyInfo> = Vec::with_capacity(rows.len());
586        for row in &rows {
587            let id = match row.get_by_index(0) {
588                Some(Value::I64(v)) => *v,
589                _ => 0,
590            };
591            let from_col = match row.get_by_index(3) {
592                Some(Value::Text(s)) => s.clone(),
593                _ => continue,
594            };
595            let foreign_table = match row.get_by_index(2) {
596                Some(Value::Text(s)) => s.clone(),
597                _ => continue,
598            };
599            let foreign_column = match row.get_by_index(4) {
600                Some(Value::Text(s)) => s.clone(),
601                _ => String::new(),
602            };
603            let on_update = match row.get_by_index(5) {
604                Some(Value::Text(s)) => Some(s.clone()),
605                _ => None,
606            };
607            let on_delete = match row.get_by_index(6) {
608                Some(Value::Text(s)) => Some(s.clone()),
609                _ => None,
610            };
611            let constraint_name = format!("fk_{table}_{id}");
612            infos.push(ForeignKeyInfo {
613                constraint_name,
614                column: from_col,
615                foreign_table,
616                foreign_column,
617                on_update,
618                on_delete,
619            });
620        }
621        Ok(infos)
622    }
623}
624
625// ── SqliteTransaction ─────────────────────────────────────────────────────────
626
627/// A SQLite transaction backed by raw `BEGIN`/`COMMIT`/`ROLLBACK` statements.
628///
629/// Holds a guard on the connection-level transaction mutex so that no other
630/// async task can start a concurrent `BEGIN` on the same `SqliteConnection`.
631/// When dropped without an explicit `commit` or `rollback`, the transaction
632/// attempts a best-effort `ROLLBACK` via a background task.
633pub struct SqliteTransaction<'a> {
634    conn: limbo::Connection,
635    stmt_cache: StmtCache,
636    _guard: tokio::sync::MutexGuard<'a, ()>,
637    done: bool,
638}
639
640impl<'a> Drop for SqliteTransaction<'a> {
641    fn drop(&mut self) {
642        if !self.done {
643            // Best-effort rollback on implicit drop.  We cannot `.await` inside
644            // `drop`, so we spawn a fire-and-forget task.  The mutex guard is
645            // released when `SqliteTransaction` is fully dropped (after this
646            // function body returns).
647            let conn = self.conn.clone();
648            tokio::spawn(async move {
649                if let Err(e) = conn.execute("ROLLBACK", LimboParams::None).await {
650                    log::warn!("SqliteTransaction drop: ROLLBACK failed: {e}");
651                }
652            });
653        }
654    }
655}
656
657#[async_trait]
658impl<'a> Transaction for SqliteTransaction<'a> {
659    async fn execute(&mut self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
660        let (rewritten, limbo_params) = rewrite_params(sql, params).map_err(OxiSqlError::from)?;
661        exec_rewritten(&self.conn, &rewritten, limbo_params, Some(&self.stmt_cache))
662            .await
663            .map_err(OxiSqlError::from)
664    }
665
666    async fn query(
667        &mut self,
668        sql: &str,
669        params: &[&dyn ToSqlValue],
670    ) -> Result<Vec<Row>, OxiSqlError> {
671        let (rewritten, limbo_params) = rewrite_params(sql, params).map_err(OxiSqlError::from)?;
672        query_rewritten(&self.conn, &rewritten, limbo_params)
673            .await
674            .map_err(OxiSqlError::from)
675    }
676
677    async fn commit(mut self: Box<Self>) -> Result<(), OxiSqlError> {
678        self.done = true;
679        self.conn
680            .execute("COMMIT", LimboParams::None)
681            .await
682            .map_err(|e| OxiSqlError::Other(format!("COMMIT failed: {e}")))?;
683        Ok(())
684    }
685
686    async fn rollback(mut self: Box<Self>) -> Result<(), OxiSqlError> {
687        // Mark done so that Drop does not attempt a second ROLLBACK.
688        self.done = true;
689        self.conn
690            .execute("ROLLBACK", LimboParams::None)
691            .await
692            .map_err(|e| OxiSqlError::Other(format!("ROLLBACK failed: {e}")))?;
693        Ok(())
694    }
695}
696
697// ── SqlitePrepared ────────────────────────────────────────────────────────────
698
699/// A prepared statement backed by the connection-level LRU cache.
700///
701/// On each `execute()` call the cached `limbo::Statement` is retrieved (or
702/// compiled fresh on a miss), executed, and returned to the cache.  Because
703/// `Statement::reset()` now zeroes `n_change`, every execution sees a correct
704/// change count without re-parsing the SQL.
705pub struct SqlitePrepared<'a> {
706    conn: &'a limbo::Connection,
707    stmt_cache: StmtCache,
708    sql: String,
709}
710
711#[async_trait]
712impl<'a> PreparedStatement for SqlitePrepared<'a> {
713    async fn execute(&mut self, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
714        let (rewritten, limbo_params) =
715            rewrite_params(&self.sql, params).map_err(OxiSqlError::from)?;
716        exec_rewritten(self.conn, &rewritten, limbo_params, Some(&self.stmt_cache))
717            .await
718            .map_err(OxiSqlError::from)
719    }
720
721    async fn query(&mut self, params: &[&dyn ToSqlValue]) -> Result<Vec<Row>, OxiSqlError> {
722        let (rewritten, limbo_params) =
723            rewrite_params(&self.sql, params).map_err(OxiSqlError::from)?;
724        query_rewritten(self.conn, &rewritten, limbo_params)
725            .await
726            .map_err(OxiSqlError::from)
727    }
728
729    fn sql(&self) -> &str {
730        &self.sql
731    }
732}