oxisql-sqlite-compat 0.3.1

Pure-Rust SQLite-compatible backend for OxiSQL via the oxisqlite Pure-Rust engine (C-free fork of limbo)
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
//! [`SqliteConnection`] — Limbo-backed implementation of [`oxisql_core::Connection`].
//!
//! # Concurrency model
//!
//! `limbo::Connection` is internally `Arc<Mutex<Arc<limbo_core::Connection>>>` with
//! `unsafe impl Send + Sync`, so it is safe to clone and share across async tasks.
//! `SqliteConnection` is a thin newtype that holds:
//!
//! - `conn: limbo::Connection` — the Limbo connection handle.
//! - `txn_lock: Arc<tokio::sync::Mutex<()>>` — a guard that prevents two async tasks
//!   from issuing `BEGIN` concurrently on the same logical connection.  SQLite does
//!   not support nested transactions, so only one task at a time may hold a
//!   transaction.
//! - `path: String` — the path supplied to [`Builder::new_local`], retained for
//!   diagnostics.
//!
//! # Affected-row count
//!
//! After each DML statement we call `conn.changes()` to read the row count that
//! was committed by the most-recent write transaction.  DDL statements and
//! `BEGIN`/`COMMIT`/`ROLLBACK` leave the counter at 0, which is the correct
//! contract per OxiSQL and `sqlite3_changes()` semantics.
//!
//! # Parameter binding
//!
//! OxiSQL passes `$1`, `$2`, … positional parameters.  SQLite / Limbo expects
//! `?` placeholders.  `types::rewrite_params` performs a quote-aware
//! translation before each statement is prepared.
//!
//! # Schema introspection
//!
//! [`Connection::tables`] queries `sqlite_master`.
//! [`Connection::columns`] uses `PRAGMA table_info`.
//! [`Connection::indexes`] parses `sqlite_master` DDL (PRAGMA index_list/index_info are not
//! yet implemented in Limbo 0.0.22).
//! [`Connection::foreign_keys`] uses `PRAGMA foreign_key_list` — the engine now
//! surfaces FK metadata from its in-memory schema.
//!
//! # Transactions
//!
//! [`Connection::transaction`] issues `BEGIN` and returns a [`SqliteTransaction`]
//! that wraps the same `limbo::Connection`.  The transaction holds a guard on
//! `txn_lock` so that no other task can start a concurrent `BEGIN`.
//! Dropping `SqliteTransaction` without calling `commit` or `rollback` will
//! execute `ROLLBACK` (best-effort, via `Drop`).
//!
//! # Prepared-statement cache
//!
//! All DML and DDL statements pass through an LRU cache keyed by the
//! **rewritten SQL** (after `$N`→`?` translation).  The cache holds up to
//! `STMT_CACHE_CAPACITY` (128) compiled `limbo::Statement` entries per connection
//! (shared across clones of the same connection via `Arc<StdMutex<…>>`).
//!
//! On a cache hit the existing `limbo::Statement` is taken out of the cache,
//! executed via `Statement::execute()` (which calls `reset()` before binding),
//! and returned to the cache after execution.  `Statement::reset()` now also
//! zeroes `Program::n_change` (fixed in oxisqlite-core), so cached statement
//! reuse produces correct per-execution change counts.
//!
//! # ROLLBACK
//!
//! `SqliteTransaction::rollback()` executes the SQL string `"ROLLBACK"` against
//! the engine, exactly mirroring how `commit()` executes `"COMMIT"`.  The engine
//! emits an `AutoCommit { auto_commit: true, rollback: true }` VDBE instruction
//! that discards all pending changes.  The `Drop` impl also fires a best-effort
//! ROLLBACK when the transaction is dropped without an explicit `commit()` or
//! `rollback()`.
//!
//! # Prepared-statement reuse (via SqlitePrepared)
//!
//! `SqlitePrepared::execute` routes through the connection-level statement
//! cache described above, so repeated executions of the same SQL reuse the
//! compiled `limbo::Statement`.  `SqlitePrepared::query` does not consult the
//! cache: the query path (`query_rewritten`) prepares a fresh statement on
//! every call.

use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex as StdMutex};

use async_trait::async_trait;
use limbo::params::Params as LimboParams;
use limbo::Builder;
use tokio::sync::Mutex as TokioMutex;

// ── statement-cache capacity ───────────────────────────────────────────────────

/// Maximum number of compiled statements retained in the per-connection LRU
/// cache.  Statements are keyed by their rewritten SQL (`?`-placeholder form).
///
/// Keep this value non-zero: `new_stmt_cache` converts it to a `NonZeroUsize`
/// and silently falls back to a capacity of 1 if that conversion fails.
const STMT_CACHE_CAPACITY: usize = 128;

use oxisql_core::{
    ColumnInfo, Connection, ForeignKeyInfo, IndexInfo, OxiSqlError, PreparedStatement, Row,
    TableInfo, TableType, ToSqlValue, Transaction, Value,
};

use crate::error::SqliteCompatError;
use crate::types::{limbo_to_core_typed, rewrite_params, split_statements};

// ── helpers ───────────────────────────────────────────────────────────────────

/// A per-connection LRU cache from rewritten SQL → compiled `limbo::Statement`.
///
/// Wrapped in `Arc<StdMutex<…>>` so it can be cheaply shared when the
/// `SqliteConnection` is cloned.  The std `Mutex` is deliberately chosen over
/// `tokio::sync::Mutex`: the critical section is very short (single hash-lookup
/// or insertion) and never held across an `.await` point.
///
/// `exec_rewritten` pops an entry out of the cache while the statement is
/// executing and puts it back afterwards, so a cached statement is absent from
/// the map for the duration of its own execution.
type StmtCache = Arc<StdMutex<lru::LruCache<String, limbo::Statement>>>;

/// Build an empty [`StmtCache`] sized to [`STMT_CACHE_CAPACITY`] entries.
///
/// The capacity constant is converted to a `NonZeroUsize` as required by
/// `lru::LruCache::new`.  Should the constant ever be set to 0, the cache is
/// created with the smallest legal capacity (`NonZeroUsize::MIN`) instead of
/// panicking.
fn new_stmt_cache() -> StmtCache {
    // Invariant (no `unsafe` involved): the constant is a positive literal, so
    // the `None` arm below is not expected to be taken.
    let capacity = match NonZeroUsize::new(STMT_CACHE_CAPACITY) {
        Some(non_zero) => non_zero,
        None => NonZeroUsize::MIN,
    };
    let cache = lru::LruCache::new(capacity);
    Arc::new(StdMutex::new(cache))
}

/// Execute a SQL statement that has already been rewritten to `?` placeholders.
///
/// All statements (DML and DDL) pass through the statement cache uniformly.
/// On a cache miss the statement is compiled via `conn.prepare()`, executed,
/// and stored for future reuse.  On a cache hit the existing `limbo::Statement`
/// is retrieved, executed via `stmt.execute()` (which calls `reset()` before
/// binding, zeroing `n_change` so reuse produces correct per-execution change
/// counts), and returned to the cache.
///
/// If the cached statement was compiled before a schema change (DDL, ALTER,
/// CREATE INDEX, etc.), the engine's `op_transaction` cookie check fires on
/// the first `step()` and returns `SchemaChanged`.  This function catches that
/// error, discards the stale compiled program, re-prepares against the
/// refreshed schema, and retries exactly once.  This transparent re-prepare
/// replaces the old `is_ddl` keyword-prefix heuristic that failed on
/// comment-prefixed DDL and left DML statements stale after schema changes.
///
/// The affected-row count is read from `conn.changes()` after execution,
/// which reflects the count committed by the most recent write transaction on
/// this connection.  DDL and `BEGIN`/`COMMIT`/`ROLLBACK` return 0, which is the
/// correct value per the OxiSQL contract.
///
/// When no `cache` is provided (e.g., in unit tests that bypass the cache) the
/// function falls back to `conn.execute()` followed by `conn.changes()`.
async fn exec_rewritten(
    conn: &limbo::Connection,
    sql: &str,
    limbo_params: Vec<limbo::Value>,
    cache: Option<&StmtCache>,
) -> Result<u64, SqliteCompatError> {
    match cache {
        Some(c) => {
            // Clone before consuming so we can rebuild the parameter list for a
            // re-prepare-and-retry if the engine signals SchemaChanged.
            let retry_params = limbo_params.clone();
            let lp = if limbo_params.is_empty() {
                LimboParams::None
            } else {
                LimboParams::Positional(limbo_params)
            };

            // Take the compiled statement out of the cache (if present).
            // The lock is held only for this short lookup; never across `.await`.
            let cached = {
                let mut guard = c.lock().map_err(|e| {
                    SqliteCompatError::Other(format!("stmt_cache lock poisoned: {e}"))
                })?;
                guard.pop(sql)
            };

            let mut stmt = match cached {
                Some(s) => s,
                None => conn.prepare(sql).await.map_err(SqliteCompatError::from)?,
            };

            match stmt.execute(lp).await {
                Ok(_) => {
                    // Execution succeeded — return the statement to the cache.
                    c.lock()
                        .map_err(|e| {
                            SqliteCompatError::Other(format!("stmt_cache lock poisoned: {e}"))
                        })?
                        .put(sql.to_owned(), stmt);
                }
                Err(e) if e.is_schema_changed() => {
                    // The schema changed after this statement was compiled. Drop
                    // the stale program, re-compile against the refreshed schema,
                    // and retry exactly once.
                    drop(stmt);
                    let retry_lp = if retry_params.is_empty() {
                        LimboParams::None
                    } else {
                        LimboParams::Positional(retry_params)
                    };
                    let mut fresh = conn.prepare(sql).await.map_err(SqliteCompatError::from)?;
                    fresh
                        .execute(retry_lp)
                        .await
                        .map_err(SqliteCompatError::from)?;
                    c.lock()
                        .map_err(|e| {
                            SqliteCompatError::Other(format!("stmt_cache lock poisoned: {e}"))
                        })?
                        .put(sql.to_owned(), fresh);
                }
                Err(e) => return Err(SqliteCompatError::from(e)),
            }

            let n = conn
                .changes()
                .map_err(|e| SqliteCompatError::Other(format!("changes() failed: {e}")))?;
            Ok(n.max(0) as u64)
        }
        None => {
            // ── no-cache path (uncommon; bypasses the cache entirely) ──────────
            let lp = if limbo_params.is_empty() {
                LimboParams::None
            } else {
                LimboParams::Positional(limbo_params)
            };
            conn.execute(sql, lp)
                .await
                .map_err(SqliteCompatError::from)?;
            let n = conn
                .changes()
                .map_err(|e| SqliteCompatError::Other(format!("changes() failed: {e}")))?;
            Ok(n.max(0) as u64)
        }
    }
}

/// Run a query whose placeholders have already been rewritten to `?` and
/// gather every result row into memory.
///
/// The statement is prepared on each call (the statement cache is not used
/// here).  Column names and declared types are read from the prepared
/// statement up front; the declared type of each column is passed to
/// [`limbo_to_core_typed`] together with the raw value when converting to
/// [`Value`].
///
/// # Errors
///
/// Returns a [`SqliteCompatError`] if preparing, executing, stepping, reading
/// a column value, or converting a value fails.
async fn query_rewritten(
    conn: &limbo::Connection,
    sql: &str,
    limbo_params: Vec<limbo::Value>,
) -> Result<Vec<Row>, SqliteCompatError> {
    let bound = if !limbo_params.is_empty() {
        LimboParams::Positional(limbo_params)
    } else {
        LimboParams::None
    };

    let mut stmt = conn.prepare(sql).await.map_err(SqliteCompatError::from)?;

    // Column metadata is captured once and reused for every row.
    let mut col_names: Vec<String> = Vec::new();
    let mut decl_types: Vec<Option<String>> = Vec::new();
    for column in stmt.columns().iter() {
        col_names.push(column.name().to_owned());
        decl_types.push(column.decl_type().map(str::to_owned));
    }

    let mut cursor = stmt.query(bound).await.map_err(SqliteCompatError::from)?;

    let mut collected: Vec<Row> = Vec::new();
    loop {
        let Some(limbo_row) = cursor.next().await.map_err(SqliteCompatError::from)? else {
            break;
        };

        let mut values: Vec<Value> = Vec::with_capacity(decl_types.len());
        for idx in 0..limbo_row.column_count() {
            let raw = limbo_row.get_value(idx).map_err(SqliteCompatError::from)?;
            // A column index without metadata simply has no declared type.
            let decl = decl_types.get(idx).and_then(|dt| dt.as_deref());
            values.push(limbo_to_core_typed(raw, decl)?);
        }
        collected.push(Row::new(col_names.clone(), values));
    }
    Ok(collected)
}

// ── SqliteConnection ──────────────────────────────────────────────────────────

/// A Limbo-backed SQLite connection implementing [`Connection`].
///
/// Create via [`SqliteConnection::open`] (file path) or
/// [`SqliteConnection::open_memory`] (`:memory:`).
///
/// # Statement cache
///
/// Each `SqliteConnection` maintains an LRU cache of compiled `limbo::Statement`
/// objects (capacity: `STMT_CACHE_CAPACITY` = 128).  The cache is shared across
/// clones of the same connection (the clones share the underlying
/// `limbo::Connection`) and is updated on every DML/DDL execution.  Cache hits
/// save the per-statement parse-and-compile round-trip inside Limbo.
#[derive(Clone)]
pub struct SqliteConnection {
    /// Engine connection handle; a clone of it is handed to every
    /// `SqliteTransaction` created by `transaction()`.
    conn: limbo::Connection,
    /// Async mutex locked by `transaction()` before it issues `BEGIN`; the
    /// guard is stored inside the returned `SqliteTransaction`.
    txn_lock: Arc<TokioMutex<()>>,
    /// Compiled-statement cache, shared (via `Arc`) with clones of this
    /// connection and with the transactions / prepared statements it creates.
    stmt_cache: StmtCache,
    /// Path passed to `open`; returned by `path()` and shown by the `Debug`
    /// implementation.
    path: String,
}

impl std::fmt::Debug for SqliteConnection {
    /// Print the connection path and the current number of cached statements.
    ///
    /// The remaining fields are omitted (the output ends with `..`).  If the
    /// cache mutex is poisoned the cache length is reported as 0 rather than
    /// failing the formatting call.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let cached = match self.stmt_cache.lock() {
            Ok(entries) => entries.len(),
            Err(_) => 0,
        };
        let mut out = f.debug_struct("SqliteConnection");
        out.field("path", &self.path);
        out.field("stmt_cache_len", &cached);
        out.finish_non_exhaustive()
    }
}

impl SqliteConnection {
    /// Open a Limbo database at the given file path.
    ///
    /// Pass `":memory:"` for an in-memory database, or use
    /// [`open_memory`][Self::open_memory] for clarity.
    ///
    /// # Errors
    ///
    /// Returns [`OxiSqlError`] if the file cannot be opened or created.
    pub async fn open(path: &str) -> Result<Self, OxiSqlError> {
        let db = Builder::new_local(path)
            .build()
            .await
            .map_err(|e| OxiSqlError::Other(format!("limbo open error: {e}")))?;
        let conn = db
            .connect()
            .map_err(|e| OxiSqlError::Other(format!("limbo connect error: {e}")))?;
        Ok(Self {
            conn,
            txn_lock: Arc::new(TokioMutex::new(())),
            stmt_cache: new_stmt_cache(),
            path: path.to_owned(),
        })
    }

    /// Open a fresh in-memory Limbo database.
    ///
    /// # Errors
    ///
    /// Returns [`OxiSqlError`] if the engine cannot be initialised.
    pub async fn open_memory() -> Result<Self, OxiSqlError> {
        Self::open(":memory:").await
    }

    /// Return the path this connection was opened with.
    pub fn path(&self) -> &str {
        &self.path
    }
}

// ── Connection impl ───────────────────────────────────────────────────────────

#[async_trait]
impl Connection for SqliteConnection {
    /// Execute one statement through the statement cache and return the
    /// affected-row count.
    ///
    /// `$N` placeholders are translated by `rewrite_params` before the
    /// statement reaches the engine.
    async fn execute(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
        let (statement, bound) = rewrite_params(sql, params).map_err(OxiSqlError::from)?;
        let outcome =
            exec_rewritten(&self.conn, &statement, bound, Some(&self.stmt_cache)).await;
        outcome.map_err(OxiSqlError::from)
    }

    /// Run a query and return all of its rows.
    ///
    /// `$N` placeholders are translated by `rewrite_params`; the statement is
    /// then prepared and stepped by `query_rewritten` (no statement cache).
    async fn query(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<Vec<Row>, OxiSqlError> {
        let (statement, bound) = rewrite_params(sql, params).map_err(OxiSqlError::from)?;
        let fetched = query_rewritten(&self.conn, &statement, bound).await;
        fetched.map_err(OxiSqlError::from)
    }

    /// Start a transaction: lock `txn_lock`, issue `BEGIN`, and hand back a
    /// [`SqliteTransaction`] that owns the lock guard.
    ///
    /// Waiting on the lock means a second caller on the same connection (or a
    /// clone of it) blocks here until the first transaction is dropped.  If
    /// `BEGIN` fails the guard is released on return and the error is reported
    /// as `"BEGIN failed: …"`.
    async fn transaction(&self) -> Result<Box<dyn Transaction + '_>, OxiSqlError> {
        // Serialise transactions on this logical connection before touching
        // the engine.
        let guard = self.txn_lock.lock().await;

        if let Err(e) = self.conn.execute("BEGIN", LimboParams::None).await {
            return Err(OxiSqlError::Other(format!("BEGIN failed: {e}")));
        }

        let txn = SqliteTransaction {
            conn: self.conn.clone(),
            // Statements run inside the transaction reuse the connection's
            // compiled-statement cache.
            stmt_cache: Arc::clone(&self.stmt_cache),
            // The guard moves into the transaction and is released when the
            // transaction value is dropped.
            _guard: guard,
            done: false,
        };
        Ok(Box::new(txn))
    }

    /// Execute every statement in `sql` in order and return the sum of their
    /// affected-row counts.
    ///
    /// The script is divided by `split_statements` (from `crate::types`), which
    /// per its contract honours `;` inside string literals, quoted identifiers
    /// and comments.  Each piece is run without parameters; the first failure
    /// aborts the batch and is returned.
    async fn execute_batch(&self, sql: &str) -> Result<u64, OxiSqlError> {
        let mut affected: u64 = 0;
        for piece in split_statements(sql) {
            let n = self.execute(piece, &[]).await?;
            affected += n;
        }
        Ok(affected)
    }

    /// Check that the connection is usable by running `SELECT 1` and
    /// discarding the result.
    async fn ping(&self) -> Result<(), OxiSqlError> {
        match self.query("SELECT 1", &[]).await {
            Ok(_rows) => Ok(()),
            Err(e) => Err(e),
        }
    }

    /// Create a [`SqlitePrepared`] for `sql`.
    ///
    /// Nothing is compiled here: the SQL text is stored as given and is
    /// rewritten and run on each later `execute` / `query` call.  The handle
    /// borrows this connection and shares its statement cache.
    async fn prepare(&self, sql: &str) -> Result<Box<dyn PreparedStatement + '_>, OxiSqlError> {
        let prepared = SqlitePrepared {
            conn: &self.conn,
            stmt_cache: Arc::clone(&self.stmt_cache),
            sql: sql.to_owned(),
        };
        Ok(Box::new(prepared))
    }

    // ── Schema introspection ──────────────────────────────────────────────────

    /// List user tables and views recorded in `sqlite_master`, ordered by name.
    ///
    /// Entries whose name matches `sqlite_%` are excluded by the query.  A row
    /// whose type column is the text `view` becomes [`TableType::View`]; every
    /// other row becomes [`TableType::Base`].  A non-text name is reported as
    /// an empty string.  `schema` is always `None`.
    async fn tables(&self) -> Result<Vec<TableInfo>, OxiSqlError> {
        const LIST_SQL: &str = "SELECT name, type FROM sqlite_master \
                                WHERE type IN ('table','view') AND name NOT LIKE 'sqlite_%' \
                                ORDER BY name";

        let rows = self.query(LIST_SQL, &[]).await?;

        let mut infos: Vec<TableInfo> = Vec::with_capacity(rows.len());
        for row in rows {
            let name = match row.get_by_index(0) {
                Some(Value::Text(s)) => s.clone(),
                _ => String::new(),
            };
            // Anything that is not explicitly a view is treated as a base table.
            let table_type = match row.get_by_index(1) {
                Some(Value::Text(kind)) if kind.as_str() == "view" => TableType::View,
                _ => TableType::Base,
            };
            infos.push(TableInfo {
                name,
                schema: None,
                table_type,
            });
        }
        Ok(infos)
    }

    /// Describe the columns of `table` using `PRAGMA table_info`.
    ///
    /// The pragma yields, per column: `cid`, `name`, `type`, `notnull`,
    /// `dflt_value`, `pk`.  These are mapped as follows:
    ///
    /// - `ordinal_position` is `cid + 1` (`cid` is 0-based).  A `cid` that does
    ///   not fit in `u32` is reported as position 1 instead of wrapping.
    /// - `nullable` is the negation of `notnull`.
    /// - `default` is `None` for SQL `NULL`, the text itself for text values,
    ///   the decimal rendering for integers, and the `Debug` rendering for any
    ///   other value kind.
    /// - `max_length`, `numeric_precision` and `numeric_scale` are always `None`.
    ///
    /// The table name is embedded as a double-quoted identifier; embedded `"`
    /// characters are doubled so the name cannot terminate the identifier
    /// early (same escaping as `foreign_keys`).
    async fn columns(&self, table: &str) -> Result<Vec<ColumnInfo>, OxiSqlError> {
        // PRAGMA arguments cannot be bound as parameters, so the identifier is
        // quoted and escaped by hand.
        let escaped = table.replace('"', "\"\"");
        let sql = format!("PRAGMA table_info(\"{escaped}\")");
        let rows = self.query(&sql, &[]).await?;

        // Column as text: integers are rendered in decimal, anything else
        // (including NULL or a missing column) becomes the empty string.
        let text_at = |r: &Row, idx: usize| -> String {
            match r.get_by_index(idx) {
                Some(Value::Text(s)) => s.clone(),
                Some(Value::I64(n)) => n.to_string(),
                _ => String::new(),
            }
        };
        // Column as integer, defaulting to 0 when absent or not an integer.
        let i64_at = |r: &Row, idx: usize| -> i64 {
            match r.get_by_index(idx) {
                Some(Value::I64(n)) => *n,
                _ => 0,
            }
        };

        let infos = rows
            .into_iter()
            .map(|row| {
                // Checked conversion: a negative or oversized cid must not wrap
                // around or overflow when 1 is added.
                let ordinal = u32::try_from(i64_at(&row, 0)).map_or(1, |cid| cid.saturating_add(1));
                let notnull = i64_at(&row, 3) != 0;
                let default_val = match row.get_by_index(4) {
                    None | Some(Value::Null) => None,
                    Some(Value::Text(s)) => Some(s.clone()),
                    // Render integer defaults as plain numbers rather than the
                    // `Debug` form of the enum variant.
                    Some(Value::I64(n)) => Some(n.to_string()),
                    Some(other) => Some(format!("{other:?}")),
                };

                ColumnInfo {
                    name: text_at(&row, 1),
                    ordinal_position: ordinal,
                    data_type: text_at(&row, 2),
                    nullable: !notnull,
                    default: default_val,
                    max_length: None,
                    numeric_precision: None,
                    numeric_scale: None,
                }
            })
            .collect();
        Ok(infos)
    }

    async fn indexes(&self, table: &str) -> Result<Vec<IndexInfo>, OxiSqlError> {
        // PRAGMA index_list and PRAGMA index_info are not yet implemented in limbo 0.0.22.
        // Fall back to sqlite_master for index names and uniqueness, then parse
        // the index SQL to extract column names.  This is best-effort: multi-column
        // indexes and expression indexes may not parse perfectly.
        let sql = "SELECT name, sql FROM sqlite_master \
                   WHERE type='index' AND tbl_name=$1 AND name NOT LIKE 'sqlite_%'";
        let rows = self.query(sql, &[&table]).await?;

        let mut infos: Vec<IndexInfo> = Vec::new();
        for row in rows {
            let name = row
                .get_by_index(0)
                .and_then(|v| {
                    if let Value::Text(s) = v {
                        Some(s.clone())
                    } else {
                        None
                    }
                })
                .unwrap_or_default();
            let idx_sql = row
                .get_by_index(1)
                .and_then(|v| {
                    if let Value::Text(s) = v {
                        Some(s.clone())
                    } else {
                        None
                    }
                })
                .unwrap_or_default();

            // Detect UNIQUE from the CREATE INDEX / CREATE UNIQUE INDEX statement.
            let upper = idx_sql.to_ascii_uppercase();
            let unique = upper.contains("UNIQUE");

            // Extract column list between the last `(` and `)`.
            let columns: Vec<String> =
                if let (Some(open), Some(close)) = (idx_sql.rfind('('), idx_sql.rfind(')')) {
                    idx_sql[open + 1..close]
                        .split(',')
                        .map(|c| c.trim().to_string())
                        .filter(|c| !c.is_empty())
                        .collect()
                } else {
                    vec![]
                };

            infos.push(IndexInfo {
                name,
                columns,
                unique,
                primary: false,
            });
        }
        Ok(infos)
    }

    async fn foreign_keys(&self, table: &str) -> Result<Vec<ForeignKeyInfo>, OxiSqlError> {
        // Use PRAGMA foreign_key_list — the engine now surfaces FK metadata
        // directly from the in-memory schema, avoiding brittle DDL text parsing.
        let escaped = table.replace('"', "\"\"");
        let sql = format!("PRAGMA foreign_key_list(\"{}\")", escaped);
        let rows = query_rewritten(&self.conn, &sql, vec![])
            .await
            .map_err(OxiSqlError::from)?;

        // PRAGMA foreign_key_list columns (by index):
        //  0: id INTEGER   — FK index within the table
        //  1: seq INTEGER  — column position within a composite FK
        //  2: table TEXT   — parent table name
        //  3: from TEXT    — child column name
        //  4: to TEXT/NULL — parent column name (NULL = implicit PK ref)
        //  5: on_update TEXT
        //  6: on_delete TEXT
        //  7: match TEXT
        let mut infos: Vec<ForeignKeyInfo> = Vec::with_capacity(rows.len());
        for row in &rows {
            let id = match row.get_by_index(0) {
                Some(Value::I64(v)) => *v,
                _ => 0,
            };
            let from_col = match row.get_by_index(3) {
                Some(Value::Text(s)) => s.clone(),
                _ => continue,
            };
            let foreign_table = match row.get_by_index(2) {
                Some(Value::Text(s)) => s.clone(),
                _ => continue,
            };
            let foreign_column = match row.get_by_index(4) {
                Some(Value::Text(s)) => s.clone(),
                _ => String::new(),
            };
            let on_update = match row.get_by_index(5) {
                Some(Value::Text(s)) => Some(s.clone()),
                _ => None,
            };
            let on_delete = match row.get_by_index(6) {
                Some(Value::Text(s)) => Some(s.clone()),
                _ => None,
            };
            let constraint_name = format!("fk_{table}_{id}");
            infos.push(ForeignKeyInfo {
                constraint_name,
                column: from_col,
                foreign_table,
                foreign_column,
                on_update,
                on_delete,
            });
        }
        Ok(infos)
    }
}

// ── SqliteTransaction ─────────────────────────────────────────────────────────

/// A SQLite transaction backed by raw `BEGIN`/`COMMIT`/`ROLLBACK` statements.
///
/// Holds a guard on the connection-level transaction mutex so that no other
/// async task can start a concurrent `BEGIN` on the same `SqliteConnection`.
/// When dropped without an explicit `commit` or `rollback`, the transaction
/// attempts a best-effort `ROLLBACK` via a background task.
pub struct SqliteTransaction<'a> {
    /// Clone of the owning connection's engine handle; all statements of the
    /// transaction run on it.
    conn: limbo::Connection,
    /// The owning connection's compiled-statement cache (shared via `Arc`).
    stmt_cache: StmtCache,
    /// Guard on the connection's `txn_lock`; never read, it exists only so the
    /// lock stays held until this value is dropped.
    _guard: tokio::sync::MutexGuard<'a, ()>,
    /// Set to `true` by `commit` / `rollback`; `Drop` skips its own `ROLLBACK`
    /// when this is set.
    done: bool,
}

impl<'a> Drop for SqliteTransaction<'a> {
    /// Issue a best-effort `ROLLBACK` when the transaction is dropped without
    /// `commit` or `rollback` having been called.
    ///
    /// `drop` cannot `.await`, so the `ROLLBACK` is run as a detached task on
    /// the current Tokio runtime.  If no runtime is active on the dropping
    /// thread the rollback is skipped and a warning is logged; spawning in
    /// that situation would panic.
    ///
    /// NOTE(review): the transaction-lock guard is released as soon as this
    /// value has finished dropping, while the spawned `ROLLBACK` runs at some
    /// later point.  Another task may therefore acquire the lock and issue
    /// `BEGIN` before the rollback executes.  Closing that window would
    /// require the guard to be owned by the spawned task (e.g. an owned mutex
    /// guard), which is a change to the struct and to `transaction()`.
    fn drop(&mut self) {
        if self.done {
            return;
        }

        match tokio::runtime::Handle::try_current() {
            Ok(runtime) => {
                let conn = self.conn.clone();
                // Fire-and-forget: the join handle is intentionally discarded.
                runtime.spawn(async move {
                    if let Err(e) = conn.execute("ROLLBACK", LimboParams::None).await {
                        log::warn!("SqliteTransaction drop: ROLLBACK failed: {e}");
                    }
                });
            }
            Err(e) => {
                log::warn!(
                    "SqliteTransaction drop: ROLLBACK skipped, no Tokio runtime available: {e}"
                );
            }
        }
    }
}

#[async_trait]
impl<'a> Transaction for SqliteTransaction<'a> {
    async fn execute(&mut self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
        let (rewritten, limbo_params) = rewrite_params(sql, params).map_err(OxiSqlError::from)?;
        exec_rewritten(&self.conn, &rewritten, limbo_params, Some(&self.stmt_cache))
            .await
            .map_err(OxiSqlError::from)
    }

    async fn query(
        &mut self,
        sql: &str,
        params: &[&dyn ToSqlValue],
    ) -> Result<Vec<Row>, OxiSqlError> {
        let (rewritten, limbo_params) = rewrite_params(sql, params).map_err(OxiSqlError::from)?;
        query_rewritten(&self.conn, &rewritten, limbo_params)
            .await
            .map_err(OxiSqlError::from)
    }

    async fn commit(mut self: Box<Self>) -> Result<(), OxiSqlError> {
        self.done = true;
        self.conn
            .execute("COMMIT", LimboParams::None)
            .await
            .map_err(|e| OxiSqlError::Other(format!("COMMIT failed: {e}")))?;
        Ok(())
    }

    async fn rollback(mut self: Box<Self>) -> Result<(), OxiSqlError> {
        // Mark done so that Drop does not attempt a second ROLLBACK.
        self.done = true;
        self.conn
            .execute("ROLLBACK", LimboParams::None)
            .await
            .map_err(|e| OxiSqlError::Other(format!("ROLLBACK failed: {e}")))?;
        Ok(())
    }
}

// ── SqlitePrepared ────────────────────────────────────────────────────────────

/// A prepared statement backed by the connection-level LRU cache.
///
/// On each `execute()` call the cached `limbo::Statement` is retrieved (or
/// compiled fresh on a miss), executed, and returned to the cache.  Because
/// `Statement::reset()` now zeroes `n_change`, every execution sees a correct
/// change count without re-parsing the SQL.
///
/// `query()` does not use the cache: it goes through `query_rewritten`, which
/// prepares a fresh statement on every call.
pub struct SqlitePrepared<'a> {
    /// Borrowed engine handle of the connection that created this statement.
    conn: &'a limbo::Connection,
    /// The creating connection's compiled-statement cache (shared via `Arc`).
    stmt_cache: StmtCache,
    /// SQL text exactly as passed to `prepare`; it is rewritten by
    /// `rewrite_params` on every `execute` / `query` call.
    sql: String,
}

#[async_trait]
impl<'a> PreparedStatement for SqlitePrepared<'a> {
    /// Execute the stored SQL with `params` through the statement cache and
    /// return the affected-row count.
    async fn execute(&mut self, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
        let (statement, bound) = match rewrite_params(&self.sql, params) {
            Ok(pair) => pair,
            Err(e) => return Err(OxiSqlError::from(e)),
        };
        match exec_rewritten(self.conn, &statement, bound, Some(&self.stmt_cache)).await {
            Ok(affected) => Ok(affected),
            Err(e) => Err(OxiSqlError::from(e)),
        }
    }

    /// Run the stored SQL as a query with `params` and return all rows.
    ///
    /// The statement is prepared anew by `query_rewritten` on every call.
    async fn query(&mut self, params: &[&dyn ToSqlValue]) -> Result<Vec<Row>, OxiSqlError> {
        let (statement, bound) = match rewrite_params(&self.sql, params) {
            Ok(pair) => pair,
            Err(e) => return Err(OxiSqlError::from(e)),
        };
        match query_rewritten(self.conn, &statement, bound).await {
            Ok(rows) => Ok(rows),
            Err(e) => Err(OxiSqlError::from(e)),
        }
    }

    /// The SQL text this statement was created with (before placeholder
    /// rewriting).
    fn sql(&self) -> &str {
        self.sql.as_str()
    }
}