Skip to main content

sqlrite/
connection.rs

1//! Public `Connection` / `Statement` / `Rows` / `Row` API (Phase 5a + SQLR-23).
2//!
3//! This is the stable surface external consumers bind against — Rust
4//! callers use it directly, language SDKs (Python, Node.js, Go) bind
5//! against the C FFI wrapper over these same types in Phase 5b, and
6//! the WASM build in Phase 5g re-exposes them via `wasm-bindgen`.
7//!
8//! The shape mirrors `rusqlite` / Python's `sqlite3` so users
9//! familiar with either can pick it up immediately:
10//!
11//! ```no_run
12//! use sqlrite::Connection;
13//!
14//! let mut conn = Connection::open("foo.sqlrite")?;
15//! conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")?;
16//! conn.execute("INSERT INTO users (name) VALUES ('alice')")?;
17//!
18//! let mut stmt = conn.prepare("SELECT id, name FROM users")?;
19//! let mut rows = stmt.query()?;
20//! while let Some(row) = rows.next()? {
21//!     let id: i64 = row.get(0)?;
22//!     let name: String = row.get(1)?;
23//!     println!("{id}: {name}");
24//! }
25//! # Ok::<(), sqlrite::SQLRiteError>(())
26//! ```
27//!
28//! **Relationship to the internal engine.** A `Connection` owns a
29//! `Database` (which owns a `Pager` for file-backed connections).
30//! `execute` and `query` go through the same `process_command`
31//! pipeline the REPL uses, just with typed row return instead of
32//! pre-rendered tables. The internal `Database` / `Pager` stay
33//! accessible via `sqlrite::sql::...` for the engine's own tests
34//! and for the desktop app — but those paths aren't considered
35//! stable API.
36//!
37//! # Prepared statements & parameter binding (SQLR-23)
38//!
39//! `Connection::prepare` parses the SQL once and stashes the AST on
40//! the returned `Statement`. Subsequent calls to `Statement::query` /
41//! `Statement::run` execute against the cached AST without re-running
42//! sqlparser. Bound versions ([`Statement::query_with_params`] /
43//! [`Statement::execute_with_params`]) accept a `&[Value]` slice that is
44//! substituted into the cached AST at execute time — including
45//! `Value::Vector(...)` for HNSW-eligible KNN queries, where binding
46//! the query vector skips per-iter lexing of the 4 KB bracket-array
47//! literal.
48//!
49//! [`Connection::prepare_cached`] adds a small per-connection LRU
50//! (default cap 16) so a hot SQL string is parsed exactly once across
51//! every call, not once per `prepare()`. Matches the rusqlite pattern.
52
53use std::collections::{HashMap, VecDeque};
54use std::path::Path;
55use std::sync::{Arc, Mutex, MutexGuard};
56
57use crate::sql::dialect::SqlriteDialect;
58use sqlparser::ast::Statement as AstStatement;
59use sqlparser::parser::Parser;
60
61use crate::error::{Result, SQLRiteError};
62use crate::mvcc::{
63    ConcurrentTx, JournalMode, MvccCommitBatch, MvccLogRecord, RowID, RowVersion, VersionPayload,
64};
65use crate::sql::db::database::{Database, TxnSnapshot};
66use crate::sql::db::table::{Table, Value};
67use crate::sql::executor::execute_select_rows;
68use crate::sql::pager::{self, AccessMode, open_database_with_mode, save_database};
69use crate::sql::params::{rewrite_placeholders, substitute_params};
70use crate::sql::parser::select::SelectQuery;
71use crate::sql::process_ast_with_render;
72
73/// Default capacity of the per-connection prepared-statement plan cache.
74/// Matches rusqlite's default; tweak with [`Connection::set_prepared_cache_capacity`].
75const DEFAULT_PREP_CACHE_CAP: usize = 16;
76
77/// A handle to a SQLRite database. Opens a file or an in-memory DB;
78/// drop it to close. Every mutating statement auto-saves (except inside
79/// an explicit `BEGIN`/`COMMIT` block — see [Transactions](#transactions)).
80///
81/// ## Transactions
82///
83/// ```no_run
84/// # use sqlrite::Connection;
85/// let mut conn = Connection::open("foo.sqlrite")?;
86/// conn.execute("BEGIN")?;
87/// conn.execute("INSERT INTO users (name) VALUES ('alice')")?;
88/// conn.execute("INSERT INTO users (name) VALUES ('bob')")?;
89/// conn.execute("COMMIT")?;
90/// # Ok::<(), sqlrite::SQLRiteError>(())
91/// ```
92///
93/// ## Multiple connections (Phase 10.1)
94///
95/// `Connection` is a thin handle over an `Arc<Mutex<Database>>`. Call
96/// [`Connection::connect`] to mint a sibling handle that shares the
97/// same backing `Database` — typically one per worker thread. Today
98/// every operation still serializes through the single mutex (and the
99/// pager's exclusive flock between processes), so the headline
100/// behaviour change is that callers can hold and address the same DB
101/// from more than one thread without wrapping the whole `Connection`
102/// in a `Mutex` themselves. `BEGIN CONCURRENT` and snapshot-isolated
103/// reads land in subsequent Phase 10 sub-phases.
104///
105/// `Connection` is `Send + Sync`. The recommended pattern is one
106/// connection per thread (clone via `connect()`); statements still
107/// borrow `&mut Connection`, so a single connection isn't suitable
108/// for true concurrent statement execution.
109pub struct Connection {
110    /// Shared engine state. Mints sibling connections via
111    /// [`Connection::connect`] without copying the in-memory tables
112    /// or the long-lived pager.
113    inner: Arc<Mutex<Database>>,
114    /// SQLR-23 — small SQL→cached-plan LRU. Keyed by the verbatim SQL
115    /// string the caller passed to `prepare_cached`. Stored as a
116    /// `VecDeque` rather than a HashMap+linked-list because the
117    /// expected capacity is small (default 16) — linear scan is fine
118    /// and the implementation stays dependency-free.
119    ///
120    /// Per-connection (not shared with sibling handles) — each thread
121    /// gets its own LRU so cache-mutation never crosses a thread
122    /// boundary.
123    prep_cache: VecDeque<(String, Arc<CachedPlan>)>,
124    prep_cache_cap: usize,
125    /// Phase 11.4 — per-connection `BEGIN CONCURRENT` state.
126    /// `None` outside a concurrent transaction; `Some` between
127    /// `BEGIN CONCURRENT` and `COMMIT` / `ROLLBACK`. Multiple
128    /// sibling connections can each hold their own — that's the
129    /// headline concurrency story this slice unlocks.
130    ///
131    /// While `Some`, every statement on this connection runs
132    /// against the cloned tables in [`ConcurrentTx::tables`]
133    /// instead of the live `Database::tables`. The live database
134    /// stays untouched until the commit-validation pass succeeds.
135    ///
136    /// **Phase 11.5 — wrapped in a `Mutex`.** [`Statement::query`]
137    /// and [`Statement::query_with_params`] take `&self`, so they
138    /// need interior mutability to swap the snapshot in for the
139    /// read. The lock is uncontended in single-thread use (each
140    /// connection's `concurrent_tx` is per-handle, and the
141    /// Statement-borrows-Connection contract still serializes
142    /// statements on a given handle); the Mutex is the cheapest
143    /// way to satisfy the borrow checker without restructuring
144    /// the Statement API. Lock order is always
145    /// `concurrent_tx` → `inner` to keep deadlock-free.
146    concurrent_tx: Mutex<Option<ConcurrentTx>>,
147}
148
149impl Connection {
150    /// Opens (or creates) a database file for read-write access.
151    ///
152    /// If the file doesn't exist, an empty one is materialized with the
153    /// current format version. Takes an exclusive advisory lock on the
154    /// file and its `-wal` sidecar; returns `Err` if either is already
155    /// locked by another process.
156    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
157        let path = path.as_ref();
158        let db_name = path
159            .file_stem()
160            .and_then(|s| s.to_str())
161            .unwrap_or("db")
162            .to_string();
163        let db = if path.exists() {
164            open_database_with_mode(path, db_name, AccessMode::ReadWrite)?
165        } else {
166            // Fresh file: materialize on disk and keep the attached
167            // pager. Setting `source_path` before `save_database` lets
168            // its `same_path` branch create the pager and stash it
169            // back on the Database — no reopen needed (and trying to
170            // reopen here would hit the file's own lock).
171            let mut fresh = Database::new(db_name);
172            fresh.source_path = Some(path.to_path_buf());
173            save_database(&mut fresh, path)?;
174            fresh
175        };
176        Ok(Self::wrap(db))
177    }
178
179    /// Opens an existing database file for read-only access. Takes a
180    /// shared advisory lock, so multiple read-only connections can
181    /// coexist on the same file; any open writer excludes them.
182    /// Mutating statements return `cannot execute: database is opened
183    /// read-only`.
184    pub fn open_read_only<P: AsRef<Path>>(path: P) -> Result<Self> {
185        let path = path.as_ref();
186        let db_name = path
187            .file_stem()
188            .and_then(|s| s.to_str())
189            .unwrap_or("db")
190            .to_string();
191        let db = open_database_with_mode(path, db_name, AccessMode::ReadOnly)?;
192        Ok(Self::wrap(db))
193    }
194
195    /// Opens a transient in-memory database. No file is touched and no
196    /// locks are taken; state lives for the lifetime of the
197    /// `Connection` and is discarded on drop.
198    pub fn open_in_memory() -> Result<Self> {
199        Ok(Self::wrap(Database::new("memdb".to_string())))
200    }
201
202    fn wrap(db: Database) -> Self {
203        Self {
204            inner: Arc::new(Mutex::new(db)),
205            prep_cache: VecDeque::new(),
206            prep_cache_cap: DEFAULT_PREP_CACHE_CAP,
207            concurrent_tx: Mutex::new(None),
208        }
209    }
210
211    /// Phase 10.1 — mints another `Connection` sharing the same
212    /// backing `Database`. Hand the returned handle to a separate
213    /// thread to address the same in-memory tables and persistent
214    /// pager from there.
215    ///
216    /// The new handle starts with an empty prepared-statement cache
217    /// (caches are per-handle, by design). Inherits the parent's
218    /// `prepare_cached` capacity. Concurrent operations still
219    /// serialize through the engine's internal lock and the pager's
220    /// existing single-writer rule — a true multi-writer story
221    /// arrives with `BEGIN CONCURRENT` in Phase 10.4.
222    ///
223    /// ```no_run
224    /// # use sqlrite::Connection;
225    /// let mut primary = Connection::open("foo.sqlrite")?;
226    /// let secondary = primary.connect();
227    /// std::thread::spawn(move || {
228    ///     let mut conn = secondary;
229    ///     conn.execute("INSERT INTO t (x) VALUES (1)").unwrap();
230    /// })
231    /// .join()
232    /// .unwrap();
233    /// # Ok::<(), sqlrite::SQLRiteError>(())
234    /// ```
235    pub fn connect(&self) -> Self {
236        Self {
237            inner: Arc::clone(&self.inner),
238            prep_cache: VecDeque::new(),
239            prep_cache_cap: self.prep_cache_cap,
240            // Phase 11.4: each sibling handle starts outside any
241            // concurrent transaction. Multi-thread `BEGIN CONCURRENT`
242            // is the headline use case — every clone gets its own
243            // independent slot.
244            concurrent_tx: Mutex::new(None),
245        }
246    }
247
248    /// Phase 10.1 — number of `Connection` handles currently sharing
249    /// this database (this handle plus every live `connect()`
250    /// descendant). Useful for diagnostics and tests; no semantic
251    /// guarantee beyond that.
252    pub fn handle_count(&self) -> usize {
253        Arc::strong_count(&self.inner)
254    }
255
256    /// Locks the shared `Database` and returns the guard. Internal
257    /// helper — every public method that needs `&mut Database` calls
258    /// this. The lock is released when the guard drops, so callers
259    /// must keep the guard alive for the duration of the engine call
260    /// (typically by binding it to a local).
261    fn lock(&self) -> MutexGuard<'_, Database> {
262        // `unwrap` propagates a panic from another thread that held
263        // the lock — there's no engine-level recovery story for a
264        // poisoned `Database` (the in-memory tables would be in an
265        // unknown state), so failing fast is the right behaviour.
266        self.inner
267            .lock()
268            .unwrap_or_else(|e| panic!("sqlrite: database mutex poisoned: {e}"))
269    }
270
271    /// Parses and executes one SQL statement. For DDL (`CREATE TABLE`,
272    /// `CREATE INDEX`), DML (`INSERT`, `UPDATE`, `DELETE`) and
273    /// transaction control (`BEGIN`, `COMMIT`, `ROLLBACK`,
274    /// `BEGIN CONCURRENT`). Returns the status message the engine
275    /// produced (e.g. `"INSERT Statement executed."`).
276    ///
277    /// For `SELECT`, `execute` works but discards the row data and
278    /// just returns the rendered status — use [`Connection::prepare`]
279    /// and [`Statement::query`] to iterate typed rows.
280    ///
281    /// Phase 11.4 — intercepts `BEGIN CONCURRENT`, `COMMIT`, and
282    /// `ROLLBACK` before sqlparser sees them so the per-connection
283    /// MVCC transaction state stays in sync. Inside an open
284    /// concurrent transaction, every other statement runs against
285    /// the transaction's private cloned tables; the live database
286    /// stays untouched until commit-validation succeeds.
287    pub fn execute(&mut self, sql: &str) -> Result<String> {
288        let intent = concurrent_tx_intent(sql);
289        let has_tx = self.concurrent_tx_is_open();
290        match intent {
291            ConcurrentTxIntent::Begin => self.begin_concurrent(),
292            ConcurrentTxIntent::Commit if has_tx => self.commit_concurrent(),
293            ConcurrentTxIntent::Rollback if has_tx => self.rollback_concurrent(),
294            ConcurrentTxIntent::None
295            | ConcurrentTxIntent::Commit
296            | ConcurrentTxIntent::Rollback => self.execute_dispatch(sql),
297        }
298    }
299
300    /// Phase 11.11a — same as [`Connection::execute`], but returns
301    /// the full [`CommandOutput`] (status + optional pre-rendered
302    /// prettytable for `SELECT`). The REPL needs this to print the
303    /// table the engine produced *and* the status line in one
304    /// pass, while still routing `BEGIN CONCURRENT` / `COMMIT` /
305    /// `ROLLBACK` through the per-connection MVCC state.
306    ///
307    /// `BEGIN` / `COMMIT` / `ROLLBACK` carry no rendered output —
308    /// they return `CommandOutput { status, rendered: None }`.
309    pub fn execute_with_render(&mut self, sql: &str) -> Result<crate::sql::CommandOutput> {
310        let intent = concurrent_tx_intent(sql);
311        let has_tx = self.concurrent_tx_is_open();
312        let status = match intent {
313            ConcurrentTxIntent::Begin => self.begin_concurrent()?,
314            ConcurrentTxIntent::Commit if has_tx => self.commit_concurrent()?,
315            ConcurrentTxIntent::Rollback if has_tx => self.rollback_concurrent()?,
316            ConcurrentTxIntent::None
317            | ConcurrentTxIntent::Commit
318            | ConcurrentTxIntent::Rollback => return self.execute_dispatch_with_render(sql),
319        };
320        Ok(crate::sql::CommandOutput {
321            status,
322            rendered: None,
323        })
324    }
325
326    /// Phase 11.5 — cheap probe used by [`Connection::execute`]
327    /// (and [`Statement::query`]) to decide whether to route
328    /// through the concurrent-tx dispatch. Acquires the
329    /// `concurrent_tx` mutex briefly; never blocks for a
330    /// meaningful amount of time because the only other lockers
331    /// are this connection's own writers.
332    ///
333    /// Public so the REPL can render per-handle tx state in
334    /// `.conns` output (Phase 11.11a).
335    pub fn concurrent_tx_is_open(&self) -> bool {
336        self.lock_concurrent_tx().is_some()
337    }
338
339    /// Phase 11.5 — locks the per-connection
340    /// `Mutex<Option<ConcurrentTx>>`. Wrapping the poison handler
341    /// in one place keeps every caller's lock-order discipline
342    /// visible at the call site (always `concurrent_tx` before
343    /// `inner`).
344    fn lock_concurrent_tx(&self) -> MutexGuard<'_, Option<ConcurrentTx>> {
345        self.concurrent_tx.lock().unwrap_or_else(|e| {
346            panic!("sqlrite: concurrent_tx mutex poisoned: {e}");
347        })
348    }
349
350    /// Phase 11.5 — runs `f` against the read-side `&Database`
351    /// the caller's transaction expects to see.
352    ///
353    /// - **No concurrent transaction open** — `f` runs against the
354    ///   live `Database::tables`. Same path the legacy `query`
355    ///   used.
356    /// - **Concurrent transaction open** — swaps the transaction's
357    ///   private cloned `tables` in for the duration of `f`, so
358    ///   `f` sees the BEGIN-time snapshot plus any writes the
359    ///   transaction has staged. Swaps back before the function
360    ///   returns even on error (the swap-back uses a scope guard
361    ///   pattern so a panic inside `f` doesn't leave `db.tables`
362    ///   pointing at the snapshot clone).
363    ///
364    /// Takes `&self` (rather than `&mut self`) because the
365    /// `Statement::query` API contract is `&self` — that's why the
366    /// `concurrent_tx` field lives behind a `Mutex`. Lock order is
367    /// `concurrent_tx` → `inner`, matching every other tx-aware
368    /// path on this connection.
369    pub(crate) fn with_snapshot_read<F, R>(&self, f: F) -> R
370    where
371        F: FnOnce(&Database) -> R,
372    {
373        let mut tx_slot = self.lock_concurrent_tx();
374        let mut db = self.lock();
375        match tx_slot.as_mut() {
376            None => f(&db),
377            Some(tx) => {
378                // Swap the snapshot in. Use a scope guard so the
379                // unswap happens even if `f` unwinds — leaving
380                // `db.tables` pointing at the tx's private clone
381                // would be catastrophic for later sibling-handle
382                // reads.
383                std::mem::swap(&mut db.tables, &mut tx.tables);
384                let prior_txn = db.txn.take();
385                db.txn = Some(TxnSnapshot {
386                    tables: HashMap::new(),
387                });
388
389                struct UnswapGuard<'a> {
390                    db: &'a mut Database,
391                    tx_tables: &'a mut HashMap<String, Table>,
392                    prior_txn: Option<TxnSnapshot>,
393                    armed: bool,
394                }
395                impl Drop for UnswapGuard<'_> {
396                    fn drop(&mut self) {
397                        if self.armed {
398                            self.db.txn = self.prior_txn.take();
399                            std::mem::swap(&mut self.db.tables, self.tx_tables);
400                        }
401                    }
402                }
403                let mut guard = UnswapGuard {
404                    db: &mut db,
405                    tx_tables: &mut tx.tables,
406                    prior_txn,
407                    armed: true,
408                };
409
410                let result = f(guard.db);
411
412                // Disarm the guard explicitly and unwind in the
413                // expected order so the borrow checker can see
414                // both fields are accessed disjointly.
415                guard.armed = false;
416                guard.db.txn = guard.prior_txn.take();
417                std::mem::swap(&mut guard.db.tables, guard.tx_tables);
418
419                result
420            }
421        }
422    }
423
424    /// Internal — runs `sql` against the engine. If a concurrent
425    /// transaction is open, swaps the transaction's private
426    /// `tables` map in for the duration of the dispatch so writes
427    /// land on the snapshot, not the live database. Otherwise
428    /// falls straight through to the legacy
429    /// [`crate::sql::process_command`] path.
430    fn execute_dispatch(&mut self, sql: &str) -> Result<String> {
431        if self.concurrent_tx_is_open() {
432            self.execute_in_concurrent_tx(sql)
433        } else {
434            let mut db = self.lock();
435            crate::sql::process_command(sql, &mut db)
436        }
437    }
438
439    /// Phase 11.11a — render-aware twin of
440    /// [`Connection::execute_dispatch`]. Same branching, but the
441    /// non-concurrent path calls `process_command_with_render` and
442    /// the concurrent path goes through
443    /// [`Connection::execute_in_concurrent_tx_with_render`].
444    fn execute_dispatch_with_render(&mut self, sql: &str) -> Result<crate::sql::CommandOutput> {
445        if self.concurrent_tx_is_open() {
446            self.execute_in_concurrent_tx_with_render(sql)
447        } else {
448            let mut db = self.lock();
449            crate::sql::process_command_with_render(sql, &mut db)
450        }
451    }
452
453    /// Phase 11.4 — opens a `BEGIN CONCURRENT` transaction on this
454    /// connection. Allocates a new `TxHandle` (which advances the
455    /// MVCC clock by one), deep-clones the live tables into the
456    /// per-connection [`ConcurrentTx`] state, and records the
457    /// schema fingerprint. Returns the status string the REPL
458    /// renders (`"BEGIN"`).
459    ///
460    /// Errors if the database isn't in `journal_mode = mvcc`, or
461    /// if any transaction (concurrent or legacy `BEGIN`) is
462    /// already open on this connection.
463    fn begin_concurrent(&mut self) -> Result<String> {
464        // Lock order: concurrent_tx → inner (db). Keep this order
465        // in every method that touches both — deadlock-free by
466        // construction.
467        let mut tx_slot = self.lock_concurrent_tx();
468        if tx_slot.is_some() {
469            return Err(SQLRiteError::General(
470                "cannot BEGIN CONCURRENT: a concurrent transaction is already open".to_string(),
471            ));
472        }
473        let db = self.lock();
474        if db.journal_mode() != JournalMode::Mvcc {
475            return Err(SQLRiteError::General(
476                "BEGIN CONCURRENT requires `PRAGMA journal_mode = mvcc;` first".to_string(),
477            ));
478        }
479        if db.in_transaction() {
480            return Err(SQLRiteError::General(
481                "cannot BEGIN CONCURRENT: a non-concurrent transaction is already open".to_string(),
482            ));
483        }
484        if db.is_read_only() {
485            return Err(SQLRiteError::General(
486                "cannot BEGIN CONCURRENT: database is opened read-only".to_string(),
487            ));
488        }
489        let tx = ConcurrentTx::begin(db.mvcc_clock(), db.mv_store().active_registry(), &db.tables);
490        drop(db);
491        *tx_slot = Some(tx);
492        Ok("BEGIN".to_string())
493    }
494
495    /// Phase 11.4 — commits the open concurrent transaction.
496    ///
497    /// Steps (Hekaton-style optimistic validation):
498    ///
499    /// 1. Diff the transaction's private `tables` against the
500    ///    live `Database::tables` to derive the write-set.
501    /// 2. For each row in the write-set, walk the
502    ///    [`MvStore`](crate::mvcc::MvStore) chain. If any
503    ///    committed version's `begin > tx.begin_ts`, abort with
504    ///    [`SQLRiteError::Busy`] — some other transaction
505    ///    superseded the row after our snapshot.
506    /// 3. Allocate a `commit_ts`, push every write into the
507    ///    `MvStore` as a committed version (caps the previous
508    ///    latest's `end` at `commit_ts`), and apply the writes
509    ///    to `Database::tables`.
510    /// 4. Run the legacy `save_database` so the changes durable
511    ///    via the existing WAL.
512    ///
513    /// On `Busy`, the transaction is dropped (rollback semantics)
514    /// and the caller should retry with a fresh `BEGIN
515    /// CONCURRENT`.
516    fn commit_concurrent(&mut self) -> Result<String> {
517        let mut tx_slot = self.lock_concurrent_tx();
518        let tx = tx_slot
519            .take()
520            .expect("commit_concurrent called without active tx (caller should check)");
521        // Drop the slot guard — we already moved the tx out, and
522        // holding it across `self.lock()` would violate the
523        // `concurrent_tx → inner` order if any helper were to
524        // grow a reverse acquire.
525        drop(tx_slot);
526
527        let mut db = self.lock();
528
529        // Schema drift catches DDL run on the live database under
530        // us. v0 rejects DDL inside the tx; outside is the only
531        // way to land here.
532        if !tx.schema_unchanged(&db.tables) {
533            return Err(SQLRiteError::Busy(
534                "schema changed under BEGIN CONCURRENT (a CREATE/DROP/ALTER ran on \
535                 another connection); transaction rolled back"
536                    .to_string(),
537            ));
538        }
539
540        // Diff against the BEGIN-time clone, NOT against the live
541        // database. Other concurrent transactions may have
542        // committed between our BEGIN and now; their writes show
543        // up in `db.tables` but aren't part of our write-set, and
544        // diffing against live would surface them as bogus DELETEs
545        // (silently undoing someone else's commit).
546        let writes = diff_tables_for_writes(&tx.tables_at_begin, &tx.tables)?;
547
548        // Validation pass: walk the write-set against MvStore.
549        let mv = db.mv_store().clone();
550        let begin_ts = tx.begin_ts();
551        for (row_id, _payload) in &writes {
552            if let Some(latest_begin) = mv.latest_committed_begin(row_id) {
553                if latest_begin > begin_ts {
554                    return Err(SQLRiteError::Busy(format!(
555                        "write-write conflict on {}/{}: another transaction committed \
556                         this row at ts={latest_begin} (after our begin_ts={begin_ts}); \
557                         transaction rolled back, retry with a fresh BEGIN CONCURRENT",
558                        row_id.table, row_id.rowid,
559                    )));
560                }
561            }
562        }
563
564        // Validation passed — allocate commit_ts and apply.
565        let commit_ts = db.mvcc_clock().tick();
566        for (row_id, payload) in &writes {
567            let version = RowVersion::committed(commit_ts, payload.clone());
568            // `push_committed`'s monotonic-begin check is satisfied
569            // because validation above ensured no version has
570            // begin >= commit_ts (commit_ts is freshly ticked).
571            mv.push_committed(row_id.clone(), version)
572                .map_err(|e| SQLRiteError::General(format!("MvStore push failed: {e}")))?;
573        }
574
575        // Apply the diff to Database::tables. Reuses the legacy
576        // INSERT / UPDATE / DELETE shape so post-commit reads on
577        // any handle (concurrent or legacy) see the latest row
578        // values via the existing read path.
579        apply_writes_to_live(&mut db, &tx.tables, &writes)?;
580
581        // Phase 11.9 — append the MVCC commit batch into the WAL
582        // before the legacy page-commit flush. The MVCC frame is
583        // not fsync'd on its own; the legacy `save_database`
584        // below ends with a commit-frame fsync that durably
585        // includes every byte written since the previous fsync,
586        // covering this batch too. A crash between the two
587        // append calls drops both — torn-write atomicity for the
588        // whole transaction.
589        //
590        // For in-memory databases (no source_path) we skip the
591        // WAL append: there's no pager and no fsync. MVCC state
592        // stays in the in-memory `MvStore` for the lifetime of
593        // the process.
594        if let Some(pager) = db.pager.as_mut() {
595            let records = writes
596                .iter()
597                .map(|(row, payload)| MvccLogRecord {
598                    row: row.clone(),
599                    payload: payload.clone(),
600                })
601                .collect();
602            let batch = MvccCommitBatch { commit_ts, records };
603            if let Err(append_err) = pager.append_mvcc_batch(&batch) {
604                return Err(SQLRiteError::General(format!(
605                    "COMMIT failed appending MVCC log record: {append_err}"
606                )));
607            }
608            // Bump the WAL header's persisted clock high-water so
609            // the next checkpoint truncates with a header that
610            // covers this commit. The MVCC frames themselves
611            // also carry `commit_ts`, so even an un-checkpointed
612            // crash still seeds the clock correctly via the
613            // replayer's max-with-frames logic — this just keeps
614            // the post-checkpoint path correct.
615            if let Err(set_err) = pager.observe_clock_high_water(commit_ts) {
616                return Err(SQLRiteError::General(format!(
617                    "COMMIT failed updating WAL clock high-water: {set_err}"
618                )));
619            }
620        }
621
622        // Persist via the legacy WAL — the on-disk format is
623        // unchanged in 11.4+. The page-commit's fsync below
624        // covers the MVCC frame appended above; one atomic
625        // boundary for the whole transaction.
626        if let Some(path) = db.source_path.clone() {
627            if let Err(save_err) = pager::save_database(&mut db, &path) {
628                return Err(SQLRiteError::General(format!(
629                    "COMMIT failed during save_database: {save_err}"
630                )));
631            }
632        }
633
634        // Phase 11.6 — per-commit GC sweep on the write-set's
635        // chains. Drop the `tx` handle FIRST so its `begin_ts`
636        // exits the active-tx registry; otherwise the watermark
637        // is still pinned at our own `begin_ts` and we'd preserve
638        // versions we're free to reclaim. Only the rows this
639        // transaction wrote can have a newly-capped `end` worth
640        // sweeping — the broader GC story (full-store sweeps,
641        // background drains) lands behind explicit
642        // [`Connection::vacuum_mvcc`] / [`MvStore::gc_all`].
643        drop(tx);
644        let watermark = mv.active_watermark();
645        for (row_id, _) in &writes {
646            mv.gc_chain(row_id, watermark);
647        }
648        Ok("COMMIT".to_string())
649    }
650
651    /// Phase 11.4 — rolls back the open concurrent transaction.
652    /// Drops the per-connection state; the live `Database::tables`
653    /// is unchanged because writes never landed there.
654    fn rollback_concurrent(&mut self) -> Result<String> {
655        // tx drops here; TxHandle unregisters automatically.
656        let _ = self
657            .lock_concurrent_tx()
658            .take()
659            .expect("rollback_concurrent called without active tx (caller should check)");
660        Ok("ROLLBACK".to_string())
661    }
662
663    /// Phase 11.4 — runs `sql` against the open concurrent
664    /// transaction's private cloned tables. Implementation: swap
665    /// `db.tables` <-> `tx.tables` for the duration of the
666    /// dispatch, suppress auto-save by parking a dummy
667    /// [`TxnSnapshot`] on `db.txn`, then unwind both.
668    ///
669    /// DDL is rejected before the swap with a typed error —
670    /// schema mutations inside a `BEGIN CONCURRENT` block aren't
671    /// supported in v0 (the plan flags this as an explicit
672    /// non-goal, and the swap-based dispatch can't safely apply
673    /// new tables to the live database without a separate merge
674    /// pass).
675    fn execute_in_concurrent_tx(&mut self, sql: &str) -> Result<String> {
676        self.execute_in_concurrent_tx_with_render(sql)
677            .map(|o| o.status)
678    }
679
680    /// Render-aware twin of [`Connection::execute_in_concurrent_tx`].
681    /// Same swap-based dispatch; the only difference is the inner
682    /// call goes through `process_command_with_render` so the
683    /// caller gets the rendered SELECT table (Phase 11.11a).
684    fn execute_in_concurrent_tx_with_render(
685        &mut self,
686        sql: &str,
687    ) -> Result<crate::sql::CommandOutput> {
688        let intent = legacy_tx_intent(sql);
689        if matches!(intent, LegacyTxIntent::Begin) {
690            return Err(SQLRiteError::General(
691                "cannot BEGIN: a concurrent transaction is already open".to_string(),
692            ));
693        }
694        // String-prefix DDL check. Rejecting up front means the
695        // tx's snapshot never gets a half-applied schema change —
696        // which would be hard to merge back at commit because the
697        // live database wouldn't agree.
698        if rejects_in_concurrent_tx(sql) {
699            return Err(SQLRiteError::General(
700                "DDL is not supported inside BEGIN CONCURRENT (v0 limitation; the \
701                 transaction stays open, the live schema is unchanged)"
702                    .to_string(),
703            ));
704        }
705
706        // Lock order: concurrent_tx → inner (db). Same shape as
707        // every other tx-aware path on this connection.
708        let mut tx_slot = self.lock_concurrent_tx();
709        let tx = tx_slot
710            .as_mut()
711            .expect("execute_in_concurrent_tx called without active tx");
712        let mut db = self.inner.lock().unwrap_or_else(|e| {
713            panic!("sqlrite: database mutex poisoned: {e}");
714        });
715
716        // Swap the snapshot in. After this, db.tables IS the tx's
717        // private clone; the executor mutates it freely.
718        std::mem::swap(&mut db.tables, &mut tx.tables);
719
720        // Suppress auto-save with a dummy TxnSnapshot. The
721        // executor's auto-save check looks at `db.in_transaction()`,
722        // which is true while `db.txn` is `Some`. The dummy
723        // snapshot is never restored from — `tx` itself owns the
724        // rollback story for concurrent transactions.
725        let prior_txn = db.txn.take();
726        db.txn = Some(TxnSnapshot {
727            tables: HashMap::new(),
728        });
729
730        let result = crate::sql::process_command_with_render(sql, &mut db);
731
732        // Unwind in reverse: take the dummy txn off (don't restore
733        // anything from it), swap the tables back.
734        db.txn = prior_txn;
735        std::mem::swap(&mut db.tables, &mut tx.tables);
736        result
737    }
738
739    /// Prepares a statement for repeated execution or row iteration.
740    /// SQLR-23: the SQL is parsed once at prepare time (sqlparser walk
741    /// plus placeholder rewriting), and the resulting AST is cached
742    /// on the [`Statement`] for re-execution without further parsing.
743    ///
744    /// Use [`Statement::query`] / [`Statement::run`] for unbound
745    /// execution, or [`Statement::query_with_params`] /
746    /// [`Statement::execute_with_params`] to substitute `?`
747    /// placeholders.
748    pub fn prepare<'c>(&'c mut self, sql: &str) -> Result<Statement<'c>> {
749        let plan = Arc::new(CachedPlan::compile(sql)?);
750        Ok(Statement { conn: self, plan })
751    }
752
753    /// Same as [`Connection::prepare`], but consults a small
754    /// per-connection LRU first. SQLR-23 — for hot statements
755    /// (the body of an INSERT loop, a frequently-rerun lookup) the
756    /// sqlparser walk is amortized to once across the connection's
757    /// lifetime, not once per `prepare()`.
758    ///
759    /// Default cache capacity is 16; tune with
760    /// [`Connection::set_prepared_cache_capacity`].
761    pub fn prepare_cached<'c>(&'c mut self, sql: &str) -> Result<Statement<'c>> {
762        // Lookup-or-insert. Found entries are also moved to the back
763        // (most-recently-used) so capacity-eviction runs LRU.
764        let plan = if let Some(pos) = self.prep_cache.iter().position(|(k, _)| k == sql) {
765            let (k, v) = self.prep_cache.remove(pos).unwrap();
766            self.prep_cache.push_back((k, Arc::clone(&v)));
767            v
768        } else {
769            let plan = Arc::new(CachedPlan::compile(sql)?);
770            self.prep_cache
771                .push_back((sql.to_string(), Arc::clone(&plan)));
772            while self.prep_cache.len() > self.prep_cache_cap {
773                self.prep_cache.pop_front();
774            }
775            plan
776        };
777        Ok(Statement { conn: self, plan })
778    }
779
780    /// SQLR-23 — sets the maximum number of cached prepared plans
781    /// (matches `prepare_cached`'s default 16). Reducing below the
782    /// current size evicts the oldest entries; setting to 0 disables
783    /// caching but `prepare_cached` still works (it just always
784    /// re-parses).
785    pub fn set_prepared_cache_capacity(&mut self, cap: usize) {
786        self.prep_cache_cap = cap;
787        while self.prep_cache.len() > cap {
788            self.prep_cache.pop_front();
789        }
790    }
791
792    /// SQLR-23 — current number of plans held by the prepared-statement
793    /// cache. Useful for tests / introspection; not load-bearing for
794    /// the public API.
795    pub fn prepared_cache_len(&self) -> usize {
796        self.prep_cache.len()
797    }
798
799    /// Returns `true` while a `BEGIN … COMMIT/ROLLBACK` block is open
800    /// against this connection.
801    pub fn in_transaction(&self) -> bool {
802        self.lock().in_transaction()
803    }
804
805    /// Returns the current auto-VACUUM threshold (SQLR-10). After a
806    /// page-releasing DDL (DROP TABLE / DROP INDEX / ALTER TABLE DROP
807    /// COLUMN) commits, the engine compacts the file in place if the
808    /// freelist exceeds this fraction of `page_count`. New connections
809    /// default to `Some(0.25)` (SQLite parity); `None` means the
810    /// trigger is disabled. See [`Connection::set_auto_vacuum_threshold`].
811    pub fn auto_vacuum_threshold(&self) -> Option<f32> {
812        self.lock().auto_vacuum_threshold()
813    }
814
815    /// Sets the auto-VACUUM threshold (SQLR-10). `Some(t)` with `t` in
816    /// `0.0..=1.0` arms the trigger; `None` disables it. Values outside
817    /// `0.0..=1.0` (or NaN / infinite) return a typed error rather than
818    /// silently saturating. The setting is per-database runtime state —
819    /// closing the last connection to a database drops it; new
820    /// connections start at the default `Some(0.25)`.
821    ///
822    /// Calling this on an in-memory or read-only database is allowed
823    /// (it just won't fire — there's nothing to compact / no writes
824    /// will reach the trigger).
825    pub fn set_auto_vacuum_threshold(&mut self, threshold: Option<f32>) -> Result<()> {
826        self.lock().set_auto_vacuum_threshold(threshold)
827    }
828
829    /// Returns `true` if the connection was opened read-only. Mutating
830    /// statements on a read-only connection return a typed error.
831    pub fn is_read_only(&self) -> bool {
832        self.lock().is_read_only()
833    }
834
835    /// Phase 11.3 — current journal mode. `Wal` (default) keeps every
836    /// pre-Phase-11 caller's behaviour. `Mvcc` is opt-in via
837    /// `PRAGMA journal_mode = mvcc;`. Per-database — every
838    /// [`Connection::connect`] sibling sees the same value.
839    ///
840    /// Today this is observable but doesn't change query behaviour;
841    /// 11.4 wires `Mvcc` mode into the read/write paths.
842    pub fn journal_mode(&self) -> crate::mvcc::JournalMode {
843        self.lock().journal_mode()
844    }
845
846    /// Phase 11.6 — explicit full-store MVCC garbage collection
847    /// pass. Walks every row in the [`MvStore`](crate::mvcc::MvStore)
848    /// chain and drops versions whose `end` timestamp is below the
849    /// current watermark (the smallest `begin_ts` across all
850    /// in-flight transactions on this database, or `u64::MAX` when
851    /// nothing is in flight).
852    ///
853    /// Returns the number of versions reclaimed. Cheap when the
854    /// store is small; a future optimisation will give it
855    /// background-thread semantics behind a configurable cadence.
856    ///
857    /// Per-commit GC already sweeps the rows each transaction
858    /// touched, so most callers don't need this — it's the
859    /// "vacuum the whole store" escape hatch for memory-pressure
860    /// workloads or test suites that want a deterministic baseline.
861    /// Safe to call even if `journal_mode` is `Wal` (the store is
862    /// just empty); useful for tests that want to assert "no
863    /// versions left."
864    pub fn vacuum_mvcc(&self) -> usize {
865        let db = self.lock();
866        let mv = db.mv_store().clone();
867        let watermark = mv.active_watermark();
868        drop(db);
869        mv.gc_all(watermark)
870    }
871
872    /// Escape hatch for advanced callers — locks the shared `Database`
873    /// and hands back the guard. Not part of the stable API; will move
874    /// or change as Phase 10's MVCC sub-phases land.
875    ///
876    /// Bind the guard to a local before calling functions that take
877    /// `&Database`:
878    ///
879    /// ```no_run
880    /// # use sqlrite::Connection;
881    /// # fn use_db(_d: &sqlrite::Database) {}
882    /// let conn = Connection::open_in_memory()?;
883    /// let db = conn.database();
884    /// use_db(&db);
885    /// # Ok::<(), sqlrite::SQLRiteError>(())
886    /// ```
887    #[doc(hidden)]
888    pub fn database(&self) -> MutexGuard<'_, Database> {
889        self.lock()
890    }
891
892    #[doc(hidden)]
893    pub fn database_mut(&mut self) -> MutexGuard<'_, Database> {
894        self.lock()
895    }
896}
897
898impl std::fmt::Debug for Connection {
899    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
900        let db = self.lock();
901        f.debug_struct("Connection")
902            .field("in_transaction", &db.in_transaction())
903            .field("read_only", &db.is_read_only())
904            .field("tables", &db.tables.len())
905            .field("prep_cache_len", &self.prep_cache.len())
906            .field("handles", &Arc::strong_count(&self.inner))
907            .field("concurrent_tx", &self.concurrent_tx_is_open())
908            .finish()
909    }
910}
911
912// =====================================================================
913// Phase 11.4 — concurrent-transaction helpers
914//
915// These live as free functions (rather than methods) so the borrow
916// checker stays out of the way: callers in `Connection::execute*`
917// already juggle mutable borrows of `self.concurrent_tx` and
918// `self.inner.lock()` simultaneously, and threading a third `&mut self`
919// through helpers would force every helper to either take owned
920// arguments or split the borrow at the call site. Free functions take
921// exactly the slices they need.
922
923/// Coarse classifier for tx-control statements. Spotted by string
924/// match before `sqlparser` runs, just like the PRAGMA intercept.
925/// Distinguishing `BEGIN CONCURRENT` from plain `BEGIN` matters
926/// because plain `BEGIN` still routes through the legacy
927/// deep-clone snapshot path; only `BEGIN CONCURRENT` opens an
928/// MVCC transaction.
929#[derive(Debug, Clone, Copy, PartialEq, Eq)]
930enum ConcurrentTxIntent {
931    /// `BEGIN CONCURRENT` — opens an MVCC transaction.
932    Begin,
933    /// `COMMIT` (with optional `TRANSACTION` / `WORK` / `;`).
934    Commit,
935    /// `ROLLBACK` (with optional `TRANSACTION` / `WORK` / `;`).
936    Rollback,
937    /// Anything else — falls through to the regular dispatch.
938    None,
939}
940
941/// Coarse classifier for legacy tx-control statements (used to
942/// reject nested `BEGIN` inside an open `BEGIN CONCURRENT`).
943#[derive(Debug, Clone, Copy, PartialEq, Eq)]
944enum LegacyTxIntent {
945    /// Plain `BEGIN` / `BEGIN TRANSACTION` / `BEGIN DEFERRED` etc.
946    /// — every shape that *isn't* `BEGIN CONCURRENT`.
947    Begin,
948    /// Anything else.
949    None,
950}
951
952fn concurrent_tx_intent(sql: &str) -> ConcurrentTxIntent {
953    let tokens = lowercase_tokens(sql);
954    let head = tokens.as_slice();
955    match head {
956        [first, second, ..] if first == "begin" && second == "concurrent" => {
957            ConcurrentTxIntent::Begin
958        }
959        [first, ..] if first == "commit" => ConcurrentTxIntent::Commit,
960        [first, ..] if first == "end" => ConcurrentTxIntent::Commit,
961        [first, ..] if first == "rollback" => ConcurrentTxIntent::Rollback,
962        _ => ConcurrentTxIntent::None,
963    }
964}
965
966fn legacy_tx_intent(sql: &str) -> LegacyTxIntent {
967    let tokens = lowercase_tokens(sql);
968    let head = tokens.as_slice();
969    match head {
970        // Plain BEGIN — but not BEGIN CONCURRENT, which the
971        // concurrent-tx intent already caught.
972        [first, ..] if first == "begin" => {
973            if matches!(head.get(1).map(String::as_str), Some("concurrent")) {
974                LegacyTxIntent::None
975            } else {
976                LegacyTxIntent::Begin
977            }
978        }
979        [first, ..] if first == "start" => LegacyTxIntent::Begin,
980        _ => LegacyTxIntent::None,
981    }
982}
983
984/// Splits `sql` on whitespace + punctuation that's not part of
985/// keywords, lowercases each piece, and returns the resulting
986/// token list. Coarse enough to spot `BEGIN`, `COMMIT`,
987/// `ROLLBACK`, `CONCURRENT`, `TRANSACTION`, etc.; not a real
988/// tokenizer.
989fn lowercase_tokens(sql: &str) -> Vec<String> {
990    sql.split(|c: char| c.is_whitespace() || c == ';' || c == '(' || c == ')' || c == ',')
991        .filter(|t| !t.is_empty())
992        .map(|t| t.to_ascii_lowercase())
993        .collect()
994}
995
996/// Statement shapes that must be rejected inside a `BEGIN
997/// CONCURRENT` block. v0 covers the canonical DDL — CREATE
998/// TABLE, CREATE INDEX, DROP TABLE, DROP INDEX, ALTER TABLE,
999/// VACUUM. Cheap string-prefix check; misses contrived
1000/// formattings like a leading SQL comment, but the rejection is
1001/// best-effort and v0 doesn't promise schema isolation inside
1002/// the tx anyway.
1003fn rejects_in_concurrent_tx(sql: &str) -> bool {
1004    let trimmed = sql.trim_start();
1005    let lower = trimmed.to_ascii_lowercase();
1006    lower.starts_with("create ")
1007        || lower.starts_with("drop ")
1008        || lower.starts_with("alter ")
1009        || lower.starts_with("vacuum")
1010}
1011
1012/// Phase 11.4 commit-time helper — diff `live` (the original
1013/// `Database::tables` map) against `snapshot` (the
1014/// transaction's private clone, post-statements) and produce
1015/// the write-set: every `(RowID, VersionPayload)` whose value
1016/// in the snapshot differs from the live state.
1017///
1018/// Three cases:
1019///
1020/// - Row in snapshot but not in live → INSERT, payload =
1021///   [`VersionPayload::Present`] of snapshot's column-value
1022///   pairs.
1023/// - Row in both, with different column values → UPDATE, same
1024///   shape.
1025/// - Row in live but not in snapshot → DELETE, payload =
1026///   [`VersionPayload::Tombstone`].
1027///
1028/// Errors only if the snapshot's table set drifted from the
1029/// live database (DDL was rejected at execute-time so this
1030/// shouldn't fire; the typed error guards against bugs).
1031fn diff_tables_for_writes(
1032    live: &HashMap<String, Table>,
1033    snapshot: &HashMap<String, Table>,
1034) -> Result<Vec<(RowID, VersionPayload)>> {
1035    let mut writes: Vec<(RowID, VersionPayload)> = Vec::new();
1036    for (name, snap_table) in snapshot {
1037        let live_table = live.get(name).ok_or_else(|| {
1038            SQLRiteError::Internal(format!(
1039                "concurrent commit: table '{name}' missing from live database"
1040            ))
1041        })?;
1042        let live_rowids: std::collections::HashSet<i64> = live_table.rowids().into_iter().collect();
1043        let snap_rowids = snap_table.rowids();
1044        for rowid in &snap_rowids {
1045            let snap_payload = build_payload(snap_table, *rowid);
1046            if live_rowids.contains(rowid) {
1047                let live_payload = build_payload(live_table, *rowid);
1048                if live_payload != snap_payload {
1049                    writes.push((RowID::new(name, *rowid), snap_payload));
1050                }
1051            } else {
1052                writes.push((RowID::new(name, *rowid), snap_payload));
1053            }
1054        }
1055        let snap_set: std::collections::HashSet<i64> = snap_rowids.into_iter().collect();
1056        for rowid in live_table.rowids() {
1057            if !snap_set.contains(&rowid) {
1058                writes.push((RowID::new(name, rowid), VersionPayload::Tombstone));
1059            }
1060        }
1061    }
1062    Ok(writes)
1063}
1064
1065/// Builds a [`VersionPayload::Present`] from a row's column-value
1066/// pairs. Column order is the table's declaration order; missing
1067/// values surface as [`Value::Null`].
1068fn build_payload(table: &Table, rowid: i64) -> VersionPayload {
1069    let cols = table.column_names();
1070    let vals = table.extract_row(rowid);
1071    let pairs: Vec<(String, Value)> = cols
1072        .into_iter()
1073        .zip(vals)
1074        .map(|(c, v)| (c, v.unwrap_or(Value::Null)))
1075        .collect();
1076    VersionPayload::Present(pairs)
1077}
1078
1079/// Applies the commit's write-set onto the live database
1080/// row-by-row. Each `(RowID, payload)` translates into a
1081/// `delete_row` (always — clears column data and any
1082/// secondary-index entries that reference the row) followed
1083/// by a `restore_row` if the payload is `Present`.
1084///
1085/// Per-row apply rather than wholesale table-replace because
1086/// other concurrent transactions may have committed onto the
1087/// live database between our BEGIN and our COMMIT — replacing
1088/// the whole table would silently undo their disjoint writes.
1089/// The validation pass already proved we have no row-level
1090/// conflict with those commits, so writing only our own rows
1091/// preserves theirs.
1092///
1093/// The `_snapshot` parameter is unused today but kept on the
1094/// signature so the FTS / HNSW maintenance pass can grow into
1095/// it in a follow-up (the snapshot has the secondary-index
1096/// state the executor built during the tx; the live table
1097/// will need the same updates if that index is on a touched
1098/// column).
1099fn apply_writes_to_live(
1100    db: &mut Database,
1101    _snapshot: &HashMap<String, Table>,
1102    writes: &[(RowID, VersionPayload)],
1103) -> Result<()> {
1104    for (row_id, payload) in writes {
1105        let live_table = db.tables.get_mut(&row_id.table).ok_or_else(|| {
1106            SQLRiteError::Internal(format!(
1107                "concurrent commit: table '{}' missing from live database",
1108                row_id.table
1109            ))
1110        })?;
1111        // Always remove the existing row first — this clears the
1112        // per-column storage and the secondary-index entries that
1113        // reference it. INSERT (no existing row) is a no-op
1114        // delete; UPDATE turns into delete-then-insert; DELETE is
1115        // just delete.
1116        live_table.delete_row(row_id.rowid);
1117        if let VersionPayload::Present(cols) = payload {
1118            // The payload's column order matches the table's
1119            // declaration order (build_payload uses
1120            // column_names() and extract_row(), both of which
1121            // walk in declaration order). Map back into the
1122            // `Vec<Option<Value>>` shape `restore_row` expects.
1123            let values: Vec<Option<Value>> = cols
1124                .iter()
1125                .map(|(_col, value)| match value {
1126                    Value::Null => None,
1127                    other => Some(other.clone()),
1128                })
1129                .collect();
1130            live_table.restore_row(row_id.rowid, values).map_err(|e| {
1131                SQLRiteError::Internal(format!(
1132                    "concurrent commit: restore_row({}) on table '{}' failed: {e}",
1133                    row_id.rowid, row_id.table,
1134                ))
1135            })?;
1136        }
1137    }
1138    Ok(())
1139}
1140
1141/// SQLR-23 — the parse-once-execute-many representation. Built by
1142/// `CachedPlan::compile` (sqlparser walk + placeholder rewriting +
1143/// SELECT narrowing) and shared between every `Statement` that hits
1144/// the same SQL string in `prepare_cached`.
1145#[derive(Debug)]
1146struct CachedPlan {
1147    /// Original SQL — kept for diagnostic output.
1148    #[allow(dead_code)]
1149    sql: String,
1150    /// AST after `?` → `?N` placeholder rewriting. Cloned per execute
1151    /// so the substitution pass leaves the cached copy intact.
1152    ast: AstStatement,
1153    /// Total `?` placeholder count in the source SQL. Strict bind
1154    /// validation in `query_with_params` / `execute_with_params`
1155    /// uses this.
1156    param_count: usize,
1157    /// SELECT narrowing — cached so `query()` doesn't redo the
1158    /// `SelectQuery::new` walk for unbound SELECTs. `None` for
1159    /// non-SELECT statements.
1160    select: Option<SelectQuery>,
1161}
1162
1163impl CachedPlan {
1164    fn compile(sql: &str) -> Result<Self> {
1165        let dialect = SqlriteDialect::new();
1166        let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
1167        let Some(mut stmt) = ast.pop() else {
1168            return Err(SQLRiteError::General("no statement to prepare".to_string()));
1169        };
1170        if !ast.is_empty() {
1171            return Err(SQLRiteError::General(
1172                "prepare() accepts a single statement; found more than one".to_string(),
1173            ));
1174        }
1175        let param_count = rewrite_placeholders(&mut stmt);
1176        let select = match &stmt {
1177            AstStatement::Query(_) => Some(SelectQuery::new(&stmt)?),
1178            _ => None,
1179        };
1180        Ok(Self {
1181            sql: sql.to_string(),
1182            ast: stmt,
1183            param_count,
1184            select,
1185        })
1186    }
1187}
1188
1189/// A prepared statement bound to a specific connection lifetime.
1190///
1191/// SQLR-23 — `Statement` carries the parsed AST (parsed exactly once
1192/// at prepare time), not just the raw SQL. `query` / `run` execute
1193/// against the cached AST; `query_with_params` / `execute_with_params`
1194/// clone the AST and substitute `?` placeholders before dispatch.
1195pub struct Statement<'c> {
1196    conn: &'c mut Connection,
1197    plan: Arc<CachedPlan>,
1198}
1199
1200impl std::fmt::Debug for Statement<'_> {
1201    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1202        f.debug_struct("Statement")
1203            .field("sql", &self.plan.sql)
1204            .field("param_count", &self.plan.param_count)
1205            .field(
1206                "kind",
1207                &match self.plan.select {
1208                    Some(_) => "Select",
1209                    None => "Other",
1210                },
1211            )
1212            .finish()
1213    }
1214}
1215
1216impl<'c> Statement<'c> {
1217    /// Number of `?` placeholders detected in the source SQL. Strict
1218    /// arity validation: passing a slice of a different length to
1219    /// `query_with_params` / `execute_with_params` returns a typed
1220    /// error.
1221    pub fn parameter_count(&self) -> usize {
1222        self.plan.param_count
1223    }
1224
1225    /// Executes a prepared non-query statement. Equivalent to
1226    /// [`Connection::execute`] — included for parity with the
1227    /// typed-row `query()` so callers who want `Statement::run` /
1228    /// `Statement::query` symmetry get it.
1229    ///
1230    /// Errors if the prepared SQL contains `?` placeholders — use
1231    /// [`Statement::execute_with_params`] for those.
1232    pub fn run(&mut self) -> Result<String> {
1233        if self.plan.param_count > 0 {
1234            return Err(SQLRiteError::General(format!(
1235                "statement has {} `?` placeholder(s); call execute_with_params()",
1236                self.plan.param_count
1237            )));
1238        }
1239        let ast = self.plan.ast.clone();
1240        let mut db = self.conn.lock();
1241        process_ast_with_render(ast, &mut db).map(|o| o.status)
1242    }
1243
1244    /// SQLR-23 — executes a prepared non-SELECT statement after binding
1245    /// `?` placeholders to `params` (positional, in source order).
1246    ///
1247    /// Use this for parameterized INSERT / UPDATE / DELETE — the
1248    /// substitution clones the cached AST, fills in the `?` slots
1249    /// from `params`, and dispatches without re-running sqlparser.
1250    /// For SELECT, prefer [`Statement::query_with_params`].
1251    pub fn execute_with_params(&mut self, params: &[Value]) -> Result<String> {
1252        self.check_arity(params)?;
1253        let mut ast = self.plan.ast.clone();
1254        if !params.is_empty() {
1255            substitute_params(&mut ast, params)?;
1256        }
1257        let mut db = self.conn.lock();
1258        process_ast_with_render(ast, &mut db).map(|o| o.status)
1259    }
1260
1261    /// Runs a SELECT and returns a [`Rows`] iterator over typed rows.
1262    /// Errors if the prepared statement isn't a SELECT.
1263    ///
1264    /// SQLR-23 — uses the SELECT narrowing cached at prepare time;
1265    /// no per-call sqlparser walk. Errors if the prepared SQL
1266    /// contains `?` placeholders — use [`Statement::query_with_params`]
1267    /// for those.
1268    pub fn query(&self) -> Result<Rows> {
1269        if self.plan.param_count > 0 {
1270            return Err(SQLRiteError::General(format!(
1271                "statement has {} `?` placeholder(s); call query_with_params()",
1272                self.plan.param_count
1273            )));
1274        }
1275        let Some(sq) = self.plan.select.as_ref() else {
1276            return Err(SQLRiteError::General(
1277                "query() only works on SELECT statements; use run() for DDL/DML".to_string(),
1278            ));
1279        };
1280        // Phase 11.5 — when a `BEGIN CONCURRENT` is open on this
1281        // connection, the read sees the transaction's BEGIN-time
1282        // snapshot, not the post-commit live database. The
1283        // helper handles the swap (and the no-op fallback for
1284        // the common case where no concurrent tx is open).
1285        let result = self
1286            .conn
1287            .with_snapshot_read(|db| execute_select_rows(sq.clone(), db))?;
1288        Ok(Rows {
1289            columns: result.columns,
1290            rows: result.rows.into_iter(),
1291        })
1292    }
1293
1294    /// SQLR-23 — runs a SELECT and returns a [`Rows`] iterator after
1295    /// binding `?` placeholders to `params`. Positional, source-order
1296    /// indexing — `params[0]` is `?1`, `params[1]` is `?2`, etc.
1297    ///
1298    /// Vector parameters (`Value::Vector(...)`) substitute as the
1299    /// in-band bracket-array shape the executor recognizes, so a
1300    /// bound query vector still triggers the HNSW probe optimizer
1301    /// (Phase 7d.2 KNN shortcut).
1302    pub fn query_with_params(&self, params: &[Value]) -> Result<Rows> {
1303        self.check_arity(params)?;
1304        if self.plan.select.is_none() {
1305            return Err(SQLRiteError::General(
1306                "query_with_params() only works on SELECT statements; use execute_with_params() \
1307                 for DDL/DML"
1308                    .to_string(),
1309            ));
1310        }
1311        // Re-narrow against the substituted AST. The narrow walk is
1312        // cheap (it pulls projection/WHERE/ORDER BY into typed
1313        // structs), and rerunning it ensures the substituted literals
1314        // (e.g. a bracket-array vector) flow through `SelectQuery`.
1315        let mut ast = self.plan.ast.clone();
1316        if !params.is_empty() {
1317            substitute_params(&mut ast, params)?;
1318        }
1319        let sq = SelectQuery::new(&ast)?;
1320        // Phase 11.5 — same snapshot-read path as `query()`, just
1321        // running on the substituted SelectQuery rather than the
1322        // cached one.
1323        let result = self
1324            .conn
1325            .with_snapshot_read(|db| execute_select_rows(sq, db))?;
1326        Ok(Rows {
1327            columns: result.columns,
1328            rows: result.rows.into_iter(),
1329        })
1330    }
1331
1332    fn check_arity(&self, params: &[Value]) -> Result<()> {
1333        if params.len() != self.plan.param_count {
1334            return Err(SQLRiteError::General(format!(
1335                "expected {} parameter{}, got {}",
1336                self.plan.param_count,
1337                if self.plan.param_count == 1 { "" } else { "s" },
1338                params.len()
1339            )));
1340        }
1341        Ok(())
1342    }
1343
1344    /// Column names this statement will produce, in projection order.
1345    /// `None` for non-SELECT statements.
1346    pub fn column_names(&self) -> Option<Vec<String>> {
1347        match &self.plan.select {
1348            Some(_) => {
1349                // We can't know the concrete column list without
1350                // running the query (it depends on the table schema
1351                // and the projection). Callers who need it up front
1352                // should call query() and inspect Rows::columns.
1353                None
1354            }
1355            None => None,
1356        }
1357    }
1358}
1359
1360/// Iterator of typed [`Row`] values produced by a `SELECT` query.
1361///
1362/// Today `Rows` is backed by an eager `Vec<Vec<Value>>` — the cursor
1363/// abstraction in Phase 5a's follow-up will swap this for a lazy
1364/// walker that streams rows off the B-Tree without materializing
1365/// them upfront. The `Rows::next` API is designed for that: it
1366/// returns `Result<Option<Row>>` rather than `Option<Result<Row>>`,
1367/// so a mid-stream I/O error surfaces cleanly.
1368pub struct Rows {
1369    columns: Vec<String>,
1370    rows: std::vec::IntoIter<Vec<Value>>,
1371}
1372
1373impl std::fmt::Debug for Rows {
1374    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1375        f.debug_struct("Rows")
1376            .field("columns", &self.columns)
1377            .field("remaining", &self.rows.len())
1378            .finish()
1379    }
1380}
1381
1382impl Rows {
1383    /// Column names in projection order.
1384    pub fn columns(&self) -> &[String] {
1385        &self.columns
1386    }
1387
1388    /// Advances to the next row. Returns `Ok(None)` when the query is
1389    /// exhausted, `Ok(Some(row))` otherwise, `Err(_)` on an I/O or
1390    /// decode failure (relevant once Phase 5a's cursor work lands —
1391    /// today this is always `Ok(_)`).
1392    pub fn next(&mut self) -> Result<Option<Row<'_>>> {
1393        Ok(self.rows.next().map(|values| Row {
1394            columns: &self.columns,
1395            values,
1396        }))
1397    }
1398
1399    /// Collects every remaining row into a `Vec<Row>`. Convenient for
1400    /// small result sets; avoid on large queries — that's what the
1401    /// streaming [`Rows::next`] API is for.
1402    pub fn collect_all(mut self) -> Result<Vec<OwnedRow>> {
1403        let mut out = Vec::new();
1404        while let Some(r) = self.next()? {
1405            out.push(r.to_owned_row());
1406        }
1407        Ok(out)
1408    }
1409}
1410
1411/// A single row borrowed from a [`Rows`] iterator. Lives only as long
1412/// as the iterator; call `Row::to_owned_row` to detach it if you need
1413/// to keep it past the next `next()` call.
1414pub struct Row<'r> {
1415    columns: &'r [String],
1416    values: Vec<Value>,
1417}
1418
1419impl<'r> Row<'r> {
1420    /// Value at column index `idx`. Returns a clean error if out of
1421    /// bounds or the type conversion fails.
1422    pub fn get<T: FromValue>(&self, idx: usize) -> Result<T> {
1423        let v = self.values.get(idx).ok_or_else(|| {
1424            SQLRiteError::General(format!(
1425                "column index {idx} out of bounds (row has {} columns)",
1426                self.values.len()
1427            ))
1428        })?;
1429        T::from_value(v)
1430    }
1431
1432    /// Value at column named `name`. Case-sensitive.
1433    pub fn get_by_name<T: FromValue>(&self, name: &str) -> Result<T> {
1434        let idx = self
1435            .columns
1436            .iter()
1437            .position(|c| c == name)
1438            .ok_or_else(|| SQLRiteError::General(format!("no column named '{name}' in row")))?;
1439        self.get(idx)
1440    }
1441
1442    /// Column names for this row.
1443    pub fn columns(&self) -> &[String] {
1444        self.columns
1445    }
1446
1447    /// Detaches from the parent `Rows` iterator. Useful when you want
1448    /// to keep rows past the next `Rows::next()` call.
1449    pub fn to_owned_row(&self) -> OwnedRow {
1450        OwnedRow {
1451            columns: self.columns.to_vec(),
1452            values: self.values.clone(),
1453        }
1454    }
1455}
1456
1457/// A row detached from the `Rows` iterator — owns its data, no
1458/// borrow ties it to the parent iterator.
1459#[derive(Debug, Clone)]
1460pub struct OwnedRow {
1461    pub columns: Vec<String>,
1462    pub values: Vec<Value>,
1463}
1464
1465impl OwnedRow {
1466    pub fn get<T: FromValue>(&self, idx: usize) -> Result<T> {
1467        let v = self.values.get(idx).ok_or_else(|| {
1468            SQLRiteError::General(format!(
1469                "column index {idx} out of bounds (row has {} columns)",
1470                self.values.len()
1471            ))
1472        })?;
1473        T::from_value(v)
1474    }
1475
1476    pub fn get_by_name<T: FromValue>(&self, name: &str) -> Result<T> {
1477        let idx = self
1478            .columns
1479            .iter()
1480            .position(|c| c == name)
1481            .ok_or_else(|| SQLRiteError::General(format!("no column named '{name}' in row")))?;
1482        self.get(idx)
1483    }
1484}
1485
1486/// Conversion from SQLRite's internal [`Value`] enum into a typed Rust
1487/// value. Implementations cover the common built-ins — `i64`, `f64`,
1488/// `String`, `bool`, and `Option<T>` for nullable columns. Extend on
1489/// demand.
1490pub trait FromValue: Sized {
1491    fn from_value(v: &Value) -> Result<Self>;
1492}
1493
1494impl FromValue for i64 {
1495    fn from_value(v: &Value) -> Result<Self> {
1496        match v {
1497            Value::Integer(n) => Ok(*n),
1498            Value::Null => Err(SQLRiteError::General(
1499                "expected Integer, got NULL".to_string(),
1500            )),
1501            other => Err(SQLRiteError::General(format!(
1502                "cannot convert {other:?} to i64"
1503            ))),
1504        }
1505    }
1506}
1507
1508impl FromValue for f64 {
1509    fn from_value(v: &Value) -> Result<Self> {
1510        match v {
1511            Value::Real(f) => Ok(*f),
1512            Value::Integer(n) => Ok(*n as f64),
1513            Value::Null => Err(SQLRiteError::General("expected Real, got NULL".to_string())),
1514            other => Err(SQLRiteError::General(format!(
1515                "cannot convert {other:?} to f64"
1516            ))),
1517        }
1518    }
1519}
1520
1521impl FromValue for String {
1522    fn from_value(v: &Value) -> Result<Self> {
1523        match v {
1524            Value::Text(s) => Ok(s.clone()),
1525            Value::Null => Err(SQLRiteError::General("expected Text, got NULL".to_string())),
1526            other => Err(SQLRiteError::General(format!(
1527                "cannot convert {other:?} to String"
1528            ))),
1529        }
1530    }
1531}
1532
1533impl FromValue for bool {
1534    fn from_value(v: &Value) -> Result<Self> {
1535        match v {
1536            Value::Bool(b) => Ok(*b),
1537            Value::Integer(n) => Ok(*n != 0),
1538            Value::Null => Err(SQLRiteError::General("expected Bool, got NULL".to_string())),
1539            other => Err(SQLRiteError::General(format!(
1540                "cannot convert {other:?} to bool"
1541            ))),
1542        }
1543    }
1544}
1545
1546/// Nullable columns: `Option<T>` maps `NULL → None` and everything else
1547/// through the inner type's `FromValue` impl.
1548impl<T: FromValue> FromValue for Option<T> {
1549    fn from_value(v: &Value) -> Result<Self> {
1550        match v {
1551            Value::Null => Ok(None),
1552            other => Ok(Some(T::from_value(other)?)),
1553        }
1554    }
1555}
1556
1557/// Identity impl so `row.get::<_, Value>(0)` works when you want
1558/// untyped access.
1559impl FromValue for Value {
1560    fn from_value(v: &Value) -> Result<Self> {
1561        Ok(v.clone())
1562    }
1563}
1564
1565#[cfg(test)]
1566mod tests {
1567    use super::*;
1568
1569    fn tmp_path(name: &str) -> std::path::PathBuf {
1570        let mut p = std::env::temp_dir();
1571        let pid = std::process::id();
1572        let nanos = std::time::SystemTime::now()
1573            .duration_since(std::time::UNIX_EPOCH)
1574            .map(|d| d.as_nanos())
1575            .unwrap_or(0);
1576        p.push(format!("sqlrite-conn-{pid}-{nanos}-{name}.sqlrite"));
1577        p
1578    }
1579
1580    fn cleanup(path: &std::path::Path) {
1581        let _ = std::fs::remove_file(path);
1582        let mut wal = path.as_os_str().to_owned();
1583        wal.push("-wal");
1584        let _ = std::fs::remove_file(std::path::PathBuf::from(wal));
1585    }
1586
1587    #[test]
1588    fn in_memory_roundtrip() {
1589        let mut conn = Connection::open_in_memory().unwrap();
1590        conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER);")
1591            .unwrap();
1592        conn.execute("INSERT INTO users (name, age) VALUES ('alice', 30);")
1593            .unwrap();
1594        conn.execute("INSERT INTO users (name, age) VALUES ('bob', 25);")
1595            .unwrap();
1596
1597        let stmt = conn.prepare("SELECT id, name, age FROM users;").unwrap();
1598        let mut rows = stmt.query().unwrap();
1599        assert_eq!(rows.columns(), &["id", "name", "age"]);
1600        let mut collected: Vec<(i64, String, i64)> = Vec::new();
1601        while let Some(row) = rows.next().unwrap() {
1602            collected.push((
1603                row.get::<i64>(0).unwrap(),
1604                row.get::<String>(1).unwrap(),
1605                row.get::<i64>(2).unwrap(),
1606            ));
1607        }
1608        assert_eq!(collected.len(), 2);
1609        assert!(collected.iter().any(|(_, n, a)| n == "alice" && *a == 30));
1610        assert!(collected.iter().any(|(_, n, a)| n == "bob" && *a == 25));
1611    }
1612
1613    #[test]
1614    fn file_backed_persists_across_connections() {
1615        let path = tmp_path("persist");
1616        {
1617            let mut c1 = Connection::open(&path).unwrap();
1618            c1.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, label TEXT);")
1619                .unwrap();
1620            c1.execute("INSERT INTO items (label) VALUES ('one');")
1621                .unwrap();
1622        }
1623        {
1624            let mut c2 = Connection::open(&path).unwrap();
1625            let stmt = c2.prepare("SELECT label FROM items;").unwrap();
1626            let mut rows = stmt.query().unwrap();
1627            let first = rows.next().unwrap().expect("one row");
1628            assert_eq!(first.get::<String>(0).unwrap(), "one");
1629            assert!(rows.next().unwrap().is_none());
1630        }
1631        cleanup(&path);
1632    }
1633
1634    #[test]
1635    fn read_only_connection_rejects_writes() {
1636        let path = tmp_path("ro_reject");
1637        {
1638            let mut c = Connection::open(&path).unwrap();
1639            c.execute("CREATE TABLE t (id INTEGER PRIMARY KEY);")
1640                .unwrap();
1641            c.execute("INSERT INTO t (id) VALUES (1);").unwrap();
1642        } // writer drops → releases exclusive lock
1643
1644        let mut ro = Connection::open_read_only(&path).unwrap();
1645        assert!(ro.is_read_only());
1646        let err = ro.execute("INSERT INTO t (id) VALUES (2);").unwrap_err();
1647        assert!(format!("{err}").contains("read-only"));
1648        cleanup(&path);
1649    }
1650
1651    #[test]
1652    fn transactions_work_through_connection() {
1653        let mut conn = Connection::open_in_memory().unwrap();
1654        conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, x INTEGER);")
1655            .unwrap();
1656        conn.execute("INSERT INTO t (x) VALUES (1);").unwrap();
1657
1658        conn.execute("BEGIN;").unwrap();
1659        assert!(conn.in_transaction());
1660        conn.execute("INSERT INTO t (x) VALUES (2);").unwrap();
1661        conn.execute("ROLLBACK;").unwrap();
1662        assert!(!conn.in_transaction());
1663
1664        let stmt = conn.prepare("SELECT x FROM t;").unwrap();
1665        let rows = stmt.query().unwrap().collect_all().unwrap();
1666        assert_eq!(rows.len(), 1);
1667        assert_eq!(rows[0].get::<i64>(0).unwrap(), 1);
1668    }
1669
1670    #[test]
1671    fn get_by_name_works() {
1672        let mut conn = Connection::open_in_memory().unwrap();
1673        conn.execute("CREATE TABLE t (a INTEGER, b TEXT);").unwrap();
1674        conn.execute("INSERT INTO t (a, b) VALUES (42, 'hello');")
1675            .unwrap();
1676
1677        let stmt = conn.prepare("SELECT a, b FROM t;").unwrap();
1678        let mut rows = stmt.query().unwrap();
1679        let row = rows.next().unwrap().unwrap();
1680        assert_eq!(row.get_by_name::<i64>("a").unwrap(), 42);
1681        assert_eq!(row.get_by_name::<String>("b").unwrap(), "hello");
1682    }
1683
1684    #[test]
1685    fn null_column_maps_to_none() {
1686        let mut conn = Connection::open_in_memory().unwrap();
1687        conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, note TEXT);")
1688            .unwrap();
1689        // id INTEGER PRIMARY KEY autoincrements; `note` is left unspecified.
1690        conn.execute("INSERT INTO t (id) VALUES (1);").unwrap();
1691
1692        let stmt = conn.prepare("SELECT id, note FROM t;").unwrap();
1693        let mut rows = stmt.query().unwrap();
1694        let row = rows.next().unwrap().unwrap();
1695        assert_eq!(row.get::<i64>(0).unwrap(), 1);
1696        // note is NULL → Option<String> resolves to None.
1697        assert_eq!(row.get::<Option<String>>(1).unwrap(), None);
1698    }
1699
1700    #[test]
1701    fn prepare_rejects_multiple_statements() {
1702        let mut conn = Connection::open_in_memory().unwrap();
1703        let err = conn.prepare("SELECT 1; SELECT 2;").unwrap_err();
1704        assert!(format!("{err}").contains("single statement"));
1705    }
1706
1707    #[test]
1708    fn query_on_non_select_errors() {
1709        let mut conn = Connection::open_in_memory().unwrap();
1710        conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY);")
1711            .unwrap();
1712        let stmt = conn.prepare("INSERT INTO t VALUES (1);").unwrap();
1713        let err = stmt.query().unwrap_err();
1714        assert!(format!("{err}").contains("SELECT"));
1715    }
1716
1717    /// SQLR-10: fresh connections expose the SQLite-parity 25% default,
1718    /// the setter validates its input, and `None` opts out cleanly.
1719    #[test]
1720    fn auto_vacuum_threshold_default_and_setter() {
1721        let mut conn = Connection::open_in_memory().unwrap();
1722        assert_eq!(
1723            conn.auto_vacuum_threshold(),
1724            Some(0.25),
1725            "fresh connection should ship with the SQLite-parity default"
1726        );
1727
1728        conn.set_auto_vacuum_threshold(None).unwrap();
1729        assert_eq!(conn.auto_vacuum_threshold(), None);
1730
1731        conn.set_auto_vacuum_threshold(Some(0.5)).unwrap();
1732        assert_eq!(conn.auto_vacuum_threshold(), Some(0.5));
1733
1734        // Out-of-range values must be rejected with a typed error and
1735        // must not stomp the previously-set value.
1736        let err = conn.set_auto_vacuum_threshold(Some(1.5)).unwrap_err();
1737        assert!(
1738            format!("{err}").contains("auto_vacuum_threshold"),
1739            "expected typed range error, got: {err}"
1740        );
1741        assert_eq!(
1742            conn.auto_vacuum_threshold(),
1743            Some(0.5),
1744            "rejected setter call must not mutate the threshold"
1745        );
1746    }
1747
1748    #[test]
1749    fn index_out_of_bounds_errors_cleanly() {
1750        let mut conn = Connection::open_in_memory().unwrap();
1751        conn.execute("CREATE TABLE t (a INTEGER PRIMARY KEY);")
1752            .unwrap();
1753        conn.execute("INSERT INTO t (a) VALUES (1);").unwrap();
1754        let stmt = conn.prepare("SELECT a FROM t;").unwrap();
1755        let mut rows = stmt.query().unwrap();
1756        let row = rows.next().unwrap().unwrap();
1757        let err = row.get::<i64>(99).unwrap_err();
1758        assert!(format!("{err}").contains("out of bounds"));
1759    }
1760
1761    // -----------------------------------------------------------------
1762    // SQLR-23 — prepared-statement plan cache + parameter binding
1763    // -----------------------------------------------------------------
1764
1765    #[test]
1766    fn parameter_count_reflects_question_marks() {
1767        let mut conn = Connection::open_in_memory().unwrap();
1768        conn.execute("CREATE TABLE t (a INTEGER, b TEXT);").unwrap();
1769        let stmt = conn.prepare("SELECT a, b FROM t WHERE a = ?").unwrap();
1770        assert_eq!(stmt.parameter_count(), 1);
1771        let stmt = conn
1772            .prepare("SELECT a, b FROM t WHERE a = ? AND b = ?")
1773            .unwrap();
1774        assert_eq!(stmt.parameter_count(), 2);
1775        let stmt = conn.prepare("SELECT a FROM t").unwrap();
1776        assert_eq!(stmt.parameter_count(), 0);
1777    }
1778
1779    #[test]
1780    fn query_with_params_binds_scalars() {
1781        let mut conn = Connection::open_in_memory().unwrap();
1782        conn.execute("CREATE TABLE t (a INTEGER PRIMARY KEY, b TEXT);")
1783            .unwrap();
1784        conn.execute("INSERT INTO t (a, b) VALUES (1, 'alice');")
1785            .unwrap();
1786        conn.execute("INSERT INTO t (a, b) VALUES (2, 'bob');")
1787            .unwrap();
1788        conn.execute("INSERT INTO t (a, b) VALUES (3, 'carol');")
1789            .unwrap();
1790
1791        let stmt = conn.prepare("SELECT b FROM t WHERE a = ?").unwrap();
1792        let rows = stmt
1793            .query_with_params(&[Value::Integer(2)])
1794            .unwrap()
1795            .collect_all()
1796            .unwrap();
1797        assert_eq!(rows.len(), 1);
1798        assert_eq!(rows[0].get::<String>(0).unwrap(), "bob");
1799    }
1800
1801    #[test]
1802    fn execute_with_params_binds_insert_values() {
1803        let mut conn = Connection::open_in_memory().unwrap();
1804        conn.execute("CREATE TABLE t (a INTEGER, b TEXT);").unwrap();
1805
1806        let mut stmt = conn.prepare("INSERT INTO t (a, b) VALUES (?, ?)").unwrap();
1807        stmt.execute_with_params(&[Value::Integer(7), Value::Text("hi".into())])
1808            .unwrap();
1809        stmt.execute_with_params(&[Value::Integer(8), Value::Text("yo".into())])
1810            .unwrap();
1811
1812        let stmt = conn.prepare("SELECT a, b FROM t").unwrap();
1813        let rows = stmt.query().unwrap().collect_all().unwrap();
1814        assert_eq!(rows.len(), 2);
1815        assert!(
1816            rows.iter()
1817                .any(|r| r.get::<i64>(0).unwrap() == 7 && r.get::<String>(1).unwrap() == "hi")
1818        );
1819        assert!(
1820            rows.iter()
1821                .any(|r| r.get::<i64>(0).unwrap() == 8 && r.get::<String>(1).unwrap() == "yo")
1822        );
1823    }
1824
1825    #[test]
1826    fn arity_mismatch_returns_clean_error() {
1827        let mut conn = Connection::open_in_memory().unwrap();
1828        conn.execute("CREATE TABLE t (a INTEGER, b TEXT);").unwrap();
1829        let stmt = conn
1830            .prepare("SELECT * FROM t WHERE a = ? AND b = ?")
1831            .unwrap();
1832        let err = stmt.query_with_params(&[Value::Integer(1)]).unwrap_err();
1833        assert!(format!("{err}").contains("expected 2 parameter"));
1834    }
1835
1836    #[test]
1837    fn run_and_query_reject_when_placeholders_present() {
1838        let mut conn = Connection::open_in_memory().unwrap();
1839        conn.execute("CREATE TABLE t (a INTEGER);").unwrap();
1840        let mut stmt_select = conn.prepare("SELECT a FROM t WHERE a = ?").unwrap();
1841        let err = stmt_select.query().unwrap_err();
1842        assert!(format!("{err}").contains("query_with_params"));
1843        let err = stmt_select.run().unwrap_err();
1844        assert!(format!("{err}").contains("execute_with_params"));
1845    }
1846
1847    #[test]
1848    fn null_param_compares_against_null() {
1849        // a = NULL is *false* in SQL three-valued logic; binding NULL
1850        // must match SQLite's behavior so callers can rely on the same
1851        // semantics.
1852        let mut conn = Connection::open_in_memory().unwrap();
1853        conn.execute("CREATE TABLE t (a INTEGER);").unwrap();
1854        conn.execute("INSERT INTO t (a) VALUES (1);").unwrap();
1855        let stmt = conn.prepare("SELECT a FROM t WHERE a = ?").unwrap();
1856        let rows = stmt
1857            .query_with_params(&[Value::Null])
1858            .unwrap()
1859            .collect_all()
1860            .unwrap();
1861        assert_eq!(rows.len(), 0);
1862    }
1863
1864    #[test]
1865    fn vector_param_substitutes_through_select() {
1866        // Non-HNSW path: a small VECTOR table + brute-force ORDER BY
1867        // exercises the substitution into the ORDER BY expression
1868        // and the bracket-array shape eval_expr_scope expects.
1869        let mut conn = Connection::open_in_memory().unwrap();
1870        conn.execute("CREATE TABLE v (id INTEGER PRIMARY KEY, e VECTOR(3));")
1871            .unwrap();
1872        conn.execute("INSERT INTO v (id, e) VALUES (1, [1.0, 0.0, 0.0]);")
1873            .unwrap();
1874        conn.execute("INSERT INTO v (id, e) VALUES (2, [0.0, 1.0, 0.0]);")
1875            .unwrap();
1876        conn.execute("INSERT INTO v (id, e) VALUES (3, [0.0, 0.0, 1.0]);")
1877            .unwrap();
1878
1879        let stmt = conn
1880            .prepare("SELECT id FROM v ORDER BY vec_distance_l2(e, ?) ASC LIMIT 1")
1881            .unwrap();
1882        let rows = stmt
1883            .query_with_params(&[Value::Vector(vec![1.0, 0.0, 0.0])])
1884            .unwrap()
1885            .collect_all()
1886            .unwrap();
1887        assert_eq!(rows.len(), 1);
1888        assert_eq!(rows[0].get::<i64>(0).unwrap(), 1);
1889    }
1890
1891    #[test]
1892    fn prepare_cached_reuses_plans() {
1893        let mut conn = Connection::open_in_memory().unwrap();
1894        conn.execute("CREATE TABLE t (a INTEGER);").unwrap();
1895        for n in 1..=3 {
1896            conn.execute(&format!("INSERT INTO t (a) VALUES ({n});"))
1897                .unwrap();
1898        }
1899
1900        // First call populates the cache; second hits the same entry.
1901        let _ = conn.prepare_cached("SELECT a FROM t WHERE a = ?").unwrap();
1902        let _ = conn.prepare_cached("SELECT a FROM t WHERE a = ?").unwrap();
1903        assert_eq!(conn.prepared_cache_len(), 1);
1904
1905        // Distinct SQL widens the cache.
1906        let _ = conn.prepare_cached("SELECT a FROM t").unwrap();
1907        assert_eq!(conn.prepared_cache_len(), 2);
1908    }
1909
1910    #[test]
1911    fn prepare_cached_evicts_when_over_capacity() {
1912        let mut conn = Connection::open_in_memory().unwrap();
1913        conn.execute("CREATE TABLE t (a INTEGER);").unwrap();
1914        conn.set_prepared_cache_capacity(2);
1915        let _ = conn.prepare_cached("SELECT a FROM t").unwrap();
1916        let _ = conn.prepare_cached("SELECT a FROM t WHERE a = ?").unwrap();
1917        assert_eq!(conn.prepared_cache_len(), 2);
1918        // Third distinct SQL evicts the oldest entry (the FROM-only SELECT).
1919        let _ = conn.prepare_cached("SELECT a FROM t WHERE a > ?").unwrap();
1920        assert_eq!(conn.prepared_cache_len(), 2);
1921    }
1922
1923    /// SQLR-23 — the headline VECTOR-binding case. With an HNSW index
1924    /// attached, the optimizer hook recognizes
1925    /// `ORDER BY vec_distance_l2(col, ?) LIMIT k` even when the second
1926    /// arg is a bound parameter, because substitution lowers
1927    /// `Value::Vector` into the same bracket-array shape an inline
1928    /// `[…]` literal produces. Self-query: querying for one of the
1929    /// corpus's own vectors must return that vector as the nearest.
1930    #[test]
1931    fn vector_bind_through_hnsw_optimizer() {
1932        let mut conn = Connection::open_in_memory().unwrap();
1933        conn.execute("CREATE TABLE v (id INTEGER PRIMARY KEY, e VECTOR(4));")
1934            .unwrap();
1935        let corpus: [(i64, [f32; 4]); 5] = [
1936            (1, [1.0, 0.0, 0.0, 0.0]),
1937            (2, [0.0, 1.0, 0.0, 0.0]),
1938            (3, [0.0, 0.0, 1.0, 0.0]),
1939            (4, [0.0, 0.0, 0.0, 1.0]),
1940            (5, [0.5, 0.5, 0.5, 0.5]),
1941        ];
1942        for (id, vec) in corpus {
1943            conn.execute(&format!(
1944                "INSERT INTO v (id, e) VALUES ({id}, [{}, {}, {}, {}]);",
1945                vec[0], vec[1], vec[2], vec[3]
1946            ))
1947            .unwrap();
1948        }
1949        conn.execute("CREATE INDEX v_hnsw ON v USING hnsw (e);")
1950            .unwrap();
1951
1952        let stmt = conn
1953            .prepare("SELECT id FROM v ORDER BY vec_distance_l2(e, ?) ASC LIMIT 1")
1954            .unwrap();
1955        // Query with id=3's vector — expect id=3 back.
1956        let rows = stmt
1957            .query_with_params(&[Value::Vector(vec![0.0, 0.0, 1.0, 0.0])])
1958            .unwrap()
1959            .collect_all()
1960            .unwrap();
1961        assert_eq!(rows.len(), 1);
1962        assert_eq!(rows[0].get::<i64>(0).unwrap(), 3);
1963
1964        // Query with id=1's vector — expect id=1.
1965        let rows = stmt
1966            .query_with_params(&[Value::Vector(vec![1.0, 0.0, 0.0, 0.0])])
1967            .unwrap()
1968            .collect_all()
1969            .unwrap();
1970        assert_eq!(rows.len(), 1);
1971        assert_eq!(rows[0].get::<i64>(0).unwrap(), 1);
1972    }
1973
1974    /// SQLR-28 — cosine probe: an HNSW index built `WITH (metric =
1975    /// 'cosine')` must serve `ORDER BY vec_distance_cosine(col, [...])`
1976    /// from the graph. Self-query: querying for one of the corpus's
1977    /// own vectors must come back as the nearest under cosine
1978    /// distance.
1979    #[test]
1980    fn cosine_self_query_through_hnsw_optimizer() {
1981        let mut conn = Connection::open_in_memory().unwrap();
1982        conn.execute("CREATE TABLE v (id INTEGER PRIMARY KEY, e VECTOR(4));")
1983            .unwrap();
1984        let corpus: [(i64, [f32; 4]); 5] = [
1985            (1, [1.0, 0.0, 0.0, 0.0]),
1986            (2, [0.0, 1.0, 0.0, 0.0]),
1987            (3, [0.0, 0.0, 1.0, 0.0]),
1988            (4, [0.0, 0.0, 0.0, 1.0]),
1989            (5, [0.5, 0.5, 0.5, 0.5]),
1990        ];
1991        for (id, vec) in corpus {
1992            conn.execute(&format!(
1993                "INSERT INTO v (id, e) VALUES ({id}, [{}, {}, {}, {}]);",
1994                vec[0], vec[1], vec[2], vec[3]
1995            ))
1996            .unwrap();
1997        }
1998        conn.execute("CREATE INDEX v_hnsw ON v USING hnsw (e) WITH (metric = 'cosine');")
1999            .unwrap();
2000
2001        // Self-query for id=2's vector — expected nearest under cosine
2002        // distance is id=2 itself (cos distance 0).
2003        let rows = conn
2004            .prepare("SELECT id FROM v ORDER BY vec_distance_cosine(e, [0.0, 1.0, 0.0, 0.0]) ASC LIMIT 1")
2005            .unwrap()
2006            .query_with_params(&[])
2007            .unwrap()
2008            .collect_all()
2009            .unwrap();
2010        assert_eq!(rows.len(), 1);
2011        assert_eq!(rows[0].get::<i64>(0).unwrap(), 2);
2012    }
2013
2014    /// SQLR-28 — dot probe: same shape as the cosine test, but the
2015    /// index is built `WITH (metric = 'dot')` and the query uses
2016    /// `vec_distance_dot`. Confirms the third metric variant lights up
2017    /// the graph shortcut, not just l2 / cosine.
2018    #[test]
2019    fn dot_self_query_through_hnsw_optimizer() {
2020        let mut conn = Connection::open_in_memory().unwrap();
2021        conn.execute("CREATE TABLE v (id INTEGER PRIMARY KEY, e VECTOR(3));")
2022            .unwrap();
2023        // Data: distinguishable magnitudes so the dot metric resolves
2024        // a clear winner. `vec_distance_dot(a, b) = -(a·b)` — smaller
2025        // (more negative) is closer.
2026        let corpus: [(i64, [f32; 3]); 4] = [
2027            (1, [1.0, 0.0, 0.0]),
2028            (2, [2.0, 0.0, 0.0]),
2029            (3, [0.0, 1.0, 0.0]),
2030            (4, [0.0, 0.0, 1.0]),
2031        ];
2032        for (id, vec) in corpus {
2033            conn.execute(&format!(
2034                "INSERT INTO v (id, e) VALUES ({id}, [{}, {}, {}]);",
2035                vec[0], vec[1], vec[2]
2036            ))
2037            .unwrap();
2038        }
2039        conn.execute("CREATE INDEX v_hnsw ON v USING hnsw (e) WITH (metric = 'dot');")
2040            .unwrap();
2041
2042        // Query [3, 0, 0]: dot products are 3, 6, 0, 0 → distances
2043        // -3, -6, 0, 0. id=2 has the smallest (most negative) distance.
2044        let rows = conn
2045            .prepare("SELECT id FROM v ORDER BY vec_distance_dot(e, [3.0, 0.0, 0.0]) ASC LIMIT 1")
2046            .unwrap()
2047            .query_with_params(&[])
2048            .unwrap()
2049            .collect_all()
2050            .unwrap();
2051        assert_eq!(rows.len(), 1);
2052        assert_eq!(rows[0].get::<i64>(0).unwrap(), 2);
2053    }
2054
2055    /// SQLR-28 — metric mismatch must NOT take the graph shortcut.
2056    /// An L2-built index queried with `vec_distance_cosine` falls
2057    /// through to brute-force, which still returns the correct
2058    /// answer. We confirm the answer is correct; the slow-path
2059    /// behaviour itself is implicit (no error, no panic, no wrong
2060    /// result), which is the user-visible contract that matters.
2061    #[test]
2062    fn metric_mismatch_falls_back_to_brute_force() {
2063        let mut conn = Connection::open_in_memory().unwrap();
2064        conn.execute("CREATE TABLE v (id INTEGER PRIMARY KEY, e VECTOR(2));")
2065            .unwrap();
2066        let half_sqrt2 = std::f32::consts::FRAC_1_SQRT_2;
2067        let corpus: [(i64, [f32; 2]); 3] = [
2068            (1, [1.0, 0.0]),
2069            (2, [half_sqrt2, half_sqrt2]),
2070            (3, [0.0, 1.0]),
2071        ];
2072        for (id, vec) in corpus {
2073            conn.execute(&format!(
2074                "INSERT INTO v (id, e) VALUES ({id}, [{}, {}]);",
2075                vec[0], vec[1]
2076            ))
2077            .unwrap();
2078        }
2079        // Default L2 index — no WITH clause.
2080        conn.execute("CREATE INDEX v_hnsw_l2 ON v USING hnsw (e);")
2081            .unwrap();
2082
2083        // Query with cosine. Index can't help; brute-force still
2084        // returns the correct nearest by cosine: id=1 (cos dist 0).
2085        let rows = conn
2086            .prepare("SELECT id FROM v ORDER BY vec_distance_cosine(e, [1.0, 0.0]) ASC LIMIT 1")
2087            .unwrap()
2088            .query_with_params(&[])
2089            .unwrap()
2090            .collect_all()
2091            .unwrap();
2092        assert_eq!(rows.len(), 1);
2093        assert_eq!(rows[0].get::<i64>(0).unwrap(), 1);
2094    }
2095
2096    /// SQLR-28 — a typo in the metric name must error at CREATE INDEX
2097    /// time. Falling back to L2 silently is the bug we're fixing here,
2098    /// not the behaviour to preserve.
2099    #[test]
2100    fn unknown_metric_name_is_rejected() {
2101        let mut conn = Connection::open_in_memory().unwrap();
2102        conn.execute("CREATE TABLE v (id INTEGER PRIMARY KEY, e VECTOR(2));")
2103            .unwrap();
2104        let err = conn
2105            .execute("CREATE INDEX bad ON v USING hnsw (e) WITH (metric = 'cosin');")
2106            .unwrap_err();
2107        let msg = format!("{err}");
2108        assert!(msg.contains("unknown HNSW metric"), "got: {msg}");
2109    }
2110
2111    /// SQLR-28 — WITH options on a non-HNSW index must error rather
2112    /// than be silently ignored. An option that has no effect on the
2113    /// resulting index is a footgun.
2114    #[test]
2115    fn with_metric_on_btree_is_rejected() {
2116        let mut conn = Connection::open_in_memory().unwrap();
2117        conn.execute("CREATE TABLE t (a INTEGER PRIMARY KEY, b TEXT);")
2118            .unwrap();
2119        let err = conn
2120            .execute("CREATE INDEX bad ON t (b) WITH (metric = 'cosine');")
2121            .unwrap_err();
2122        let msg = format!("{err}");
2123        assert!(msg.contains("doesn't support any options"), "got: {msg}");
2124    }
2125
2126    // -----------------------------------------------------------------
2127    // Phase 10.1 — multi-connection foundation
2128    // -----------------------------------------------------------------
2129
2130    /// `connect()` mints a sibling handle that shares the backing
2131    /// `Database`. Writes through one are visible through the other —
2132    /// the headline behavioural change for Phase 10.1.
2133    #[test]
2134    fn connect_shares_underlying_database() {
2135        let mut a = Connection::open_in_memory().unwrap();
2136        let mut b = a.connect();
2137        assert_eq!(a.handle_count(), 2);
2138
2139        a.execute("CREATE TABLE shared (id INTEGER PRIMARY KEY, label TEXT);")
2140            .unwrap();
2141        a.execute("INSERT INTO shared (label) VALUES ('via-a');")
2142            .unwrap();
2143        b.execute("INSERT INTO shared (label) VALUES ('via-b');")
2144            .unwrap();
2145
2146        let stmt = b.prepare("SELECT label FROM shared;").unwrap();
2147        let mut labels: Vec<String> = stmt
2148            .query()
2149            .unwrap()
2150            .collect_all()
2151            .unwrap()
2152            .into_iter()
2153            .map(|r| r.get::<String>(0).unwrap())
2154            .collect();
2155        labels.sort();
2156        assert_eq!(labels, vec!["via-a".to_string(), "via-b".to_string()]);
2157    }
2158
2159    /// Dropping a sibling decrements the handle count without
2160    /// disturbing the surviving connections.
2161    #[test]
2162    fn handle_count_reflects_live_handles() {
2163        let primary = Connection::open_in_memory().unwrap();
2164        assert_eq!(primary.handle_count(), 1);
2165        let s1 = primary.connect();
2166        let s2 = primary.connect();
2167        assert_eq!(primary.handle_count(), 3);
2168        drop(s1);
2169        assert_eq!(primary.handle_count(), 2);
2170        drop(s2);
2171        assert_eq!(primary.handle_count(), 1);
2172    }
2173
2174    /// Multi-thread INSERT/COMMIT against the same in-memory DB. Today
2175    /// the per-`Database` mutex serializes commits — this test proves
2176    /// the locking holds without panics or data loss when N threads
2177    /// race for the writer. Phase 10.4's `BEGIN CONCURRENT` will lift
2178    /// the serialization for disjoint-row workloads; until then the
2179    /// guarantee is "no panic, every commit lands."
2180    #[test]
2181    fn threaded_writers_serialize_cleanly() {
2182        use std::thread;
2183
2184        let primary = Connection::open_in_memory().unwrap();
2185        // Set up the shared schema before spawning so every worker
2186        // sees the table.
2187        {
2188            let mut p = primary.connect();
2189            p.execute("CREATE TABLE log (id INTEGER PRIMARY KEY, who TEXT, n INTEGER);")
2190                .unwrap();
2191        }
2192
2193        const THREADS: usize = 8;
2194        const PER_THREAD: usize = 25;
2195
2196        let handles: Vec<_> = (0..THREADS)
2197            .map(|tid| {
2198                let mut conn = primary.connect();
2199                thread::spawn(move || {
2200                    for n in 0..PER_THREAD {
2201                        let sql = format!("INSERT INTO log (who, n) VALUES ('t{tid}', {n});");
2202                        conn.execute(&sql).expect("insert under contention");
2203                    }
2204                })
2205            })
2206            .collect();
2207
2208        for h in handles {
2209            h.join().expect("worker panicked");
2210        }
2211
2212        // Every write must have landed exactly once — count rows by
2213        // probing the table directly so we don't depend on a SELECT
2214        // COUNT(*) implementation.
2215        let db = primary.database();
2216        let table = db.get_table("log".to_string()).unwrap();
2217        assert_eq!(
2218            table.rowids().len(),
2219            THREADS * PER_THREAD,
2220            "expected every threaded INSERT to commit",
2221        );
2222    }
2223
2224    /// `connect()` over a file-backed database produces sibling
2225    /// handles that hit the same on-disk pager. Auto-save through one
2226    /// must be visible through the other without a re-open.
2227    #[test]
2228    fn connect_shares_file_backed_database() {
2229        let path = tmp_path("connect_file");
2230        let mut primary = Connection::open(&path).unwrap();
2231        primary
2232            .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v TEXT);")
2233            .unwrap();
2234
2235        let mut sibling = primary.connect();
2236        sibling.execute("INSERT INTO t (v) VALUES ('hi');").unwrap();
2237
2238        let stmt = primary.prepare("SELECT v FROM t;").unwrap();
2239        let rows = stmt.query().unwrap().collect_all().unwrap();
2240        assert_eq!(rows.len(), 1);
2241        assert_eq!(rows[0].get::<String>(0).unwrap(), "hi");
2242
2243        drop(sibling);
2244        drop(primary);
2245        cleanup(&path);
2246    }
2247
2248    /// Prepared-statement caches are per-handle, by design — sharing
2249    /// a mutable LRU across threads would require an extra lock for
2250    /// no real win (each worker prepares its own hot SQL).
2251    #[test]
2252    fn prep_cache_is_per_handle() {
2253        let mut a = Connection::open_in_memory().unwrap();
2254        a.execute("CREATE TABLE t (a INTEGER);").unwrap();
2255        let mut b = a.connect();
2256
2257        let _ = a.prepare_cached("SELECT a FROM t").unwrap();
2258        let _ = a.prepare_cached("SELECT a FROM t").unwrap();
2259        assert_eq!(a.prepared_cache_len(), 1);
2260        // The sibling's cache is untouched.
2261        assert_eq!(b.prepared_cache_len(), 0);
2262        let _ = b.prepare_cached("SELECT a FROM t").unwrap();
2263        assert_eq!(b.prepared_cache_len(), 1);
2264    }
2265
2266    /// Static check: `Connection` is `Send + Sync`. Required so it can
2267    /// be moved across threads (or wrapped in `Arc`) without a typestate
2268    /// adapter — the headline contract Phase 10.1 puts in place.
2269    #[test]
2270    fn connection_is_send_and_sync() {
2271        fn assert_send<T: Send>() {}
2272        fn assert_sync<T: Sync>() {}
2273        assert_send::<Connection>();
2274        assert_sync::<Connection>();
2275    }
2276
2277    // -----------------------------------------------------------------
2278    // Phase 11.3 — `PRAGMA journal_mode` round-trip
2279    // -----------------------------------------------------------------
2280
2281    /// Fresh connections default to `wal` mode. The PRAGMA read form
2282    /// renders the current value as a single-row, single-column table
2283    /// the REPL can print.
2284    #[test]
2285    fn journal_mode_defaults_to_wal_and_renders_through_pragma() {
2286        let mut conn = Connection::open_in_memory().unwrap();
2287        assert_eq!(conn.journal_mode(), crate::mvcc::JournalMode::Wal);
2288
2289        // Read form returns "1 row returned." status (matching
2290        // `auto_vacuum`'s shape).
2291        let status = conn.execute("PRAGMA journal_mode;").unwrap();
2292        assert!(
2293            status.contains("1 row returned"),
2294            "unexpected status: {status}"
2295        );
2296    }
2297
2298    /// `PRAGMA journal_mode = mvcc;` flips the per-database mode and
2299    /// is observable through every sibling handle. The headline
2300    /// per-database contract for Phase 11.3.
2301    #[test]
2302    fn journal_mode_set_to_mvcc_propagates_to_siblings() {
2303        let mut primary = Connection::open_in_memory().unwrap();
2304        let sibling = primary.connect();
2305        assert_eq!(sibling.journal_mode(), crate::mvcc::JournalMode::Wal);
2306
2307        primary.execute("PRAGMA journal_mode = mvcc;").unwrap();
2308        assert_eq!(primary.journal_mode(), crate::mvcc::JournalMode::Mvcc);
2309        // Sibling sees the same value — proves the setting lives on
2310        // the shared `Database`, not on the per-handle Connection.
2311        assert_eq!(sibling.journal_mode(), crate::mvcc::JournalMode::Mvcc);
2312
2313        // Switch back is allowed because no MVCC versions exist yet
2314        // (11.4 will populate the store).
2315        primary.execute("PRAGMA journal_mode = wal;").unwrap();
2316        assert_eq!(primary.journal_mode(), crate::mvcc::JournalMode::Wal);
2317        assert_eq!(sibling.journal_mode(), crate::mvcc::JournalMode::Wal);
2318    }
2319
2320    /// The set form is case-insensitive on both the pragma name and
2321    /// the value (matching SQLite). Quoted values work too.
2322    #[test]
2323    fn journal_mode_pragma_is_case_insensitive() {
2324        let mut conn = Connection::open_in_memory().unwrap();
2325        conn.execute("PRAGMA JOURNAL_MODE = MVCC;").unwrap();
2326        assert_eq!(conn.journal_mode(), crate::mvcc::JournalMode::Mvcc);
2327        conn.execute("pragma journal_mode = 'wal';").unwrap();
2328        assert_eq!(conn.journal_mode(), crate::mvcc::JournalMode::Wal);
2329    }
2330
2331    /// Unknown modes return a typed error and don't disturb the
2332    /// existing setting.
2333    #[test]
2334    fn journal_mode_rejects_unknown_value() {
2335        let mut conn = Connection::open_in_memory().unwrap();
2336        let err = conn
2337            .execute("PRAGMA journal_mode = delete;")
2338            .expect_err("unknown mode must error");
2339        let msg = format!("{err}");
2340        assert!(
2341            msg.contains("unknown mode 'delete'"),
2342            "unexpected error: {msg}"
2343        );
2344        // Setting wasn't disturbed.
2345        assert_eq!(conn.journal_mode(), crate::mvcc::JournalMode::Wal);
2346    }
2347
2348    /// Numeric values are rejected — `journal_mode` is enum-shaped.
2349    /// SQLite accepts e.g. `journal_mode = 0` for OFF historically;
2350    /// SQLRite stays explicit.
2351    #[test]
2352    fn journal_mode_rejects_numeric_value() {
2353        let mut conn = Connection::open_in_memory().unwrap();
2354        let err = conn
2355            .execute("PRAGMA journal_mode = 0;")
2356            .expect_err("numeric mode must error");
2357        let msg = format!("{err}");
2358        assert!(msg.contains("numeric"), "unexpected error: {msg}");
2359    }
2360
2361    // -----------------------------------------------------------------
2362    // Phase 11.4 — `BEGIN CONCURRENT` end-to-end
2363    // -----------------------------------------------------------------
2364
2365    /// `BEGIN CONCURRENT` requires `PRAGMA journal_mode = mvcc;`
2366    /// first. v0 doesn't auto-enable MVCC mode; users opt in
2367    /// explicitly so the implications (in-memory MvStore growth,
2368    /// `Busy` errors becoming possible) aren't a surprise.
2369    #[test]
2370    fn begin_concurrent_requires_mvcc_journal_mode() {
2371        let mut conn = Connection::open_in_memory().unwrap();
2372        let err = conn
2373            .execute("BEGIN CONCURRENT;")
2374            .expect_err("must require MVCC journal mode");
2375        let msg = format!("{err}");
2376        assert!(
2377            msg.contains("PRAGMA journal_mode = mvcc"),
2378            "unexpected error: {msg}"
2379        );
2380    }
2381
2382    /// Round-trip: enable MVCC, BEGIN CONCURRENT, no writes,
2383    /// COMMIT. The simplest control-flow check — proves the
2384    /// parser-intent + lifecycle hooks all line up.
2385    #[test]
2386    fn begin_concurrent_then_empty_commit_round_trips() {
2387        let mut conn = Connection::open_in_memory().unwrap();
2388        conn.execute("PRAGMA journal_mode = mvcc;").unwrap();
2389        conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);")
2390            .unwrap();
2391        let begin_status = conn.execute("BEGIN CONCURRENT;").unwrap();
2392        assert_eq!(begin_status, "BEGIN");
2393        let commit_status = conn.execute("COMMIT;").unwrap();
2394        assert_eq!(commit_status, "COMMIT");
2395    }
2396
2397    /// Plan test #1: two concurrent transactions on **disjoint
2398    /// rowids** must both commit. No write-write conflict to
2399    /// detect; validation passes for both.
2400    #[test]
2401    fn two_concurrent_inserts_on_disjoint_rows_both_commit() {
2402        let mut a = Connection::open_in_memory().unwrap();
2403        a.execute("PRAGMA journal_mode = mvcc;").unwrap();
2404        a.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER);")
2405            .unwrap();
2406        let mut b = a.connect();
2407
2408        a.execute("BEGIN CONCURRENT;").unwrap();
2409        a.execute("INSERT INTO accounts (id, balance) VALUES (1, 100);")
2410            .unwrap();
2411
2412        b.execute("BEGIN CONCURRENT;").unwrap();
2413        b.execute("INSERT INTO accounts (id, balance) VALUES (2, 200);")
2414            .unwrap();
2415
2416        // Both commit cleanly — disjoint rowids, no conflict.
2417        a.execute("COMMIT;").unwrap();
2418        b.execute("COMMIT;").unwrap();
2419
2420        // Both rows are visible through the legacy read path.
2421        let stmt = a.prepare("SELECT id, balance FROM accounts;").unwrap();
2422        let mut rows: Vec<(i64, i64)> = stmt
2423            .query()
2424            .unwrap()
2425            .collect_all()
2426            .unwrap()
2427            .into_iter()
2428            .map(|r| (r.get::<i64>(0).unwrap(), r.get::<i64>(1).unwrap()))
2429            .collect();
2430        rows.sort();
2431        assert_eq!(rows, vec![(1, 100), (2, 200)]);
2432    }
2433
2434    /// Plan test #2: two concurrent transactions on the **same
2435    /// row** — one commits, the other aborts with `Busy`.
2436    #[test]
2437    fn two_concurrent_updates_same_row_one_aborts_with_busy() {
2438        let mut a = Connection::open_in_memory().unwrap();
2439        a.execute("PRAGMA journal_mode = mvcc;").unwrap();
2440        a.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER);")
2441            .unwrap();
2442        a.execute("INSERT INTO accounts (id, balance) VALUES (1, 100);")
2443            .unwrap();
2444        let mut b = a.connect();
2445
2446        // Both BEGIN before either UPDATE — that's the snapshot
2447        // the validation checks against.
2448        a.execute("BEGIN CONCURRENT;").unwrap();
2449        b.execute("BEGIN CONCURRENT;").unwrap();
2450
2451        a.execute("UPDATE accounts SET balance = 200 WHERE id = 1;")
2452            .unwrap();
2453        b.execute("UPDATE accounts SET balance = 300 WHERE id = 1;")
2454            .unwrap();
2455
2456        // First commit wins.
2457        a.execute("COMMIT;").unwrap();
2458
2459        // Second commit hits the validation pass and aborts.
2460        let err = b
2461            .execute("COMMIT;")
2462            .expect_err("second commit must abort with Busy");
2463        assert!(matches!(err, SQLRiteError::Busy(_)));
2464        assert!(err.is_retryable(), "Busy must be retryable");
2465        let msg = format!("{err}");
2466        assert!(
2467            msg.contains("write-write conflict"),
2468            "unexpected error: {msg}"
2469        );
2470
2471        // The winning value is what's persisted.
2472        let stmt = a
2473            .prepare("SELECT balance FROM accounts WHERE id = 1;")
2474            .unwrap();
2475        let rows = stmt.query().unwrap().collect_all().unwrap();
2476        assert_eq!(rows.len(), 1);
2477        assert_eq!(rows[0].get::<i64>(0).unwrap(), 200);
2478    }
2479
2480    /// Plan test #3: an aborted transaction's writes must never
2481    /// become visible. After ROLLBACK (explicit or implicit on
2482    /// Busy), the row keeps its pre-tx value.
2483    #[test]
2484    fn aborted_transactions_writes_never_become_visible() {
2485        let mut conn = Connection::open_in_memory().unwrap();
2486        conn.execute("PRAGMA journal_mode = mvcc;").unwrap();
2487        conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);")
2488            .unwrap();
2489        conn.execute("INSERT INTO t (id, v) VALUES (1, 100);")
2490            .unwrap();
2491
2492        // Explicit ROLLBACK.
2493        conn.execute("BEGIN CONCURRENT;").unwrap();
2494        conn.execute("UPDATE t SET v = 999 WHERE id = 1;").unwrap();
2495        conn.execute("ROLLBACK;").unwrap();
2496
2497        let stmt = conn.prepare("SELECT v FROM t WHERE id = 1;").unwrap();
2498        let rows = stmt.query().unwrap().collect_all().unwrap();
2499        assert_eq!(rows[0].get::<i64>(0).unwrap(), 100);
2500
2501        // Implicit rollback via Busy: another connection commits a
2502        // newer version under us.
2503        let mut other = conn.connect();
2504        conn.execute("BEGIN CONCURRENT;").unwrap();
2505        other.execute("BEGIN CONCURRENT;").unwrap();
2506        conn.execute("UPDATE t SET v = 7 WHERE id = 1;").unwrap();
2507        other.execute("UPDATE t SET v = 13 WHERE id = 1;").unwrap();
2508        conn.execute("COMMIT;").unwrap();
2509        let _ = other.execute("COMMIT;").expect_err("must abort with Busy");
2510
2511        // The losing writer's value (13) never lands. The winner
2512        // (7) is what's visible.
2513        let rows = conn
2514            .prepare("SELECT v FROM t WHERE id = 1;")
2515            .unwrap()
2516            .query()
2517            .unwrap()
2518            .collect_all()
2519            .unwrap();
2520        assert_eq!(rows[0].get::<i64>(0).unwrap(), 7);
2521    }
2522
2523    /// Plan test #4: retry-after-`Busy` succeeds. The caller's
2524    /// retry helper opens a fresh `BEGIN CONCURRENT` (with a
2525    /// new `begin_ts` past the conflict) and the same UPDATE
2526    /// commits cleanly.
2527    #[test]
2528    fn retry_after_busy_succeeds() {
2529        let mut a = Connection::open_in_memory().unwrap();
2530        a.execute("PRAGMA journal_mode = mvcc;").unwrap();
2531        a.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);")
2532            .unwrap();
2533        a.execute("INSERT INTO t (id, v) VALUES (1, 1);").unwrap();
2534        let mut b = a.connect();
2535
2536        a.execute("BEGIN CONCURRENT;").unwrap();
2537        b.execute("BEGIN CONCURRENT;").unwrap();
2538        a.execute("UPDATE t SET v = 100 WHERE id = 1;").unwrap();
2539        b.execute("UPDATE t SET v = 200 WHERE id = 1;").unwrap();
2540        a.execute("COMMIT;").unwrap();
2541        let err = b.execute("COMMIT;").expect_err("first attempt must Busy");
2542        assert!(err.is_retryable());
2543
2544        // Retry: open a fresh tx, redo the same UPDATE, commit.
2545        b.execute("BEGIN CONCURRENT;").unwrap();
2546        b.execute("UPDATE t SET v = 200 WHERE id = 1;").unwrap();
2547        b.execute("COMMIT;").expect("retry must succeed");
2548
2549        let rows = a
2550            .prepare("SELECT v FROM t WHERE id = 1;")
2551            .unwrap()
2552            .query()
2553            .unwrap()
2554            .collect_all()
2555            .unwrap();
2556        assert_eq!(rows[0].get::<i64>(0).unwrap(), 200);
2557    }
2558
2559    /// Nested `BEGIN CONCURRENT` is rejected with a typed error.
2560    /// Same single-tx-per-connection rule the legacy `BEGIN`
2561    /// already enforces.
2562    #[test]
2563    fn nested_begin_concurrent_is_rejected() {
2564        let mut conn = Connection::open_in_memory().unwrap();
2565        conn.execute("PRAGMA journal_mode = mvcc;").unwrap();
2566        conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY);")
2567            .unwrap();
2568        conn.execute("BEGIN CONCURRENT;").unwrap();
2569        let err = conn
2570            .execute("BEGIN CONCURRENT;")
2571            .expect_err("nested BEGIN CONCURRENT must error");
2572        assert!(format!("{err}").contains("already open"));
2573    }
2574
2575    /// Legacy `BEGIN` inside `BEGIN CONCURRENT` is rejected.
2576    /// Mixing the two transaction kinds isn't supported in v0;
2577    /// the deep-clone snapshot and the MVCC write-set don't
2578    /// interleave cleanly.
2579    #[test]
2580    fn legacy_begin_inside_concurrent_is_rejected() {
2581        let mut conn = Connection::open_in_memory().unwrap();
2582        conn.execute("PRAGMA journal_mode = mvcc;").unwrap();
2583        conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY);")
2584            .unwrap();
2585        conn.execute("BEGIN CONCURRENT;").unwrap();
2586        let err = conn
2587            .execute("BEGIN;")
2588            .expect_err("legacy BEGIN inside concurrent tx must error");
2589        assert!(format!("{err}").contains("concurrent transaction is already open"));
2590    }
2591
2592    /// DDL inside `BEGIN CONCURRENT` is rejected with a typed
2593    /// error. Plan §8 calls this out as an explicit non-goal —
2594    /// schema mutations interact poorly with the snapshot-
2595    /// based commit and the v0 write-set model.
2596    #[test]
2597    fn ddl_inside_begin_concurrent_is_rejected() {
2598        let mut conn = Connection::open_in_memory().unwrap();
2599        conn.execute("PRAGMA journal_mode = mvcc;").unwrap();
2600        conn.execute("BEGIN CONCURRENT;").unwrap();
2601        let err = conn
2602            .execute("CREATE TABLE t (id INTEGER PRIMARY KEY);")
2603            .expect_err("DDL inside concurrent tx must error");
2604        let msg = format!("{err}");
2605        assert!(msg.contains("DDL is not supported"), "unexpected: {msg}");
2606        // The transaction stays open — caller can ROLLBACK.
2607        conn.execute("ROLLBACK;").unwrap();
2608    }
2609
2610    /// An empty concurrent commit (BEGIN, no writes, COMMIT)
2611    /// always succeeds — even when other transactions have
2612    /// committed in the meantime, because we have nothing to
2613    /// validate.
2614    #[test]
2615    fn empty_concurrent_commit_never_busies() {
2616        let mut a = Connection::open_in_memory().unwrap();
2617        a.execute("PRAGMA journal_mode = mvcc;").unwrap();
2618        a.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);")
2619            .unwrap();
2620        a.execute("INSERT INTO t (id, v) VALUES (1, 1);").unwrap();
2621        let mut b = a.connect();
2622
2623        a.execute("BEGIN CONCURRENT;").unwrap();
2624        // Sibling B opens its own concurrent tx and commits a
2625        // change to row 1.
2626        b.execute("BEGIN CONCURRENT;").unwrap();
2627        b.execute("UPDATE t SET v = 999 WHERE id = 1;").unwrap();
2628        b.execute("COMMIT;").unwrap();
2629
2630        // a never wrote anything — its commit is purely a
2631        // tx-state cleanup. Validation has no rows to check.
2632        a.execute("COMMIT;")
2633            .expect("empty commit must succeed even if siblings committed");
2634    }
2635
2636    // -----------------------------------------------------------------
2637    // Phase 11.5 — snapshot-isolated reads via Statement::query
2638    // -----------------------------------------------------------------
2639
2640    /// The headline 11.5 contract: a SELECT issued via
2641    /// `prepare(...).query()` inside an open `BEGIN CONCURRENT`
2642    /// sees the BEGIN-time snapshot, not the post-commit live
2643    /// state. Phase 11.4 had this test failing because the
2644    /// prepare/query path bypassed the swap; Phase 11.5 routes
2645    /// it through `with_snapshot_read`.
2646    #[test]
2647    fn query_inside_concurrent_tx_sees_begin_time_snapshot() {
2648        let mut a = Connection::open_in_memory().unwrap();
2649        a.execute("PRAGMA journal_mode = mvcc;").unwrap();
2650        a.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);")
2651            .unwrap();
2652        a.execute("INSERT INTO t (id, v) VALUES (1, 1);").unwrap();
2653        let mut b = a.connect();
2654
2655        a.execute("BEGIN CONCURRENT;").unwrap();
2656        // Sibling B commits a change to row 1 from another tx.
2657        b.execute("BEGIN CONCURRENT;").unwrap();
2658        b.execute("UPDATE t SET v = 999 WHERE id = 1;").unwrap();
2659        b.execute("COMMIT;").unwrap();
2660
2661        // Reader inside a's tx, via prepare()+query(), must see
2662        // the BEGIN-time value (1), not b's committed value (999).
2663        let rows = a
2664            .prepare("SELECT v FROM t WHERE id = 1;")
2665            .unwrap()
2666            .query()
2667            .unwrap()
2668            .collect_all()
2669            .unwrap();
2670        assert_eq!(
2671            rows[0].get::<i64>(0).unwrap(),
2672            1,
2673            "Statement::query inside BEGIN CONCURRENT must see the snapshot, not the live db"
2674        );
2675
2676        // After a's empty commit, the same handle's read sees b's
2677        // value (999) — the swap is gone, the legacy read path is
2678        // back in play.
2679        a.execute("COMMIT;").unwrap();
2680        let rows = a
2681            .prepare("SELECT v FROM t WHERE id = 1;")
2682            .unwrap()
2683            .query()
2684            .unwrap()
2685            .collect_all()
2686            .unwrap();
2687        assert_eq!(rows[0].get::<i64>(0).unwrap(), 999);
2688    }
2689
2690    /// Read-your-writes: an UPDATE inside the tx is visible to
2691    /// the same tx's subsequent SELECT via `query()`. The swap
2692    /// makes the tx's private clone the read target, so writes
2693    /// the executor staged on the clone are reflected.
2694    #[test]
2695    fn query_inside_concurrent_tx_sees_own_writes() {
2696        let mut conn = Connection::open_in_memory().unwrap();
2697        conn.execute("PRAGMA journal_mode = mvcc;").unwrap();
2698        conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);")
2699            .unwrap();
2700        conn.execute("INSERT INTO t (id, v) VALUES (1, 100);")
2701            .unwrap();
2702
2703        conn.execute("BEGIN CONCURRENT;").unwrap();
2704        conn.execute("UPDATE t SET v = 200 WHERE id = 1;").unwrap();
2705        // Inside the tx, query() sees v = 200 (our own write).
2706        let rows = conn
2707            .prepare("SELECT v FROM t WHERE id = 1;")
2708            .unwrap()
2709            .query()
2710            .unwrap()
2711            .collect_all()
2712            .unwrap();
2713        assert_eq!(rows[0].get::<i64>(0).unwrap(), 200);
2714
2715        // After ROLLBACK, the live db still has 100 (the write
2716        // never landed).
2717        conn.execute("ROLLBACK;").unwrap();
2718        let rows = conn
2719            .prepare("SELECT v FROM t WHERE id = 1;")
2720            .unwrap()
2721            .query()
2722            .unwrap()
2723            .collect_all()
2724            .unwrap();
2725        assert_eq!(rows[0].get::<i64>(0).unwrap(), 100);
2726    }
2727
2728    /// Reads via `query_with_params` (parameter-bound SELECT)
2729    /// also flow through the snapshot. Same path, just with the
2730    /// substitution step in front.
2731    #[test]
2732    fn query_with_params_inside_concurrent_tx_sees_snapshot() {
2733        let mut a = Connection::open_in_memory().unwrap();
2734        a.execute("PRAGMA journal_mode = mvcc;").unwrap();
2735        a.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);")
2736            .unwrap();
2737        a.execute("INSERT INTO t (id, v) VALUES (1, 7);").unwrap();
2738        let mut b = a.connect();
2739
2740        a.execute("BEGIN CONCURRENT;").unwrap();
2741        b.execute("BEGIN CONCURRENT;").unwrap();
2742        b.execute("UPDATE t SET v = 42 WHERE id = 1;").unwrap();
2743        b.execute("COMMIT;").unwrap();
2744
2745        let rows = a
2746            .prepare("SELECT v FROM t WHERE id = ?")
2747            .unwrap()
2748            .query_with_params(&[Value::Integer(1)])
2749            .unwrap()
2750            .collect_all()
2751            .unwrap();
2752        assert_eq!(rows[0].get::<i64>(0).unwrap(), 7);
2753
2754        a.execute("COMMIT;").unwrap();
2755    }
2756
2757    /// Outside any concurrent tx, `query()` reads the live
2758    /// database. Sanity check that 11.5's snapshot routing is
2759    /// strictly opt-in via `BEGIN CONCURRENT`.
2760    #[test]
2761    fn query_outside_concurrent_tx_sees_live_database() {
2762        let mut a = Connection::open_in_memory().unwrap();
2763        a.execute("PRAGMA journal_mode = mvcc;").unwrap();
2764        a.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);")
2765            .unwrap();
2766        a.execute("INSERT INTO t (id, v) VALUES (1, 1);").unwrap();
2767        let mut b = a.connect();
2768
2769        // Sibling commits a change. a is NOT in a tx, so its read
2770        // should see the post-commit value.
2771        b.execute("BEGIN CONCURRENT;").unwrap();
2772        b.execute("UPDATE t SET v = 100 WHERE id = 1;").unwrap();
2773        b.execute("COMMIT;").unwrap();
2774
2775        let rows = a
2776            .prepare("SELECT v FROM t WHERE id = 1;")
2777            .unwrap()
2778            .query()
2779            .unwrap()
2780            .collect_all()
2781            .unwrap();
2782        assert_eq!(rows[0].get::<i64>(0).unwrap(), 100);
2783    }
2784
2785    /// Sibling reader at the moment a writer commits: the
2786    /// reader's own `BEGIN CONCURRENT` (and its private snapshot)
2787    /// must isolate it from the writer's commit, so the snapshot
2788    /// stays internally consistent for the reader's lifetime.
2789    /// Repeats the read multiple times across the writer's
2790    /// activity to catch any races where the snapshot leaks.
2791    #[test]
2792    fn snapshot_stays_consistent_across_sibling_commits() {
2793        let mut reader = Connection::open_in_memory().unwrap();
2794        reader.execute("PRAGMA journal_mode = mvcc;").unwrap();
2795        reader
2796            .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);")
2797            .unwrap();
2798        reader
2799            .execute("INSERT INTO t (id, v) VALUES (1, 1);")
2800            .unwrap();
2801        let mut writer = reader.connect();
2802
2803        reader.execute("BEGIN CONCURRENT;").unwrap();
2804        // First read inside reader's tx — sees v=1.
2805        let read_at_t0 = reader
2806            .prepare("SELECT v FROM t WHERE id = 1;")
2807            .unwrap()
2808            .query()
2809            .unwrap()
2810            .collect_all()
2811            .unwrap();
2812        assert_eq!(read_at_t0[0].get::<i64>(0).unwrap(), 1);
2813
2814        // Writer commits a stream of changes between reader's
2815        // reads. Each commit advances the live db and adds a
2816        // version to MvStore.
2817        for new_value in [10, 20, 30, 40] {
2818            writer.execute("BEGIN CONCURRENT;").unwrap();
2819            writer
2820                .execute(&format!("UPDATE t SET v = {new_value} WHERE id = 1;"))
2821                .unwrap();
2822            writer.execute("COMMIT;").unwrap();
2823
2824            // Reader's snapshot must still see v=1.
2825            let r = reader
2826                .prepare("SELECT v FROM t WHERE id = 1;")
2827                .unwrap()
2828                .query()
2829                .unwrap()
2830                .collect_all()
2831                .unwrap();
2832            assert_eq!(
2833                r[0].get::<i64>(0).unwrap(),
2834                1,
2835                "snapshot regressed after writer committed v={new_value}",
2836            );
2837        }
2838
2839        reader.execute("COMMIT;").unwrap();
2840    }
2841
2842    // -----------------------------------------------------------------
2843    // Phase 11.6 — MVCC garbage collection
2844    // -----------------------------------------------------------------
2845
2846    /// Per-commit GC bounds the chain length under repeated
2847    /// updates to the same row when no readers are holding a
2848    /// snapshot that would need older versions. After many
2849    /// updates the store should hold roughly one version per row,
2850    /// not a version per commit.
2851    #[test]
2852    fn repeated_updates_keep_chain_bounded_when_no_readers() {
2853        let mut conn = Connection::open_in_memory().unwrap();
2854        conn.execute("PRAGMA journal_mode = mvcc;").unwrap();
2855        conn.execute("CREATE TABLE counters (id INTEGER PRIMARY KEY, n INTEGER);")
2856            .unwrap();
2857        conn.execute("INSERT INTO counters (id, n) VALUES (1, 0);")
2858            .unwrap();
2859
2860        // 50 sequential updates inside their own concurrent
2861        // transactions. With no overlapping readers, the
2862        // per-commit GC sweep should reclaim every superseded
2863        // version and leave only the latest.
2864        for n in 1..=50 {
2865            conn.execute("BEGIN CONCURRENT;").unwrap();
2866            conn.execute(&format!("UPDATE counters SET n = {n} WHERE id = 1;"))
2867                .unwrap();
2868            conn.execute("COMMIT;").unwrap();
2869        }
2870
2871        // MvStore should now hold exactly one version for the
2872        // row we hammered (the latest). Without GC it would hold
2873        // 50.
2874        let db = conn.database();
2875        let store_size = db.mv_store().total_versions();
2876        let tracked = db.mv_store().tracked_rows();
2877        drop(db);
2878        assert_eq!(
2879            store_size, 1,
2880            "expected 1 version after 50 GC'd updates, got {store_size}",
2881        );
2882        assert_eq!(tracked, 1);
2883    }
2884
2885    /// GC must NOT reclaim versions that an in-flight reader's
2886    /// snapshot might still see. While a reader holds an open
2887    /// `BEGIN CONCURRENT` at `begin_ts = T`, every version with
2888    /// `end > T` must remain in the chain.
2889    #[test]
2890    fn gc_preserves_versions_visible_to_active_reader() {
2891        let mut writer = Connection::open_in_memory().unwrap();
2892        writer.execute("PRAGMA journal_mode = mvcc;").unwrap();
2893        writer
2894            .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);")
2895            .unwrap();
2896        writer
2897            .execute("INSERT INTO t (id, v) VALUES (1, 0);")
2898            .unwrap();
2899        let mut reader = writer.connect();
2900
2901        // Reader opens its tx FIRST so its snapshot sits at the
2902        // smallest `begin_ts` across the active set.
2903        reader.execute("BEGIN CONCURRENT;").unwrap();
2904
2905        // Writer commits five updates; per-commit GC fires after
2906        // each, but the reader's begin_ts pins the watermark so
2907        // the older versions can't be reclaimed.
2908        for n in 1..=5 {
2909            writer.execute("BEGIN CONCURRENT;").unwrap();
2910            writer
2911                .execute(&format!("UPDATE t SET v = {n} WHERE id = 1;"))
2912                .unwrap();
2913            writer.execute("COMMIT;").unwrap();
2914        }
2915
2916        // Reader's snapshot still sees v=0 — the chain must have
2917        // retained the original version (or a tombstone-capped
2918        // earlier value) so the visibility rule resolves it.
2919        let rows = reader
2920            .prepare("SELECT v FROM t WHERE id = 1;")
2921            .unwrap()
2922            .query()
2923            .unwrap()
2924            .collect_all()
2925            .unwrap();
2926        assert_eq!(rows[0].get::<i64>(0).unwrap(), 0);
2927
2928        // The reader's snapshot is preserved by GC's watermark.
2929        // No assertion on the exact chain length — that's an
2930        // implementation detail; the property is "reader sees
2931        // v=0 even after writer's burst."
2932
2933        reader.execute("COMMIT;").unwrap();
2934
2935        // After the reader closes, the watermark jumps and an
2936        // explicit vacuum reclaims everything reclaimable.
2937        // (We skip checking the exact reclaim count because the
2938        // post-reader-close state of the chain depends on the
2939        // ordering of the reader's `drop` and the watermark
2940        // sample inside `vacuum_mvcc` — both are correct, just
2941        // different.)
2942        writer.vacuum_mvcc();
2943        let db = writer.database();
2944        let store_size = db.mv_store().total_versions();
2945        drop(db);
2946        // At most one version per row (the latest committed).
2947        assert!(
2948            store_size <= 1,
2949            "after reader closed and vacuum ran, expected ≤1 version, got {store_size}",
2950        );
2951    }
2952
2953    /// `Connection::vacuum_mvcc` is a no-op on a fresh
2954    /// `JournalMode::Wal` database: the store is empty, nothing
2955    /// to reclaim. Matches the "safe to call regardless of
2956    /// journal mode" contract.
2957    #[test]
2958    fn vacuum_mvcc_is_a_noop_on_wal_database() {
2959        let conn = Connection::open_in_memory().unwrap();
2960        // Default journal mode is Wal; never enabled MVCC.
2961        assert_eq!(conn.vacuum_mvcc(), 0);
2962    }
2963
2964    /// Explicit `vacuum_mvcc` reclaims everything reclaimable
2965    /// when no transactions are active.
2966    #[test]
2967    fn vacuum_mvcc_reclaims_everything_with_no_active_readers() {
2968        let mut conn = Connection::open_in_memory().unwrap();
2969        conn.execute("PRAGMA journal_mode = mvcc;").unwrap();
2970        conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);")
2971            .unwrap();
2972
2973        // Build up some versions.
2974        conn.execute("INSERT INTO t (id, v) VALUES (1, 0);")
2975            .unwrap();
2976        conn.execute("BEGIN CONCURRENT;").unwrap();
2977        conn.execute("UPDATE t SET v = 1 WHERE id = 1;").unwrap();
2978        conn.execute("COMMIT;").unwrap();
2979        conn.execute("BEGIN CONCURRENT;").unwrap();
2980        conn.execute("UPDATE t SET v = 2 WHERE id = 1;").unwrap();
2981        conn.execute("COMMIT;").unwrap();
2982
2983        // Per-commit GC has already done most of the work; the
2984        // explicit vacuum is idempotent.
2985        let _ = conn.vacuum_mvcc();
2986        let db = conn.database();
2987        let store_size = db.mv_store().total_versions();
2988        drop(db);
2989        assert!(store_size <= 1);
2990    }
2991
2992    /// `is_retryable()` covers both `Busy` and `BusySnapshot`
2993    /// without callers having to match each variant. The contract
2994    /// SDK retry helpers will rely on.
2995    #[test]
2996    fn is_retryable_covers_busy_variants() {
2997        assert!(SQLRiteError::Busy("x".into()).is_retryable());
2998        assert!(SQLRiteError::BusySnapshot("x".into()).is_retryable());
2999        assert!(!SQLRiteError::General("x".into()).is_retryable());
3000    }
3001
3002    /// Phase 11.9 — every BEGIN CONCURRENT commit on a file-backed
3003    /// database leaves an MVCC log-record frame in the WAL. The Pager
3004    /// surfaces those on reopen via `recovered_mvcc_commits`.
3005    #[test]
3006    fn mvcc_commit_persists_a_log_record_into_wal() {
3007        let path = tmp_path("mvcc_log_record");
3008        {
3009            let mut c = Connection::open(&path).unwrap();
3010            c.execute("PRAGMA journal_mode = mvcc;").unwrap();
3011            c.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);")
3012                .unwrap();
3013            c.execute("BEGIN CONCURRENT;").unwrap();
3014            c.execute("INSERT INTO t (id, v) VALUES (1, 42);").unwrap();
3015            c.execute("COMMIT;").unwrap();
3016        }
3017        // Reopen and confirm the WAL replay surfaced the batch.
3018        let c2 = Connection::open(&path).unwrap();
3019        let db = c2.database();
3020        let pager = db.pager.as_ref().expect("file-backed db carries a pager");
3021        let batches = pager.recovered_mvcc_commits();
3022        assert_eq!(batches.len(), 1, "one BEGIN CONCURRENT commit -> one batch");
3023        assert_eq!(batches[0].records.len(), 1, "one row written");
3024        let rec = &batches[0].records[0];
3025        assert_eq!(rec.row.table, "t");
3026        assert_eq!(rec.row.rowid, 1);
3027        match &rec.payload {
3028            VersionPayload::Present(cols) => {
3029                assert!(cols.iter().any(
3030                    |(k, v)| k == "v" && matches!(v, crate::sql::db::table::Value::Integer(42))
3031                ));
3032            }
3033            other => panic!("unexpected payload: {other:?}"),
3034        }
3035        drop(db);
3036        drop(c2);
3037        cleanup(&path);
3038    }
3039
3040    /// Phase 11.9 — on reopen the MVCC log records are pushed back
3041    /// into `MvStore`. The conflict-detection window survives a
3042    /// process restart: a write whose `begin_ts` predates a
3043    /// replayed commit must surface as `Busy`.
3044    #[test]
3045    fn mvcc_reopen_restores_mv_store_and_clock() {
3046        let path = tmp_path("mvcc_reopen");
3047        {
3048            let mut c = Connection::open(&path).unwrap();
3049            c.execute("PRAGMA journal_mode = mvcc;").unwrap();
3050            c.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);")
3051                .unwrap();
3052            c.execute("BEGIN CONCURRENT;").unwrap();
3053            c.execute("INSERT INTO t (id, v) VALUES (1, 10);").unwrap();
3054            c.execute("COMMIT;").unwrap();
3055            c.execute("BEGIN CONCURRENT;").unwrap();
3056            c.execute("UPDATE t SET v = 20 WHERE id = 1;").unwrap();
3057            c.execute("COMMIT;").unwrap();
3058        }
3059        let c2 = Connection::open(&path).unwrap();
3060        let db = c2.database();
3061        // Two commits replayed → two versions for row t/1 (the
3062        // first capped, the second open-ended).
3063        let store = db.mv_store();
3064        let row = RowID::new("t", 1);
3065        assert!(
3066            store.latest_committed_begin(&row).is_some(),
3067            "MvStore should know about row t/1 after reopen"
3068        );
3069        // Clock must have advanced past the persisted commits so
3070        // any new transaction gets a fresh `begin_ts`.
3071        let last_commit_ts = store.latest_committed_begin(&row).unwrap();
3072        assert!(
3073            db.mvcc_clock().now() >= last_commit_ts,
3074            "clock {} must be >= last replayed commit_ts {}",
3075            db.mvcc_clock().now(),
3076            last_commit_ts,
3077        );
3078        drop(db);
3079        drop(c2);
3080        cleanup(&path);
3081    }
3082
3083    /// Phase 11.9 — multi-row batches survive replay intact, with
3084    /// every (RowID, payload) pair coming back from the WAL.
3085    #[test]
3086    fn mvcc_multi_row_batch_replays_intact() {
3087        let path = tmp_path("mvcc_multi_row");
3088        {
3089            let mut c = Connection::open(&path).unwrap();
3090            c.execute("PRAGMA journal_mode = mvcc;").unwrap();
3091            c.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);")
3092                .unwrap();
3093            // Seed rows under legacy mode so the concurrent tx
3094            // can UPDATE them — Phase 11 keeps INSERT-only
3095            // semantics for the concurrent path simple.
3096            c.execute("INSERT INTO t (id, v) VALUES (1, 1);").unwrap();
3097            c.execute("INSERT INTO t (id, v) VALUES (2, 2);").unwrap();
3098            c.execute("INSERT INTO t (id, v) VALUES (3, 3);").unwrap();
3099
3100            c.execute("BEGIN CONCURRENT;").unwrap();
3101            c.execute("UPDATE t SET v = 100 WHERE id = 1;").unwrap();
3102            c.execute("UPDATE t SET v = 200 WHERE id = 2;").unwrap();
3103            c.execute("UPDATE t SET v = 300 WHERE id = 3;").unwrap();
3104            c.execute("COMMIT;").unwrap();
3105        }
3106        let c2 = Connection::open(&path).unwrap();
3107        let db = c2.database();
3108        let pager = db.pager.as_ref().unwrap();
3109        let batches = pager.recovered_mvcc_commits();
3110        assert_eq!(batches.len(), 1, "single COMMIT -> single batch");
3111        let rowids: Vec<i64> = batches[0].records.iter().map(|r| r.row.rowid).collect();
3112        assert!(rowids.contains(&1));
3113        assert!(rowids.contains(&2));
3114        assert!(rowids.contains(&3));
3115        assert_eq!(batches[0].records.len(), 3);
3116        drop(db);
3117        drop(c2);
3118        cleanup(&path);
3119    }
3120
3121    /// Phase 11.9 — a BEGIN CONCURRENT that's never committed
3122    /// leaves no MVCC frame in the WAL. The reopen path replays
3123    /// only what was sealed.
3124    #[test]
3125    fn mvcc_rolled_back_tx_leaves_no_wal_record() {
3126        let path = tmp_path("mvcc_rollback");
3127        {
3128            let mut c = Connection::open(&path).unwrap();
3129            c.execute("PRAGMA journal_mode = mvcc;").unwrap();
3130            c.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);")
3131                .unwrap();
3132            c.execute("BEGIN CONCURRENT;").unwrap();
3133            c.execute("INSERT INTO t (id, v) VALUES (1, 999);").unwrap();
3134            c.execute("ROLLBACK;").unwrap();
3135        }
3136        let c2 = Connection::open(&path).unwrap();
3137        let db = c2.database();
3138        let pager = db.pager.as_ref().unwrap();
3139        assert!(
3140            pager.recovered_mvcc_commits().is_empty(),
3141            "ROLLBACK must not append MVCC frames"
3142        );
3143        // Legacy tables also untouched.
3144        let store = db.mv_store();
3145        assert_eq!(store.total_versions(), 0);
3146        drop(db);
3147        drop(c2);
3148        cleanup(&path);
3149    }
3150
3151    /// Phase 11.9 — legacy (non-BEGIN-CONCURRENT) commits do
3152    /// **not** emit MVCC frames. The persistence is opt-in along
3153    /// the same axis as `BEGIN CONCURRENT`.
3154    #[test]
3155    fn legacy_commit_does_not_emit_mvcc_frame() {
3156        let path = tmp_path("mvcc_legacy_no_frame");
3157        {
3158            let mut c = Connection::open(&path).unwrap();
3159            c.execute("PRAGMA journal_mode = mvcc;").unwrap();
3160            c.execute("CREATE TABLE t (id INTEGER PRIMARY KEY);")
3161                .unwrap();
3162            c.execute("INSERT INTO t (id) VALUES (1);").unwrap();
3163        }
3164        let c2 = Connection::open(&path).unwrap();
3165        let db = c2.database();
3166        let pager = db.pager.as_ref().unwrap();
3167        assert!(
3168            pager.recovered_mvcc_commits().is_empty(),
3169            "legacy writes never produce MVCC frames"
3170        );
3171        drop(db);
3172        drop(c2);
3173        cleanup(&path);
3174    }
3175
3176    /// Phase 11.9 — crash recovery sketch. After several
3177    /// concurrent commits we drop the connection without an
3178    /// explicit checkpoint (the auto-checkpoint threshold is
3179    /// well above what 3 frames triggers). A fresh open replays
3180    /// every MVCC frame and reconstructs the chain.
3181    #[test]
3182    fn mvcc_replays_multiple_commits_after_unclean_close() {
3183        let path = tmp_path("mvcc_unclean_close");
3184        {
3185            let mut c = Connection::open(&path).unwrap();
3186            c.execute("PRAGMA journal_mode = mvcc;").unwrap();
3187            c.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);")
3188                .unwrap();
3189            for v in 0..5 {
3190                c.execute("BEGIN CONCURRENT;").unwrap();
3191                if v == 0 {
3192                    c.execute("INSERT INTO t (id, v) VALUES (1, 0);").unwrap();
3193                } else {
3194                    c.execute(&format!("UPDATE t SET v = {v} WHERE id = 1;"))
3195                        .unwrap();
3196                }
3197                c.execute("COMMIT;").unwrap();
3198            }
3199            // c drops here without calling checkpoint — the WAL
3200            // still holds every MVCC frame.
3201        }
3202        let c2 = Connection::open(&path).unwrap();
3203        let db = c2.database();
3204        let pager = db.pager.as_ref().unwrap();
3205        let batches = pager.recovered_mvcc_commits();
3206        assert_eq!(batches.len(), 5, "every COMMIT must show up after reopen");
3207        // commit_ts values are strictly increasing.
3208        for w in batches.windows(2) {
3209            assert!(w[0].commit_ts < w[1].commit_ts);
3210        }
3211        drop(db);
3212        drop(c2);
3213        cleanup(&path);
3214    }
3215
3216    #[test]
3217    fn prepare_cached_executes_the_same_as_prepare() {
3218        let mut conn = Connection::open_in_memory().unwrap();
3219        conn.execute("CREATE TABLE t (a INTEGER PRIMARY KEY, b TEXT);")
3220            .unwrap();
3221        let mut ins = conn
3222            .prepare_cached("INSERT INTO t (a, b) VALUES (?, ?)")
3223            .unwrap();
3224        ins.execute_with_params(&[Value::Integer(1), Value::Text("alpha".into())])
3225            .unwrap();
3226        ins.execute_with_params(&[Value::Integer(2), Value::Text("beta".into())])
3227            .unwrap();
3228
3229        let stmt = conn.prepare_cached("SELECT b FROM t WHERE a = ?").unwrap();
3230        let rows = stmt
3231            .query_with_params(&[Value::Integer(2)])
3232            .unwrap()
3233            .collect_all()
3234            .unwrap();
3235        assert_eq!(rows.len(), 1);
3236        assert_eq!(rows[0].get::<String>(0).unwrap(), "beta");
3237    }
3238}