sqlrite/connection.rs
1//! Public `Connection` / `Statement` / `Rows` / `Row` API (Phase 5a + SQLR-23).
2//!
3//! This is the stable surface external consumers bind against — Rust
4//! callers use it directly, language SDKs (Python, Node.js, Go) bind
5//! against the C FFI wrapper over these same types in Phase 5b, and
6//! the WASM build in Phase 5g re-exposes them via `wasm-bindgen`.
7//!
8//! The shape mirrors `rusqlite` / Python's `sqlite3` so users
9//! familiar with either can pick it up immediately:
10//!
11//! ```no_run
12//! use sqlrite::Connection;
13//!
14//! let mut conn = Connection::open("foo.sqlrite")?;
15//! conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")?;
16//! conn.execute("INSERT INTO users (name) VALUES ('alice')")?;
17//!
18//! let mut stmt = conn.prepare("SELECT id, name FROM users")?;
19//! let mut rows = stmt.query()?;
20//! while let Some(row) = rows.next()? {
21//! let id: i64 = row.get(0)?;
22//! let name: String = row.get(1)?;
23//! println!("{id}: {name}");
24//! }
25//! # Ok::<(), sqlrite::SQLRiteError>(())
26//! ```
27//!
28//! **Relationship to the internal engine.** A `Connection` owns a
29//! `Database` (which owns a `Pager` for file-backed connections).
30//! `execute` and `query` go through the same `process_command`
31//! pipeline the REPL uses, just with typed row return instead of
32//! pre-rendered tables. The internal `Database` / `Pager` stay
33//! accessible via `sqlrite::sql::...` for the engine's own tests
34//! and for the desktop app — but those paths aren't considered
35//! stable API.
36//!
37//! # Prepared statements & parameter binding (SQLR-23)
38//!
39//! `Connection::prepare` parses the SQL once and stashes the AST on
40//! the returned `Statement`. Subsequent calls to `Statement::query` /
41//! `Statement::run` execute against the cached AST without re-running
42//! sqlparser. Bound versions ([`Statement::query_with_params`] /
43//! [`Statement::execute_with_params`]) accept a `&[Value]` slice that is
44//! substituted into the cached AST at execute time — including
45//! `Value::Vector(...)` for HNSW-eligible KNN queries, where binding
46//! the query vector skips per-iter lexing of the 4 KB bracket-array
47//! literal.
48//!
49//! [`Connection::prepare_cached`] adds a small per-connection LRU
50//! (default cap 16) so a hot SQL string is parsed exactly once across
51//! every call, not once per `prepare()`. Matches the rusqlite pattern.
52
53use std::collections::{HashMap, VecDeque};
54use std::path::Path;
55use std::sync::{Arc, Mutex, MutexGuard};
56
57use crate::sql::dialect::SqlriteDialect;
58use sqlparser::ast::Statement as AstStatement;
59use sqlparser::parser::Parser;
60
61use crate::error::{Result, SQLRiteError};
62use crate::mvcc::{
63 ConcurrentTx, JournalMode, MvccCommitBatch, MvccLogRecord, RowID, RowVersion, VersionPayload,
64};
65use crate::sql::db::database::{Database, TxnSnapshot};
66use crate::sql::db::table::{Table, Value};
67use crate::sql::executor::execute_select_rows;
68use crate::sql::pager::{self, AccessMode, open_database_with_mode, save_database};
69use crate::sql::params::{rewrite_placeholders, substitute_params};
70use crate::sql::parser::select::SelectQuery;
71use crate::sql::process_ast_with_render;
72
/// Default capacity of the per-connection prepared-statement plan cache.
///
/// Every freshly wrapped `Connection` starts with this value as its
/// `prep_cache_cap`; sibling handles minted by `connect()` inherit
/// whatever their parent currently uses instead. Matches rusqlite's
/// default; tweak with [`Connection::set_prepared_cache_capacity`].
const DEFAULT_PREP_CACHE_CAP: usize = 16;
76
/// A handle to a SQLRite database. Opens a file or an in-memory DB;
/// drop it to close. Every mutating statement auto-saves (except inside
/// an explicit `BEGIN`/`COMMIT` block — see [Transactions](#transactions)).
///
/// ## Transactions
///
/// ```no_run
/// # use sqlrite::Connection;
/// let mut conn = Connection::open("foo.sqlrite")?;
/// conn.execute("BEGIN")?;
/// conn.execute("INSERT INTO users (name) VALUES ('alice')")?;
/// conn.execute("INSERT INTO users (name) VALUES ('bob')")?;
/// conn.execute("COMMIT")?;
/// # Ok::<(), sqlrite::SQLRiteError>(())
/// ```
///
/// ## Multiple connections (Phase 10.1)
///
/// `Connection` is a thin handle over an `Arc<Mutex<Database>>`. Call
/// [`Connection::connect`] to mint a sibling handle that shares the
/// same backing `Database` — typically one per worker thread. Every
/// engine call still locks that single mutex (and the pager keeps its
/// exclusive flock between processes), so the headline behaviour is
/// that callers can hold and address the same DB from more than one
/// thread without wrapping the whole `Connection` in a `Mutex`
/// themselves.
///
/// Each handle additionally tracks its own `BEGIN CONCURRENT`
/// transaction in `concurrent_tx` (Phase 11.4); that state is never
/// shared between sibling handles.
///
/// `Connection` is `Send + Sync`. The recommended pattern is one
/// connection per thread (clone via `connect()`); statements still
/// borrow `&mut Connection`, so a single connection isn't suitable
/// for true concurrent statement execution.
pub struct Connection {
    /// Shared engine state. Mints sibling connections via
    /// [`Connection::connect`] without copying the in-memory tables
    /// or the long-lived pager.
    inner: Arc<Mutex<Database>>,
    /// SQLR-23 — small SQL→cached-plan LRU. Keyed by the verbatim SQL
    /// string the caller passed to `prepare_cached`. Stored as a
    /// `VecDeque` rather than a HashMap+linked-list because the
    /// expected capacity is small (default 16) — linear scan is fine
    /// and the implementation stays dependency-free. The front of the
    /// deque is the least-recently-used entry; hits are moved to the
    /// back.
    ///
    /// Per-connection (not shared with sibling handles) — each thread
    /// gets its own LRU so cache-mutation never crosses a thread
    /// boundary.
    prep_cache: VecDeque<(String, Arc<CachedPlan>)>,
    /// Maximum number of entries `prep_cache` may hold; `0` disables
    /// caching.
    prep_cache_cap: usize,
    /// Phase 11.4 — per-connection `BEGIN CONCURRENT` state.
    /// `None` outside a concurrent transaction; `Some` between
    /// `BEGIN CONCURRENT` and `COMMIT` / `ROLLBACK`. Multiple
    /// sibling connections can each hold their own — that's the
    /// headline concurrency story this slice unlocks.
    ///
    /// While `Some`, every statement on this connection runs
    /// against the cloned tables in [`ConcurrentTx::tables`]
    /// instead of the live `Database::tables`. The live database
    /// stays untouched until the commit-validation pass succeeds.
    ///
    /// **Phase 11.5 — wrapped in a `Mutex`.** [`Statement::query`]
    /// and [`Statement::query_with_params`] take `&self`, so they
    /// need interior mutability to swap the snapshot in for the
    /// read. The lock is uncontended in single-thread use (each
    /// connection's `concurrent_tx` is per-handle, and the
    /// Statement-borrows-Connection contract still serializes
    /// statements on a given handle); the Mutex is the cheapest
    /// way to satisfy the borrow checker without restructuring
    /// the Statement API. Lock order is always
    /// `concurrent_tx` → `inner` to stay deadlock-free.
    concurrent_tx: Mutex<Option<ConcurrentTx>>,
}
148
149impl Connection {
150 /// Opens (or creates) a database file for read-write access.
151 ///
152 /// If the file doesn't exist, an empty one is materialized with the
153 /// current format version. Takes an exclusive advisory lock on the
154 /// file and its `-wal` sidecar; returns `Err` if either is already
155 /// locked by another process.
156 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
157 let path = path.as_ref();
158 let db_name = path
159 .file_stem()
160 .and_then(|s| s.to_str())
161 .unwrap_or("db")
162 .to_string();
163 let db = if path.exists() {
164 open_database_with_mode(path, db_name, AccessMode::ReadWrite)?
165 } else {
166 // Fresh file: materialize on disk and keep the attached
167 // pager. Setting `source_path` before `save_database` lets
168 // its `same_path` branch create the pager and stash it
169 // back on the Database — no reopen needed (and trying to
170 // reopen here would hit the file's own lock).
171 let mut fresh = Database::new(db_name);
172 fresh.source_path = Some(path.to_path_buf());
173 save_database(&mut fresh, path)?;
174 fresh
175 };
176 Ok(Self::wrap(db))
177 }
178
179 /// Opens an existing database file for read-only access. Takes a
180 /// shared advisory lock, so multiple read-only connections can
181 /// coexist on the same file; any open writer excludes them.
182 /// Mutating statements return `cannot execute: database is opened
183 /// read-only`.
184 pub fn open_read_only<P: AsRef<Path>>(path: P) -> Result<Self> {
185 let path = path.as_ref();
186 let db_name = path
187 .file_stem()
188 .and_then(|s| s.to_str())
189 .unwrap_or("db")
190 .to_string();
191 let db = open_database_with_mode(path, db_name, AccessMode::ReadOnly)?;
192 Ok(Self::wrap(db))
193 }
194
195 /// Opens a transient in-memory database. No file is touched and no
196 /// locks are taken; state lives for the lifetime of the
197 /// `Connection` and is discarded on drop.
198 pub fn open_in_memory() -> Result<Self> {
199 Ok(Self::wrap(Database::new("memdb".to_string())))
200 }
201
202 fn wrap(db: Database) -> Self {
203 Self {
204 inner: Arc::new(Mutex::new(db)),
205 prep_cache: VecDeque::new(),
206 prep_cache_cap: DEFAULT_PREP_CACHE_CAP,
207 concurrent_tx: Mutex::new(None),
208 }
209 }
210
211 /// Phase 10.1 — mints another `Connection` sharing the same
212 /// backing `Database`. Hand the returned handle to a separate
213 /// thread to address the same in-memory tables and persistent
214 /// pager from there.
215 ///
216 /// The new handle starts with an empty prepared-statement cache
217 /// (caches are per-handle, by design). Inherits the parent's
218 /// `prepare_cached` capacity. Concurrent operations still
219 /// serialize through the engine's internal lock and the pager's
220 /// existing single-writer rule — a true multi-writer story
221 /// arrives with `BEGIN CONCURRENT` in Phase 10.4.
222 ///
223 /// ```no_run
224 /// # use sqlrite::Connection;
225 /// let mut primary = Connection::open("foo.sqlrite")?;
226 /// let secondary = primary.connect();
227 /// std::thread::spawn(move || {
228 /// let mut conn = secondary;
229 /// conn.execute("INSERT INTO t (x) VALUES (1)").unwrap();
230 /// })
231 /// .join()
232 /// .unwrap();
233 /// # Ok::<(), sqlrite::SQLRiteError>(())
234 /// ```
235 pub fn connect(&self) -> Self {
236 Self {
237 inner: Arc::clone(&self.inner),
238 prep_cache: VecDeque::new(),
239 prep_cache_cap: self.prep_cache_cap,
240 // Phase 11.4: each sibling handle starts outside any
241 // concurrent transaction. Multi-thread `BEGIN CONCURRENT`
242 // is the headline use case — every clone gets its own
243 // independent slot.
244 concurrent_tx: Mutex::new(None),
245 }
246 }
247
248 /// Phase 10.1 — number of `Connection` handles currently sharing
249 /// this database (this handle plus every live `connect()`
250 /// descendant). Useful for diagnostics and tests; no semantic
251 /// guarantee beyond that.
252 pub fn handle_count(&self) -> usize {
253 Arc::strong_count(&self.inner)
254 }
255
256 /// Locks the shared `Database` and returns the guard. Internal
257 /// helper — every public method that needs `&mut Database` calls
258 /// this. The lock is released when the guard drops, so callers
259 /// must keep the guard alive for the duration of the engine call
260 /// (typically by binding it to a local).
261 fn lock(&self) -> MutexGuard<'_, Database> {
262 // `unwrap` propagates a panic from another thread that held
263 // the lock — there's no engine-level recovery story for a
264 // poisoned `Database` (the in-memory tables would be in an
265 // unknown state), so failing fast is the right behaviour.
266 self.inner
267 .lock()
268 .unwrap_or_else(|e| panic!("sqlrite: database mutex poisoned: {e}"))
269 }
270
271 /// Parses and executes one SQL statement. For DDL (`CREATE TABLE`,
272 /// `CREATE INDEX`), DML (`INSERT`, `UPDATE`, `DELETE`) and
273 /// transaction control (`BEGIN`, `COMMIT`, `ROLLBACK`,
274 /// `BEGIN CONCURRENT`). Returns the status message the engine
275 /// produced (e.g. `"INSERT Statement executed."`).
276 ///
277 /// For `SELECT`, `execute` works but discards the row data and
278 /// just returns the rendered status — use [`Connection::prepare`]
279 /// and [`Statement::query`] to iterate typed rows.
280 ///
281 /// Phase 11.4 — intercepts `BEGIN CONCURRENT`, `COMMIT`, and
282 /// `ROLLBACK` before sqlparser sees them so the per-connection
283 /// MVCC transaction state stays in sync. Inside an open
284 /// concurrent transaction, every other statement runs against
285 /// the transaction's private cloned tables; the live database
286 /// stays untouched until commit-validation succeeds.
287 pub fn execute(&mut self, sql: &str) -> Result<String> {
288 let intent = concurrent_tx_intent(sql);
289 let has_tx = self.concurrent_tx_is_open();
290 match intent {
291 ConcurrentTxIntent::Begin => self.begin_concurrent(),
292 ConcurrentTxIntent::Commit if has_tx => self.commit_concurrent(),
293 ConcurrentTxIntent::Rollback if has_tx => self.rollback_concurrent(),
294 ConcurrentTxIntent::None
295 | ConcurrentTxIntent::Commit
296 | ConcurrentTxIntent::Rollback => self.execute_dispatch(sql),
297 }
298 }
299
300 /// Phase 11.11a — same as [`Connection::execute`], but returns
301 /// the full [`CommandOutput`] (status + optional pre-rendered
302 /// prettytable for `SELECT`). The REPL needs this to print the
303 /// table the engine produced *and* the status line in one
304 /// pass, while still routing `BEGIN CONCURRENT` / `COMMIT` /
305 /// `ROLLBACK` through the per-connection MVCC state.
306 ///
307 /// `BEGIN` / `COMMIT` / `ROLLBACK` carry no rendered output —
308 /// they return `CommandOutput { status, rendered: None }`.
309 pub fn execute_with_render(&mut self, sql: &str) -> Result<crate::sql::CommandOutput> {
310 let intent = concurrent_tx_intent(sql);
311 let has_tx = self.concurrent_tx_is_open();
312 let status = match intent {
313 ConcurrentTxIntent::Begin => self.begin_concurrent()?,
314 ConcurrentTxIntent::Commit if has_tx => self.commit_concurrent()?,
315 ConcurrentTxIntent::Rollback if has_tx => self.rollback_concurrent()?,
316 ConcurrentTxIntent::None
317 | ConcurrentTxIntent::Commit
318 | ConcurrentTxIntent::Rollback => return self.execute_dispatch_with_render(sql),
319 };
320 Ok(crate::sql::CommandOutput {
321 status,
322 rendered: None,
323 })
324 }
325
326 /// Phase 11.5 — cheap probe used by [`Connection::execute`]
327 /// (and [`Statement::query`]) to decide whether to route
328 /// through the concurrent-tx dispatch. Acquires the
329 /// `concurrent_tx` mutex briefly; never blocks for a
330 /// meaningful amount of time because the only other lockers
331 /// are this connection's own writers.
332 ///
333 /// Public so the REPL can render per-handle tx state in
334 /// `.conns` output (Phase 11.11a).
335 pub fn concurrent_tx_is_open(&self) -> bool {
336 self.lock_concurrent_tx().is_some()
337 }
338
339 /// Phase 11.5 — locks the per-connection
340 /// `Mutex<Option<ConcurrentTx>>`. Wrapping the poison handler
341 /// in one place keeps every caller's lock-order discipline
342 /// visible at the call site (always `concurrent_tx` before
343 /// `inner`).
344 fn lock_concurrent_tx(&self) -> MutexGuard<'_, Option<ConcurrentTx>> {
345 self.concurrent_tx.lock().unwrap_or_else(|e| {
346 panic!("sqlrite: concurrent_tx mutex poisoned: {e}");
347 })
348 }
349
350 /// Phase 11.5 — runs `f` against the read-side `&Database`
351 /// the caller's transaction expects to see.
352 ///
353 /// - **No concurrent transaction open** — `f` runs against the
354 /// live `Database::tables`. Same path the legacy `query`
355 /// used.
356 /// - **Concurrent transaction open** — swaps the transaction's
357 /// private cloned `tables` in for the duration of `f`, so
358 /// `f` sees the BEGIN-time snapshot plus any writes the
359 /// transaction has staged. Swaps back before the function
360 /// returns even on error (the swap-back uses a scope guard
361 /// pattern so a panic inside `f` doesn't leave `db.tables`
362 /// pointing at the snapshot clone).
363 ///
364 /// Takes `&self` (rather than `&mut self`) because the
365 /// `Statement::query` API contract is `&self` — that's why the
366 /// `concurrent_tx` field lives behind a `Mutex`. Lock order is
367 /// `concurrent_tx` → `inner`, matching every other tx-aware
368 /// path on this connection.
369 pub(crate) fn with_snapshot_read<F, R>(&self, f: F) -> R
370 where
371 F: FnOnce(&Database) -> R,
372 {
373 let mut tx_slot = self.lock_concurrent_tx();
374 let mut db = self.lock();
375 match tx_slot.as_mut() {
376 None => f(&db),
377 Some(tx) => {
378 // Swap the snapshot in. Use a scope guard so the
379 // unswap happens even if `f` unwinds — leaving
380 // `db.tables` pointing at the tx's private clone
381 // would be catastrophic for later sibling-handle
382 // reads.
383 std::mem::swap(&mut db.tables, &mut tx.tables);
384 let prior_txn = db.txn.take();
385 db.txn = Some(TxnSnapshot {
386 tables: HashMap::new(),
387 });
388
389 struct UnswapGuard<'a> {
390 db: &'a mut Database,
391 tx_tables: &'a mut HashMap<String, Table>,
392 prior_txn: Option<TxnSnapshot>,
393 armed: bool,
394 }
395 impl Drop for UnswapGuard<'_> {
396 fn drop(&mut self) {
397 if self.armed {
398 self.db.txn = self.prior_txn.take();
399 std::mem::swap(&mut self.db.tables, self.tx_tables);
400 }
401 }
402 }
403 let mut guard = UnswapGuard {
404 db: &mut db,
405 tx_tables: &mut tx.tables,
406 prior_txn,
407 armed: true,
408 };
409
410 let result = f(guard.db);
411
412 // Disarm the guard explicitly and unwind in the
413 // expected order so the borrow checker can see
414 // both fields are accessed disjointly.
415 guard.armed = false;
416 guard.db.txn = guard.prior_txn.take();
417 std::mem::swap(&mut guard.db.tables, guard.tx_tables);
418
419 result
420 }
421 }
422 }
423
424 /// Internal — runs `sql` against the engine. If a concurrent
425 /// transaction is open, swaps the transaction's private
426 /// `tables` map in for the duration of the dispatch so writes
427 /// land on the snapshot, not the live database. Otherwise
428 /// falls straight through to the legacy
429 /// [`crate::sql::process_command`] path.
430 fn execute_dispatch(&mut self, sql: &str) -> Result<String> {
431 if self.concurrent_tx_is_open() {
432 self.execute_in_concurrent_tx(sql)
433 } else {
434 let mut db = self.lock();
435 crate::sql::process_command(sql, &mut db)
436 }
437 }
438
439 /// Phase 11.11a — render-aware twin of
440 /// [`Connection::execute_dispatch`]. Same branching, but the
441 /// non-concurrent path calls `process_command_with_render` and
442 /// the concurrent path goes through
443 /// [`Connection::execute_in_concurrent_tx_with_render`].
444 fn execute_dispatch_with_render(&mut self, sql: &str) -> Result<crate::sql::CommandOutput> {
445 if self.concurrent_tx_is_open() {
446 self.execute_in_concurrent_tx_with_render(sql)
447 } else {
448 let mut db = self.lock();
449 crate::sql::process_command_with_render(sql, &mut db)
450 }
451 }
452
453 /// Phase 11.4 — opens a `BEGIN CONCURRENT` transaction on this
454 /// connection. Allocates a new `TxHandle` (which advances the
455 /// MVCC clock by one), deep-clones the live tables into the
456 /// per-connection [`ConcurrentTx`] state, and records the
457 /// schema fingerprint. Returns the status string the REPL
458 /// renders (`"BEGIN"`).
459 ///
460 /// Errors if the database isn't in `journal_mode = mvcc`, or
461 /// if any transaction (concurrent or legacy `BEGIN`) is
462 /// already open on this connection.
463 fn begin_concurrent(&mut self) -> Result<String> {
464 // Lock order: concurrent_tx → inner (db). Keep this order
465 // in every method that touches both — deadlock-free by
466 // construction.
467 let mut tx_slot = self.lock_concurrent_tx();
468 if tx_slot.is_some() {
469 return Err(SQLRiteError::General(
470 "cannot BEGIN CONCURRENT: a concurrent transaction is already open".to_string(),
471 ));
472 }
473 let db = self.lock();
474 if db.journal_mode() != JournalMode::Mvcc {
475 return Err(SQLRiteError::General(
476 "BEGIN CONCURRENT requires `PRAGMA journal_mode = mvcc;` first".to_string(),
477 ));
478 }
479 if db.in_transaction() {
480 return Err(SQLRiteError::General(
481 "cannot BEGIN CONCURRENT: a non-concurrent transaction is already open".to_string(),
482 ));
483 }
484 if db.is_read_only() {
485 return Err(SQLRiteError::General(
486 "cannot BEGIN CONCURRENT: database is opened read-only".to_string(),
487 ));
488 }
489 let tx = ConcurrentTx::begin(db.mvcc_clock(), db.mv_store().active_registry(), &db.tables);
490 drop(db);
491 *tx_slot = Some(tx);
492 Ok("BEGIN".to_string())
493 }
494
    /// Phase 11.4 — commits the open concurrent transaction and returns
    /// the status string `"COMMIT"`.
    ///
    /// Steps (Hekaton-style optimistic validation):
    ///
    /// 1. Take the transaction out of this handle's slot and check that
    ///    the live schema still matches the one recorded at BEGIN.
    /// 2. Diff the transaction's private `tables` against its own
    ///    BEGIN-time clone (`tables_at_begin`) — not against the live
    ///    `Database::tables` — to derive the write-set.
    /// 3. For each row in the write-set, ask the
    ///    [`MvStore`](crate::mvcc::MvStore) for the latest committed
    ///    `begin`. If it is greater than `tx.begin_ts`, abort with
    ///    [`SQLRiteError::Busy`] — some other transaction superseded
    ///    the row after our snapshot.
    /// 4. Allocate a `commit_ts`, push every write into the `MvStore`
    ///    as a committed version, and apply the writes to
    ///    `Database::tables`.
    /// 5. For file-backed databases, append the MVCC commit batch to
    ///    the WAL and bump its clock high-water mark.
    /// 6. Run the legacy `save_database` so the changes are durable
    ///    via the existing WAL.
    /// 7. Drop the transaction and garbage-collect the version chains
    ///    of the rows it wrote.
    ///
    /// # Errors
    ///
    /// Because the transaction is removed from the slot up front, *any*
    /// error return drops it (rollback semantics). On `Busy` the caller
    /// should retry with a fresh `BEGIN CONCURRENT`.
    ///
    /// NOTE(review): the failures in steps 5–6 (and a mid-loop
    /// `push_committed` failure in step 4) return `Err` after the
    /// `MvStore` and/or `Database::tables` have already been updated; no
    /// undo of those in-memory changes is visible in this function —
    /// confirm whether that is handled elsewhere.
    ///
    /// # Panics
    ///
    /// Panics if no concurrent transaction is open; callers check
    /// `concurrent_tx_is_open()` first.
    fn commit_concurrent(&mut self) -> Result<String> {
        let mut tx_slot = self.lock_concurrent_tx();
        let tx = tx_slot
            .take()
            .expect("commit_concurrent called without active tx (caller should check)");
        // Drop the slot guard — we already moved the tx out, so there
        // is no reason to keep the per-handle slot locked while the
        // (potentially long) commit runs under the database lock.
        drop(tx_slot);

        let mut db = self.lock();

        // Schema drift catches DDL run on the live database under
        // us. v0 rejects DDL inside the tx; outside is the only
        // way to land here.
        if !tx.schema_unchanged(&db.tables) {
            return Err(SQLRiteError::Busy(
                "schema changed under BEGIN CONCURRENT (a CREATE/DROP/ALTER ran on \
                 another connection); transaction rolled back"
                    .to_string(),
            ));
        }

        // Diff against the BEGIN-time clone, NOT against the live
        // database. Other concurrent transactions may have
        // committed between our BEGIN and now; their writes show
        // up in `db.tables` but aren't part of our write-set, and
        // diffing against live would surface them as bogus DELETEs
        // (silently undoing someone else's commit).
        let writes = diff_tables_for_writes(&tx.tables_at_begin, &tx.tables)?;

        // Validation pass: walk the write-set against MvStore.
        let mv = db.mv_store().clone();
        let begin_ts = tx.begin_ts();
        for (row_id, _payload) in &writes {
            if let Some(latest_begin) = mv.latest_committed_begin(row_id) {
                if latest_begin > begin_ts {
                    return Err(SQLRiteError::Busy(format!(
                        "write-write conflict on {}/{}: another transaction committed \
                         this row at ts={latest_begin} (after our begin_ts={begin_ts}); \
                         transaction rolled back, retry with a fresh BEGIN CONCURRENT",
                        row_id.table, row_id.rowid,
                    )));
                }
            }
        }

        // Validation passed — allocate commit_ts and apply.
        let commit_ts = db.mvcc_clock().tick();
        for (row_id, payload) in &writes {
            let version = RowVersion::committed(commit_ts, payload.clone());
            // `push_committed`'s monotonic-begin check is satisfied
            // because validation above ensured no version has
            // begin >= commit_ts (commit_ts is freshly ticked).
            mv.push_committed(row_id.clone(), version)
                .map_err(|e| SQLRiteError::General(format!("MvStore push failed: {e}")))?;
        }

        // Apply the diff to Database::tables. Reuses the legacy
        // INSERT / UPDATE / DELETE shape so post-commit reads on
        // any handle (concurrent or legacy) see the latest row
        // values via the existing read path.
        apply_writes_to_live(&mut db, &tx.tables, &writes)?;

        // Phase 11.9 — append the MVCC commit batch into the WAL
        // before the legacy page-commit flush. The MVCC frame is
        // not fsync'd on its own; the legacy `save_database`
        // below ends with a commit-frame fsync that durably
        // includes every byte written since the previous fsync,
        // covering this batch too. A crash between the two
        // append calls drops both — torn-write atomicity for the
        // whole transaction.
        //
        // For in-memory databases (no pager) we skip the WAL
        // append: there's nothing to fsync. MVCC state stays in
        // the in-memory `MvStore` for the lifetime of the
        // process.
        if let Some(pager) = db.pager.as_mut() {
            let records = writes
                .iter()
                .map(|(row, payload)| MvccLogRecord {
                    row: row.clone(),
                    payload: payload.clone(),
                })
                .collect();
            let batch = MvccCommitBatch { commit_ts, records };
            if let Err(append_err) = pager.append_mvcc_batch(&batch) {
                return Err(SQLRiteError::General(format!(
                    "COMMIT failed appending MVCC log record: {append_err}"
                )));
            }
            // Bump the WAL header's persisted clock high-water so
            // the next checkpoint truncates with a header that
            // covers this commit. The MVCC frames themselves
            // also carry `commit_ts`, so even an un-checkpointed
            // crash still seeds the clock correctly via the
            // replayer's max-with-frames logic — this just keeps
            // the post-checkpoint path correct.
            if let Err(set_err) = pager.observe_clock_high_water(commit_ts) {
                return Err(SQLRiteError::General(format!(
                    "COMMIT failed updating WAL clock high-water: {set_err}"
                )));
            }
        }

        // Persist via the legacy WAL — the on-disk format is
        // unchanged in 11.4+. The page-commit's fsync below
        // covers the MVCC frame appended above; one atomic
        // boundary for the whole transaction. Skipped when the
        // database has no `source_path`.
        if let Some(path) = db.source_path.clone() {
            if let Err(save_err) = pager::save_database(&mut db, &path) {
                return Err(SQLRiteError::General(format!(
                    "COMMIT failed during save_database: {save_err}"
                )));
            }
        }

        // Phase 11.6 — per-commit GC sweep on the write-set's
        // chains. Drop the `tx` handle FIRST so its `begin_ts`
        // exits the active-tx registry; otherwise the watermark
        // is still pinned at our own `begin_ts` and we'd preserve
        // versions we're free to reclaim. Only the rows this
        // transaction wrote can have a newly-capped `end` worth
        // sweeping — the broader GC story (full-store sweeps,
        // background drains) lands behind explicit
        // [`Connection::vacuum_mvcc`] / [`MvStore::gc_all`].
        drop(tx);
        let watermark = mv.active_watermark();
        for (row_id, _) in &writes {
            mv.gc_chain(row_id, watermark);
        }
        Ok("COMMIT".to_string())
    }
650
651 /// Phase 11.4 — rolls back the open concurrent transaction.
652 /// Drops the per-connection state; the live `Database::tables`
653 /// is unchanged because writes never landed there.
654 fn rollback_concurrent(&mut self) -> Result<String> {
655 // tx drops here; TxHandle unregisters automatically.
656 let _ = self
657 .lock_concurrent_tx()
658 .take()
659 .expect("rollback_concurrent called without active tx (caller should check)");
660 Ok("ROLLBACK".to_string())
661 }
662
663 /// Phase 11.4 — runs `sql` against the open concurrent
664 /// transaction's private cloned tables. Implementation: swap
665 /// `db.tables` <-> `tx.tables` for the duration of the
666 /// dispatch, suppress auto-save by parking a dummy
667 /// [`TxnSnapshot`] on `db.txn`, then unwind both.
668 ///
669 /// DDL is rejected before the swap with a typed error —
670 /// schema mutations inside a `BEGIN CONCURRENT` block aren't
671 /// supported in v0 (the plan flags this as an explicit
672 /// non-goal, and the swap-based dispatch can't safely apply
673 /// new tables to the live database without a separate merge
674 /// pass).
675 fn execute_in_concurrent_tx(&mut self, sql: &str) -> Result<String> {
676 self.execute_in_concurrent_tx_with_render(sql)
677 .map(|o| o.status)
678 }
679
680 /// Render-aware twin of [`Connection::execute_in_concurrent_tx`].
681 /// Same swap-based dispatch; the only difference is the inner
682 /// call goes through `process_command_with_render` so the
683 /// caller gets the rendered SELECT table (Phase 11.11a).
684 fn execute_in_concurrent_tx_with_render(
685 &mut self,
686 sql: &str,
687 ) -> Result<crate::sql::CommandOutput> {
688 let intent = legacy_tx_intent(sql);
689 if matches!(intent, LegacyTxIntent::Begin) {
690 return Err(SQLRiteError::General(
691 "cannot BEGIN: a concurrent transaction is already open".to_string(),
692 ));
693 }
694 // String-prefix DDL check. Rejecting up front means the
695 // tx's snapshot never gets a half-applied schema change —
696 // which would be hard to merge back at commit because the
697 // live database wouldn't agree.
698 if rejects_in_concurrent_tx(sql) {
699 return Err(SQLRiteError::General(
700 "DDL is not supported inside BEGIN CONCURRENT (v0 limitation; the \
701 transaction stays open, the live schema is unchanged)"
702 .to_string(),
703 ));
704 }
705
706 // Lock order: concurrent_tx → inner (db). Same shape as
707 // every other tx-aware path on this connection.
708 let mut tx_slot = self.lock_concurrent_tx();
709 let tx = tx_slot
710 .as_mut()
711 .expect("execute_in_concurrent_tx called without active tx");
712 let mut db = self.inner.lock().unwrap_or_else(|e| {
713 panic!("sqlrite: database mutex poisoned: {e}");
714 });
715
716 // Swap the snapshot in. After this, db.tables IS the tx's
717 // private clone; the executor mutates it freely.
718 std::mem::swap(&mut db.tables, &mut tx.tables);
719
720 // Suppress auto-save with a dummy TxnSnapshot. The
721 // executor's auto-save check looks at `db.in_transaction()`,
722 // which is true while `db.txn` is `Some`. The dummy
723 // snapshot is never restored from — `tx` itself owns the
724 // rollback story for concurrent transactions.
725 let prior_txn = db.txn.take();
726 db.txn = Some(TxnSnapshot {
727 tables: HashMap::new(),
728 });
729
730 let result = crate::sql::process_command_with_render(sql, &mut db);
731
732 // Unwind in reverse: take the dummy txn off (don't restore
733 // anything from it), swap the tables back.
734 db.txn = prior_txn;
735 std::mem::swap(&mut db.tables, &mut tx.tables);
736 result
737 }
738
739 /// Prepares a statement for repeated execution or row iteration.
740 /// SQLR-23: the SQL is parsed once at prepare time (sqlparser walk
741 /// plus placeholder rewriting), and the resulting AST is cached
742 /// on the [`Statement`] for re-execution without further parsing.
743 ///
744 /// Use [`Statement::query`] / [`Statement::run`] for unbound
745 /// execution, or [`Statement::query_with_params`] /
746 /// [`Statement::execute_with_params`] to substitute `?`
747 /// placeholders.
748 pub fn prepare<'c>(&'c mut self, sql: &str) -> Result<Statement<'c>> {
749 let plan = Arc::new(CachedPlan::compile(sql)?);
750 Ok(Statement { conn: self, plan })
751 }
752
753 /// Same as [`Connection::prepare`], but consults a small
754 /// per-connection LRU first. SQLR-23 — for hot statements
755 /// (the body of an INSERT loop, a frequently-rerun lookup) the
756 /// sqlparser walk is amortized to once across the connection's
757 /// lifetime, not once per `prepare()`.
758 ///
759 /// Default cache capacity is 16; tune with
760 /// [`Connection::set_prepared_cache_capacity`].
761 pub fn prepare_cached<'c>(&'c mut self, sql: &str) -> Result<Statement<'c>> {
762 // Lookup-or-insert. Found entries are also moved to the back
763 // (most-recently-used) so capacity-eviction runs LRU.
764 let plan = if let Some(pos) = self.prep_cache.iter().position(|(k, _)| k == sql) {
765 let (k, v) = self.prep_cache.remove(pos).unwrap();
766 self.prep_cache.push_back((k, Arc::clone(&v)));
767 v
768 } else {
769 let plan = Arc::new(CachedPlan::compile(sql)?);
770 self.prep_cache
771 .push_back((sql.to_string(), Arc::clone(&plan)));
772 while self.prep_cache.len() > self.prep_cache_cap {
773 self.prep_cache.pop_front();
774 }
775 plan
776 };
777 Ok(Statement { conn: self, plan })
778 }
779
780 /// SQLR-23 — sets the maximum number of cached prepared plans
781 /// (matches `prepare_cached`'s default 16). Reducing below the
782 /// current size evicts the oldest entries; setting to 0 disables
783 /// caching but `prepare_cached` still works (it just always
784 /// re-parses).
785 pub fn set_prepared_cache_capacity(&mut self, cap: usize) {
786 self.prep_cache_cap = cap;
787 while self.prep_cache.len() > cap {
788 self.prep_cache.pop_front();
789 }
790 }
791
792 /// SQLR-23 — current number of plans held by the prepared-statement
793 /// cache. Useful for tests / introspection; not load-bearing for
794 /// the public API.
795 pub fn prepared_cache_len(&self) -> usize {
796 self.prep_cache.len()
797 }
798
799 /// Returns `true` while a `BEGIN … COMMIT/ROLLBACK` block is open
800 /// against this connection.
801 pub fn in_transaction(&self) -> bool {
802 self.lock().in_transaction()
803 }
804
805 /// Returns the current auto-VACUUM threshold (SQLR-10). After a
806 /// page-releasing DDL (DROP TABLE / DROP INDEX / ALTER TABLE DROP
807 /// COLUMN) commits, the engine compacts the file in place if the
808 /// freelist exceeds this fraction of `page_count`. New connections
809 /// default to `Some(0.25)` (SQLite parity); `None` means the
810 /// trigger is disabled. See [`Connection::set_auto_vacuum_threshold`].
811 pub fn auto_vacuum_threshold(&self) -> Option<f32> {
812 self.lock().auto_vacuum_threshold()
813 }
814
815 /// Sets the auto-VACUUM threshold (SQLR-10). `Some(t)` with `t` in
816 /// `0.0..=1.0` arms the trigger; `None` disables it. Values outside
817 /// `0.0..=1.0` (or NaN / infinite) return a typed error rather than
818 /// silently saturating. The setting is per-database runtime state —
819 /// closing the last connection to a database drops it; new
820 /// connections start at the default `Some(0.25)`.
821 ///
822 /// Calling this on an in-memory or read-only database is allowed
823 /// (it just won't fire — there's nothing to compact / no writes
824 /// will reach the trigger).
825 pub fn set_auto_vacuum_threshold(&mut self, threshold: Option<f32>) -> Result<()> {
826 self.lock().set_auto_vacuum_threshold(threshold)
827 }
828
829 /// Returns `true` if the connection was opened read-only. Mutating
830 /// statements on a read-only connection return a typed error.
831 pub fn is_read_only(&self) -> bool {
832 self.lock().is_read_only()
833 }
834
835 /// Phase 11.3 — current journal mode. `Wal` (default) keeps every
836 /// pre-Phase-11 caller's behaviour. `Mvcc` is opt-in via
837 /// `PRAGMA journal_mode = mvcc;`. Per-database — every
838 /// [`Connection::connect`] sibling sees the same value.
839 ///
840 /// Today this is observable but doesn't change query behaviour;
841 /// 11.4 wires `Mvcc` mode into the read/write paths.
842 pub fn journal_mode(&self) -> crate::mvcc::JournalMode {
843 self.lock().journal_mode()
844 }
845
846 /// Phase 11.6 — explicit full-store MVCC garbage collection
847 /// pass. Walks every row in the [`MvStore`](crate::mvcc::MvStore)
848 /// chain and drops versions whose `end` timestamp is below the
849 /// current watermark (the smallest `begin_ts` across all
850 /// in-flight transactions on this database, or `u64::MAX` when
851 /// nothing is in flight).
852 ///
853 /// Returns the number of versions reclaimed. Cheap when the
854 /// store is small; a future optimisation will give it
855 /// background-thread semantics behind a configurable cadence.
856 ///
857 /// Per-commit GC already sweeps the rows each transaction
858 /// touched, so most callers don't need this — it's the
859 /// "vacuum the whole store" escape hatch for memory-pressure
860 /// workloads or test suites that want a deterministic baseline.
861 /// Safe to call even if `journal_mode` is `Wal` (the store is
862 /// just empty); useful for tests that want to assert "no
863 /// versions left."
864 pub fn vacuum_mvcc(&self) -> usize {
865 let db = self.lock();
866 let mv = db.mv_store().clone();
867 let watermark = mv.active_watermark();
868 drop(db);
869 mv.gc_all(watermark)
870 }
871
872 /// Escape hatch for advanced callers — locks the shared `Database`
873 /// and hands back the guard. Not part of the stable API; will move
874 /// or change as Phase 10's MVCC sub-phases land.
875 ///
876 /// Bind the guard to a local before calling functions that take
877 /// `&Database`:
878 ///
879 /// ```no_run
880 /// # use sqlrite::Connection;
881 /// # fn use_db(_d: &sqlrite::Database) {}
882 /// let conn = Connection::open_in_memory()?;
883 /// let db = conn.database();
884 /// use_db(&db);
885 /// # Ok::<(), sqlrite::SQLRiteError>(())
886 /// ```
887 #[doc(hidden)]
888 pub fn database(&self) -> MutexGuard<'_, Database> {
889 self.lock()
890 }
891
892 #[doc(hidden)]
893 pub fn database_mut(&mut self) -> MutexGuard<'_, Database> {
894 self.lock()
895 }
896}
897
898impl std::fmt::Debug for Connection {
899 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
900 let db = self.lock();
901 f.debug_struct("Connection")
902 .field("in_transaction", &db.in_transaction())
903 .field("read_only", &db.is_read_only())
904 .field("tables", &db.tables.len())
905 .field("prep_cache_len", &self.prep_cache.len())
906 .field("handles", &Arc::strong_count(&self.inner))
907 .field("concurrent_tx", &self.concurrent_tx_is_open())
908 .finish()
909 }
910}
911
912// =====================================================================
913// Phase 11.4 — concurrent-transaction helpers
914//
915// These live as free functions (rather than methods) so the borrow
916// checker stays out of the way: callers in `Connection::execute*`
917// already juggle mutable borrows of `self.concurrent_tx` and
918// `self.inner.lock()` simultaneously, and threading a third `&mut self`
919// through helpers would force every helper to either take owned
920// arguments or split the borrow at the call site. Free functions take
921// exactly the slices they need.
922
/// What a statement means for concurrent-transaction control.
///
/// Determined by a coarse string match that runs before `sqlparser`,
/// in the same way as the PRAGMA intercept. `BEGIN CONCURRENT` has to
/// be told apart from plain `BEGIN`: plain `BEGIN` keeps using the
/// legacy deep-clone snapshot path, and only `BEGIN CONCURRENT` opens
/// an MVCC transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConcurrentTxIntent {
    /// `BEGIN CONCURRENT` — start an MVCC transaction.
    Begin,
    /// `COMMIT`, optionally followed by `TRANSACTION` / `WORK` / `;`.
    Commit,
    /// `ROLLBACK`, optionally followed by `TRANSACTION` / `WORK` / `;`.
    Rollback,
    /// Not a tx-control statement; handled by the regular dispatch.
    None,
}
940
/// Whether a statement opens a legacy (non-concurrent) transaction.
///
/// Used to refuse a nested `BEGIN` while a `BEGIN CONCURRENT` block
/// is open.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LegacyTxIntent {
    /// Any `BEGIN` shape other than `BEGIN CONCURRENT`: plain
    /// `BEGIN`, `BEGIN TRANSACTION`, `BEGIN DEFERRED`, and so on.
    Begin,
    /// Everything else.
    None,
}
951
952fn concurrent_tx_intent(sql: &str) -> ConcurrentTxIntent {
953 let tokens = lowercase_tokens(sql);
954 let head = tokens.as_slice();
955 match head {
956 [first, second, ..] if first == "begin" && second == "concurrent" => {
957 ConcurrentTxIntent::Begin
958 }
959 [first, ..] if first == "commit" => ConcurrentTxIntent::Commit,
960 [first, ..] if first == "end" => ConcurrentTxIntent::Commit,
961 [first, ..] if first == "rollback" => ConcurrentTxIntent::Rollback,
962 _ => ConcurrentTxIntent::None,
963 }
964}
965
966fn legacy_tx_intent(sql: &str) -> LegacyTxIntent {
967 let tokens = lowercase_tokens(sql);
968 let head = tokens.as_slice();
969 match head {
970 // Plain BEGIN — but not BEGIN CONCURRENT, which the
971 // concurrent-tx intent already caught.
972 [first, ..] if first == "begin" => {
973 if matches!(head.get(1).map(String::as_str), Some("concurrent")) {
974 LegacyTxIntent::None
975 } else {
976 LegacyTxIntent::Begin
977 }
978 }
979 [first, ..] if first == "start" => LegacyTxIntent::Begin,
980 _ => LegacyTxIntent::None,
981 }
982}
983
/// Breaks `sql` into ASCII-lowercased tokens.
///
/// Separators are any whitespace plus `;`, `(`, `)` and `,`; empty
/// pieces between adjacent separators are discarded. This is only
/// precise enough to recognise keywords such as `BEGIN`, `COMMIT`,
/// `ROLLBACK`, `CONCURRENT` or `TRANSACTION` — it is not a real SQL
/// tokenizer (it knows nothing about string literals or comments).
fn lowercase_tokens(sql: &str) -> Vec<String> {
    let is_separator = |c: char| c.is_whitespace() || matches!(c, ';' | '(' | ')' | ',');
    let mut tokens = Vec::new();
    for piece in sql.split(is_separator) {
        if !piece.is_empty() {
            tokens.push(piece.to_ascii_lowercase());
        }
    }
    tokens
}
995
/// Returns `true` for statement shapes that must be refused inside a
/// `BEGIN CONCURRENT` block.
///
/// v0 covers the canonical DDL: any statement whose leading keyword
/// is `CREATE`, `DROP`, `ALTER` or `VACUUM` (compared
/// case-insensitively). The leading keyword is the run of ASCII
/// letters, digits and underscores at the start of the statement
/// after leading whitespace is skipped, so any separator after it —
/// space, tab, newline, `;`, `(` — is handled the same way.
///
/// This is still a cheap, best-effort check rather than a parse: a
/// statement that starts with a SQL comment is not recognised, and v0
/// does not promise schema isolation inside the transaction anyway.
fn rejects_in_concurrent_tx(sql: &str) -> bool {
    const REJECTED_KEYWORDS: [&str; 4] = ["create", "drop", "alter", "vacuum"];

    let statement = sql.trim_start();
    // Byte index of the first character that cannot belong to a
    // keyword; `find` only reports char boundaries, so slicing is safe.
    let keyword_end = statement
        .find(|c: char| !(c.is_ascii_alphanumeric() || c == '_'))
        .unwrap_or(statement.len());
    let keyword = &statement[..keyword_end];

    REJECTED_KEYWORDS
        .iter()
        .any(|rejected| keyword.eq_ignore_ascii_case(rejected))
}
1011
1012/// Phase 11.4 commit-time helper — diff `live` (the original
1013/// `Database::tables` map) against `snapshot` (the
1014/// transaction's private clone, post-statements) and produce
1015/// the write-set: every `(RowID, VersionPayload)` whose value
1016/// in the snapshot differs from the live state.
1017///
1018/// Three cases:
1019///
1020/// - Row in snapshot but not in live → INSERT, payload =
1021/// [`VersionPayload::Present`] of snapshot's column-value
1022/// pairs.
1023/// - Row in both, with different column values → UPDATE, same
1024/// shape.
1025/// - Row in live but not in snapshot → DELETE, payload =
1026/// [`VersionPayload::Tombstone`].
1027///
1028/// Errors only if the snapshot's table set drifted from the
1029/// live database (DDL was rejected at execute-time so this
1030/// shouldn't fire; the typed error guards against bugs).
1031fn diff_tables_for_writes(
1032 live: &HashMap<String, Table>,
1033 snapshot: &HashMap<String, Table>,
1034) -> Result<Vec<(RowID, VersionPayload)>> {
1035 let mut writes: Vec<(RowID, VersionPayload)> = Vec::new();
1036 for (name, snap_table) in snapshot {
1037 let live_table = live.get(name).ok_or_else(|| {
1038 SQLRiteError::Internal(format!(
1039 "concurrent commit: table '{name}' missing from live database"
1040 ))
1041 })?;
1042 let live_rowids: std::collections::HashSet<i64> = live_table.rowids().into_iter().collect();
1043 let snap_rowids = snap_table.rowids();
1044 for rowid in &snap_rowids {
1045 let snap_payload = build_payload(snap_table, *rowid);
1046 if live_rowids.contains(rowid) {
1047 let live_payload = build_payload(live_table, *rowid);
1048 if live_payload != snap_payload {
1049 writes.push((RowID::new(name, *rowid), snap_payload));
1050 }
1051 } else {
1052 writes.push((RowID::new(name, *rowid), snap_payload));
1053 }
1054 }
1055 let snap_set: std::collections::HashSet<i64> = snap_rowids.into_iter().collect();
1056 for rowid in live_table.rowids() {
1057 if !snap_set.contains(&rowid) {
1058 writes.push((RowID::new(name, rowid), VersionPayload::Tombstone));
1059 }
1060 }
1061 }
1062 Ok(writes)
1063}
1064
1065/// Builds a [`VersionPayload::Present`] from a row's column-value
1066/// pairs. Column order is the table's declaration order; missing
1067/// values surface as [`Value::Null`].
1068fn build_payload(table: &Table, rowid: i64) -> VersionPayload {
1069 let cols = table.column_names();
1070 let vals = table.extract_row(rowid);
1071 let pairs: Vec<(String, Value)> = cols
1072 .into_iter()
1073 .zip(vals)
1074 .map(|(c, v)| (c, v.unwrap_or(Value::Null)))
1075 .collect();
1076 VersionPayload::Present(pairs)
1077}
1078
1079/// Applies the commit's write-set onto the live database
1080/// row-by-row. Each `(RowID, payload)` translates into a
1081/// `delete_row` (always — clears column data and any
1082/// secondary-index entries that reference the row) followed
1083/// by a `restore_row` if the payload is `Present`.
1084///
1085/// Per-row apply rather than wholesale table-replace because
1086/// other concurrent transactions may have committed onto the
1087/// live database between our BEGIN and our COMMIT — replacing
1088/// the whole table would silently undo their disjoint writes.
1089/// The validation pass already proved we have no row-level
1090/// conflict with those commits, so writing only our own rows
1091/// preserves theirs.
1092///
1093/// The `_snapshot` parameter is unused today but kept on the
1094/// signature so the FTS / HNSW maintenance pass can grow into
1095/// it in a follow-up (the snapshot has the secondary-index
1096/// state the executor built during the tx; the live table
1097/// will need the same updates if that index is on a touched
1098/// column).
1099fn apply_writes_to_live(
1100 db: &mut Database,
1101 _snapshot: &HashMap<String, Table>,
1102 writes: &[(RowID, VersionPayload)],
1103) -> Result<()> {
1104 for (row_id, payload) in writes {
1105 let live_table = db.tables.get_mut(&row_id.table).ok_or_else(|| {
1106 SQLRiteError::Internal(format!(
1107 "concurrent commit: table '{}' missing from live database",
1108 row_id.table
1109 ))
1110 })?;
1111 // Always remove the existing row first — this clears the
1112 // per-column storage and the secondary-index entries that
1113 // reference it. INSERT (no existing row) is a no-op
1114 // delete; UPDATE turns into delete-then-insert; DELETE is
1115 // just delete.
1116 live_table.delete_row(row_id.rowid);
1117 if let VersionPayload::Present(cols) = payload {
1118 // The payload's column order matches the table's
1119 // declaration order (build_payload uses
1120 // column_names() and extract_row(), both of which
1121 // walk in declaration order). Map back into the
1122 // `Vec<Option<Value>>` shape `restore_row` expects.
1123 let values: Vec<Option<Value>> = cols
1124 .iter()
1125 .map(|(_col, value)| match value {
1126 Value::Null => None,
1127 other => Some(other.clone()),
1128 })
1129 .collect();
1130 live_table.restore_row(row_id.rowid, values).map_err(|e| {
1131 SQLRiteError::Internal(format!(
1132 "concurrent commit: restore_row({}) on table '{}' failed: {e}",
1133 row_id.rowid, row_id.table,
1134 ))
1135 })?;
1136 }
1137 }
1138 Ok(())
1139}
1140
1141/// SQLR-23 — the parse-once-execute-many representation. Built by
1142/// `CachedPlan::compile` (sqlparser walk + placeholder rewriting +
1143/// SELECT narrowing) and shared between every `Statement` that hits
1144/// the same SQL string in `prepare_cached`.
1145#[derive(Debug)]
1146struct CachedPlan {
1147 /// Original SQL — kept for diagnostic output.
1148 #[allow(dead_code)]
1149 sql: String,
1150 /// AST after `?` → `?N` placeholder rewriting. Cloned per execute
1151 /// so the substitution pass leaves the cached copy intact.
1152 ast: AstStatement,
1153 /// Total `?` placeholder count in the source SQL. Strict bind
1154 /// validation in `query_with_params` / `execute_with_params`
1155 /// uses this.
1156 param_count: usize,
1157 /// SELECT narrowing — cached so `query()` doesn't redo the
1158 /// `SelectQuery::new` walk for unbound SELECTs. `None` for
1159 /// non-SELECT statements.
1160 select: Option<SelectQuery>,
1161}
1162
1163impl CachedPlan {
1164 fn compile(sql: &str) -> Result<Self> {
1165 let dialect = SqlriteDialect::new();
1166 let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
1167 let Some(mut stmt) = ast.pop() else {
1168 return Err(SQLRiteError::General("no statement to prepare".to_string()));
1169 };
1170 if !ast.is_empty() {
1171 return Err(SQLRiteError::General(
1172 "prepare() accepts a single statement; found more than one".to_string(),
1173 ));
1174 }
1175 let param_count = rewrite_placeholders(&mut stmt);
1176 let select = match &stmt {
1177 AstStatement::Query(_) => Some(SelectQuery::new(&stmt)?),
1178 _ => None,
1179 };
1180 Ok(Self {
1181 sql: sql.to_string(),
1182 ast: stmt,
1183 param_count,
1184 select,
1185 })
1186 }
1187}
1188
1189/// A prepared statement bound to a specific connection lifetime.
1190///
1191/// SQLR-23 — `Statement` carries the parsed AST (parsed exactly once
1192/// at prepare time), not just the raw SQL. `query` / `run` execute
1193/// against the cached AST; `query_with_params` / `execute_with_params`
1194/// clone the AST and substitute `?` placeholders before dispatch.
1195pub struct Statement<'c> {
1196 conn: &'c mut Connection,
1197 plan: Arc<CachedPlan>,
1198}
1199
1200impl std::fmt::Debug for Statement<'_> {
1201 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1202 f.debug_struct("Statement")
1203 .field("sql", &self.plan.sql)
1204 .field("param_count", &self.plan.param_count)
1205 .field(
1206 "kind",
1207 &match self.plan.select {
1208 Some(_) => "Select",
1209 None => "Other",
1210 },
1211 )
1212 .finish()
1213 }
1214}
1215
1216impl<'c> Statement<'c> {
1217 /// Number of `?` placeholders detected in the source SQL. Strict
1218 /// arity validation: passing a slice of a different length to
1219 /// `query_with_params` / `execute_with_params` returns a typed
1220 /// error.
1221 pub fn parameter_count(&self) -> usize {
1222 self.plan.param_count
1223 }
1224
1225 /// Executes a prepared non-query statement. Equivalent to
1226 /// [`Connection::execute`] — included for parity with the
1227 /// typed-row `query()` so callers who want `Statement::run` /
1228 /// `Statement::query` symmetry get it.
1229 ///
1230 /// Errors if the prepared SQL contains `?` placeholders — use
1231 /// [`Statement::execute_with_params`] for those.
1232 pub fn run(&mut self) -> Result<String> {
1233 if self.plan.param_count > 0 {
1234 return Err(SQLRiteError::General(format!(
1235 "statement has {} `?` placeholder(s); call execute_with_params()",
1236 self.plan.param_count
1237 )));
1238 }
1239 let ast = self.plan.ast.clone();
1240 let mut db = self.conn.lock();
1241 process_ast_with_render(ast, &mut db).map(|o| o.status)
1242 }
1243
1244 /// SQLR-23 — executes a prepared non-SELECT statement after binding
1245 /// `?` placeholders to `params` (positional, in source order).
1246 ///
1247 /// Use this for parameterized INSERT / UPDATE / DELETE — the
1248 /// substitution clones the cached AST, fills in the `?` slots
1249 /// from `params`, and dispatches without re-running sqlparser.
1250 /// For SELECT, prefer [`Statement::query_with_params`].
1251 pub fn execute_with_params(&mut self, params: &[Value]) -> Result<String> {
1252 self.check_arity(params)?;
1253 let mut ast = self.plan.ast.clone();
1254 if !params.is_empty() {
1255 substitute_params(&mut ast, params)?;
1256 }
1257 let mut db = self.conn.lock();
1258 process_ast_with_render(ast, &mut db).map(|o| o.status)
1259 }
1260
1261 /// Runs a SELECT and returns a [`Rows`] iterator over typed rows.
1262 /// Errors if the prepared statement isn't a SELECT.
1263 ///
1264 /// SQLR-23 — uses the SELECT narrowing cached at prepare time;
1265 /// no per-call sqlparser walk. Errors if the prepared SQL
1266 /// contains `?` placeholders — use [`Statement::query_with_params`]
1267 /// for those.
1268 pub fn query(&self) -> Result<Rows> {
1269 if self.plan.param_count > 0 {
1270 return Err(SQLRiteError::General(format!(
1271 "statement has {} `?` placeholder(s); call query_with_params()",
1272 self.plan.param_count
1273 )));
1274 }
1275 let Some(sq) = self.plan.select.as_ref() else {
1276 return Err(SQLRiteError::General(
1277 "query() only works on SELECT statements; use run() for DDL/DML".to_string(),
1278 ));
1279 };
1280 // Phase 11.5 — when a `BEGIN CONCURRENT` is open on this
1281 // connection, the read sees the transaction's BEGIN-time
1282 // snapshot, not the post-commit live database. The
1283 // helper handles the swap (and the no-op fallback for
1284 // the common case where no concurrent tx is open).
1285 let result = self
1286 .conn
1287 .with_snapshot_read(|db| execute_select_rows(sq.clone(), db))?;
1288 Ok(Rows {
1289 columns: result.columns,
1290 rows: result.rows.into_iter(),
1291 })
1292 }
1293
1294 /// SQLR-23 — runs a SELECT and returns a [`Rows`] iterator after
1295 /// binding `?` placeholders to `params`. Positional, source-order
1296 /// indexing — `params[0]` is `?1`, `params[1]` is `?2`, etc.
1297 ///
1298 /// Vector parameters (`Value::Vector(...)`) substitute as the
1299 /// in-band bracket-array shape the executor recognizes, so a
1300 /// bound query vector still triggers the HNSW probe optimizer
1301 /// (Phase 7d.2 KNN shortcut).
1302 pub fn query_with_params(&self, params: &[Value]) -> Result<Rows> {
1303 self.check_arity(params)?;
1304 if self.plan.select.is_none() {
1305 return Err(SQLRiteError::General(
1306 "query_with_params() only works on SELECT statements; use execute_with_params() \
1307 for DDL/DML"
1308 .to_string(),
1309 ));
1310 }
1311 // Re-narrow against the substituted AST. The narrow walk is
1312 // cheap (it pulls projection/WHERE/ORDER BY into typed
1313 // structs), and rerunning it ensures the substituted literals
1314 // (e.g. a bracket-array vector) flow through `SelectQuery`.
1315 let mut ast = self.plan.ast.clone();
1316 if !params.is_empty() {
1317 substitute_params(&mut ast, params)?;
1318 }
1319 let sq = SelectQuery::new(&ast)?;
1320 // Phase 11.5 — same snapshot-read path as `query()`, just
1321 // running on the substituted SelectQuery rather than the
1322 // cached one.
1323 let result = self
1324 .conn
1325 .with_snapshot_read(|db| execute_select_rows(sq, db))?;
1326 Ok(Rows {
1327 columns: result.columns,
1328 rows: result.rows.into_iter(),
1329 })
1330 }
1331
1332 fn check_arity(&self, params: &[Value]) -> Result<()> {
1333 if params.len() != self.plan.param_count {
1334 return Err(SQLRiteError::General(format!(
1335 "expected {} parameter{}, got {}",
1336 self.plan.param_count,
1337 if self.plan.param_count == 1 { "" } else { "s" },
1338 params.len()
1339 )));
1340 }
1341 Ok(())
1342 }
1343
1344 /// Column names this statement will produce, in projection order.
1345 /// `None` for non-SELECT statements.
1346 pub fn column_names(&self) -> Option<Vec<String>> {
1347 match &self.plan.select {
1348 Some(_) => {
1349 // We can't know the concrete column list without
1350 // running the query (it depends on the table schema
1351 // and the projection). Callers who need it up front
1352 // should call query() and inspect Rows::columns.
1353 None
1354 }
1355 None => None,
1356 }
1357 }
1358}
1359
1360/// Iterator of typed [`Row`] values produced by a `SELECT` query.
1361///
1362/// Today `Rows` is backed by an eager `Vec<Vec<Value>>` — the cursor
1363/// abstraction in Phase 5a's follow-up will swap this for a lazy
1364/// walker that streams rows off the B-Tree without materializing
1365/// them upfront. The `Rows::next` API is designed for that: it
1366/// returns `Result<Option<Row>>` rather than `Option<Result<Row>>`,
1367/// so a mid-stream I/O error surfaces cleanly.
1368pub struct Rows {
1369 columns: Vec<String>,
1370 rows: std::vec::IntoIter<Vec<Value>>,
1371}
1372
1373impl std::fmt::Debug for Rows {
1374 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1375 f.debug_struct("Rows")
1376 .field("columns", &self.columns)
1377 .field("remaining", &self.rows.len())
1378 .finish()
1379 }
1380}
1381
1382impl Rows {
1383 /// Column names in projection order.
1384 pub fn columns(&self) -> &[String] {
1385 &self.columns
1386 }
1387
1388 /// Advances to the next row. Returns `Ok(None)` when the query is
1389 /// exhausted, `Ok(Some(row))` otherwise, `Err(_)` on an I/O or
1390 /// decode failure (relevant once Phase 5a's cursor work lands —
1391 /// today this is always `Ok(_)`).
1392 pub fn next(&mut self) -> Result<Option<Row<'_>>> {
1393 Ok(self.rows.next().map(|values| Row {
1394 columns: &self.columns,
1395 values,
1396 }))
1397 }
1398
1399 /// Collects every remaining row into a `Vec<Row>`. Convenient for
1400 /// small result sets; avoid on large queries — that's what the
1401 /// streaming [`Rows::next`] API is for.
1402 pub fn collect_all(mut self) -> Result<Vec<OwnedRow>> {
1403 let mut out = Vec::new();
1404 while let Some(r) = self.next()? {
1405 out.push(r.to_owned_row());
1406 }
1407 Ok(out)
1408 }
1409}
1410
1411/// A single row borrowed from a [`Rows`] iterator. Lives only as long
1412/// as the iterator; call `Row::to_owned_row` to detach it if you need
1413/// to keep it past the next `next()` call.
1414pub struct Row<'r> {
1415 columns: &'r [String],
1416 values: Vec<Value>,
1417}
1418
1419impl<'r> Row<'r> {
1420 /// Value at column index `idx`. Returns a clean error if out of
1421 /// bounds or the type conversion fails.
1422 pub fn get<T: FromValue>(&self, idx: usize) -> Result<T> {
1423 let v = self.values.get(idx).ok_or_else(|| {
1424 SQLRiteError::General(format!(
1425 "column index {idx} out of bounds (row has {} columns)",
1426 self.values.len()
1427 ))
1428 })?;
1429 T::from_value(v)
1430 }
1431
1432 /// Value at column named `name`. Case-sensitive.
1433 pub fn get_by_name<T: FromValue>(&self, name: &str) -> Result<T> {
1434 let idx = self
1435 .columns
1436 .iter()
1437 .position(|c| c == name)
1438 .ok_or_else(|| SQLRiteError::General(format!("no column named '{name}' in row")))?;
1439 self.get(idx)
1440 }
1441
1442 /// Column names for this row.
1443 pub fn columns(&self) -> &[String] {
1444 self.columns
1445 }
1446
1447 /// Detaches from the parent `Rows` iterator. Useful when you want
1448 /// to keep rows past the next `Rows::next()` call.
1449 pub fn to_owned_row(&self) -> OwnedRow {
1450 OwnedRow {
1451 columns: self.columns.to_vec(),
1452 values: self.values.clone(),
1453 }
1454 }
1455}
1456
1457/// A row detached from the `Rows` iterator — owns its data, no
1458/// borrow ties it to the parent iterator.
1459#[derive(Debug, Clone)]
1460pub struct OwnedRow {
1461 pub columns: Vec<String>,
1462 pub values: Vec<Value>,
1463}
1464
1465impl OwnedRow {
1466 pub fn get<T: FromValue>(&self, idx: usize) -> Result<T> {
1467 let v = self.values.get(idx).ok_or_else(|| {
1468 SQLRiteError::General(format!(
1469 "column index {idx} out of bounds (row has {} columns)",
1470 self.values.len()
1471 ))
1472 })?;
1473 T::from_value(v)
1474 }
1475
1476 pub fn get_by_name<T: FromValue>(&self, name: &str) -> Result<T> {
1477 let idx = self
1478 .columns
1479 .iter()
1480 .position(|c| c == name)
1481 .ok_or_else(|| SQLRiteError::General(format!("no column named '{name}' in row")))?;
1482 self.get(idx)
1483 }
1484}
1485
1486/// Conversion from SQLRite's internal [`Value`] enum into a typed Rust
1487/// value. Implementations cover the common built-ins — `i64`, `f64`,
1488/// `String`, `bool`, and `Option<T>` for nullable columns. Extend on
1489/// demand.
1490pub trait FromValue: Sized {
1491 fn from_value(v: &Value) -> Result<Self>;
1492}
1493
1494impl FromValue for i64 {
1495 fn from_value(v: &Value) -> Result<Self> {
1496 match v {
1497 Value::Integer(n) => Ok(*n),
1498 Value::Null => Err(SQLRiteError::General(
1499 "expected Integer, got NULL".to_string(),
1500 )),
1501 other => Err(SQLRiteError::General(format!(
1502 "cannot convert {other:?} to i64"
1503 ))),
1504 }
1505 }
1506}
1507
1508impl FromValue for f64 {
1509 fn from_value(v: &Value) -> Result<Self> {
1510 match v {
1511 Value::Real(f) => Ok(*f),
1512 Value::Integer(n) => Ok(*n as f64),
1513 Value::Null => Err(SQLRiteError::General("expected Real, got NULL".to_string())),
1514 other => Err(SQLRiteError::General(format!(
1515 "cannot convert {other:?} to f64"
1516 ))),
1517 }
1518 }
1519}
1520
1521impl FromValue for String {
1522 fn from_value(v: &Value) -> Result<Self> {
1523 match v {
1524 Value::Text(s) => Ok(s.clone()),
1525 Value::Null => Err(SQLRiteError::General("expected Text, got NULL".to_string())),
1526 other => Err(SQLRiteError::General(format!(
1527 "cannot convert {other:?} to String"
1528 ))),
1529 }
1530 }
1531}
1532
1533impl FromValue for bool {
1534 fn from_value(v: &Value) -> Result<Self> {
1535 match v {
1536 Value::Bool(b) => Ok(*b),
1537 Value::Integer(n) => Ok(*n != 0),
1538 Value::Null => Err(SQLRiteError::General("expected Bool, got NULL".to_string())),
1539 other => Err(SQLRiteError::General(format!(
1540 "cannot convert {other:?} to bool"
1541 ))),
1542 }
1543 }
1544}
1545
1546/// Nullable columns: `Option<T>` maps `NULL → None` and everything else
1547/// through the inner type's `FromValue` impl.
1548impl<T: FromValue> FromValue for Option<T> {
1549 fn from_value(v: &Value) -> Result<Self> {
1550 match v {
1551 Value::Null => Ok(None),
1552 other => Ok(Some(T::from_value(other)?)),
1553 }
1554 }
1555}
1556
1557/// Identity impl so `row.get::<_, Value>(0)` works when you want
1558/// untyped access.
1559impl FromValue for Value {
1560 fn from_value(v: &Value) -> Result<Self> {
1561 Ok(v.clone())
1562 }
1563}
1564
1565#[cfg(test)]
1566mod tests {
1567 use super::*;
1568
/// Builds a path in the system temp directory for a test database.
///
/// The file name has the form
/// `sqlrite-conn-{pid}-{nanos}-{name}.sqlrite`, where `nanos` is the
/// current time in nanoseconds since the Unix epoch (or 0 if the
/// clock reads earlier than the epoch). Nothing is created on disk.
fn tmp_path(name: &str) -> std::path::PathBuf {
    let pid = std::process::id();
    let nanos = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    std::env::temp_dir().join(format!("sqlrite-conn-{pid}-{nanos}-{name}.sqlrite"))
}
1579
/// Deletes a test database file and its `-wal` sidecar (the same
/// path with `-wal` appended). Removal errors, such as a file that
/// does not exist, are deliberately ignored.
fn cleanup(path: &std::path::Path) {
    let mut wal_name = path.as_os_str().to_owned();
    wal_name.push("-wal");
    let targets = [path.to_path_buf(), std::path::PathBuf::from(wal_name)];
    for target in targets {
        let _ = std::fs::remove_file(target);
    }
}
1586
/// Rows inserted into an in-memory database come back through
/// `prepare` + `query` with the expected column names and typed
/// values.
#[test]
fn in_memory_roundtrip() {
    let mut conn = Connection::open_in_memory().unwrap();
    for sql in [
        "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER);",
        "INSERT INTO users (name, age) VALUES ('alice', 30);",
        "INSERT INTO users (name, age) VALUES ('bob', 25);",
    ] {
        conn.execute(sql).unwrap();
    }

    let stmt = conn.prepare("SELECT id, name, age FROM users;").unwrap();
    let mut rows = stmt.query().unwrap();
    assert_eq!(rows.columns(), &["id", "name", "age"]);

    let mut collected: Vec<(i64, String, i64)> = Vec::new();
    while let Some(row) = rows.next().unwrap() {
        let id = row.get::<i64>(0).unwrap();
        let name = row.get::<String>(1).unwrap();
        let age = row.get::<i64>(2).unwrap();
        collected.push((id, name, age));
    }

    assert_eq!(collected.len(), 2);
    // Row order is not asserted; only that both people are present.
    for (expected_name, expected_age) in [("alice", 30), ("bob", 25)] {
        assert!(
            collected
                .iter()
                .any(|(_, n, a)| n == expected_name && *a == expected_age)
        );
    }
}
1612
/// Data written through one file-backed connection is visible to a
/// second connection opened on the same path after the first has
/// been dropped.
#[test]
fn file_backed_persists_across_connections() {
    let path = tmp_path("persist");

    // Writer lives in its own scope so it is dropped before the
    // second open.
    {
        let mut writer = Connection::open(&path).unwrap();
        for sql in [
            "CREATE TABLE items (id INTEGER PRIMARY KEY, label TEXT);",
            "INSERT INTO items (label) VALUES ('one');",
        ] {
            writer.execute(sql).unwrap();
        }
    }

    {
        let mut reader = Connection::open(&path).unwrap();
        let stmt = reader.prepare("SELECT label FROM items;").unwrap();
        let mut rows = stmt.query().unwrap();
        let first = rows.next().unwrap().expect("one row");
        let label = first.get::<String>(0).unwrap();
        assert_eq!(label, "one");
        assert!(rows.next().unwrap().is_none());
    }

    cleanup(&path);
}
1633
/// A connection opened with `open_read_only` reports `is_read_only()` and
/// rejects an INSERT with an error whose message mentions "read-only".
#[test]
fn read_only_connection_rejects_writes() {
    let path = tmp_path("ro_reject");
    {
        let mut c = Connection::open(&path).unwrap();
        c.execute("CREATE TABLE t (id INTEGER PRIMARY KEY);")
            .unwrap();
        c.execute("INSERT INTO t (id) VALUES (1);").unwrap();
    } // writer drops → releases exclusive lock

    let mut ro = Connection::open_read_only(&path).unwrap();
    assert!(ro.is_read_only());
    let err = ro.execute("INSERT INTO t (id) VALUES (2);").unwrap_err();
    assert!(err.to_string().contains("read-only"));

    // Close the read-only handle before deleting its backing file — the
    // same order the other file-backed tests in this module use.
    // `cleanup` ignores removal errors, so deleting while the handle is
    // still open could silently leave the temp files behind.
    drop(ro);
    cleanup(&path);
}
1650
/// `BEGIN` / `ROLLBACK` issued through `execute` toggle
/// `in_transaction()`, and the write made inside the rolled-back
/// transaction is not visible afterwards.
#[test]
fn transactions_work_through_connection() {
    let mut db = Connection::open_in_memory().unwrap();
    db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, x INTEGER);")
        .unwrap();
    db.execute("INSERT INTO t (x) VALUES (1);").unwrap();

    // Open a transaction, write inside it, then throw it away.
    db.execute("BEGIN;").unwrap();
    assert!(db.in_transaction());
    db.execute("INSERT INTO t (x) VALUES (2);").unwrap();
    db.execute("ROLLBACK;").unwrap();
    assert!(!db.in_transaction());

    // Only the row inserted before BEGIN survives.
    let survivors = db
        .prepare("SELECT x FROM t;")
        .unwrap()
        .query()
        .unwrap()
        .collect_all()
        .unwrap();
    assert_eq!(survivors.len(), 1);
    assert_eq!(survivors[0].get::<i64>(0).unwrap(), 1);
}
1669
/// `Row::get_by_name` resolves columns by their SELECT-list name and
/// converts to the requested Rust type.
#[test]
fn get_by_name_works() {
    let mut db = Connection::open_in_memory().unwrap();
    db.execute("CREATE TABLE t (a INTEGER, b TEXT);").unwrap();
    db.execute("INSERT INTO t (a, b) VALUES (42, 'hello');")
        .unwrap();

    let select = db.prepare("SELECT a, b FROM t;").unwrap();
    let mut cursor = select.query().unwrap();
    let first = cursor.next().unwrap().unwrap();

    let a = first.get_by_name::<i64>("a").unwrap();
    let b = first.get_by_name::<String>("b").unwrap();
    assert_eq!(a, 42);
    assert_eq!(b, "hello");
}
1683
/// A column that was never given a value reads back as `None` when the
/// caller asks for `Option<T>`.
#[test]
fn null_column_maps_to_none() {
    let mut db = Connection::open_in_memory().unwrap();
    db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, note TEXT);")
        .unwrap();
    // Only `id` is supplied; `note` is omitted from the INSERT.
    db.execute("INSERT INTO t (id) VALUES (1);").unwrap();

    let select = db.prepare("SELECT id, note FROM t;").unwrap();
    let mut cursor = select.query().unwrap();
    let first = cursor.next().unwrap().unwrap();

    let id = first.get::<i64>(0).unwrap();
    assert_eq!(id, 1);
    // The omitted `note` column must surface as `None`.
    let note = first.get::<Option<String>>(1).unwrap();
    assert_eq!(note, None);
}
1699
/// `prepare` accepts exactly one statement; a two-statement string must
/// fail with an error mentioning "single statement".
#[test]
fn prepare_rejects_multiple_statements() {
    let mut db = Connection::open_in_memory().unwrap();
    let rejection = db.prepare("SELECT 1; SELECT 2;").unwrap_err();
    let text = rejection.to_string();
    assert!(text.contains("single statement"));
}
1706
/// Calling `query` on a prepared INSERT must fail with an error that
/// mentions "SELECT".
#[test]
fn query_on_non_select_errors() {
    let mut db = Connection::open_in_memory().unwrap();
    db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY);")
        .unwrap();

    let insert = db.prepare("INSERT INTO t VALUES (1);").unwrap();
    let rejection = insert.query().unwrap_err();
    let text = rejection.to_string();
    assert!(text.contains("SELECT"));
}
1716
/// SQLR-10: a fresh connection reports the SQLite-parity default of
/// `Some(0.25)`, `None` disables the threshold, an in-range value is
/// stored, and an out-of-range value is rejected without changing the
/// stored threshold.
#[test]
fn auto_vacuum_threshold_default_and_setter() {
    let mut conn = Connection::open_in_memory().unwrap();

    let initial = conn.auto_vacuum_threshold();
    assert_eq!(
        initial,
        Some(0.25),
        "fresh connection should ship with the SQLite-parity default"
    );

    // Opt out, then opt back in with a custom value.
    conn.set_auto_vacuum_threshold(None).unwrap();
    assert_eq!(conn.auto_vacuum_threshold(), None);
    conn.set_auto_vacuum_threshold(Some(0.5)).unwrap();
    assert_eq!(conn.auto_vacuum_threshold(), Some(0.5));

    // 1.5 is out of range: expect a typed error, and the previously
    // stored 0.5 must still be in place afterwards.
    let err = conn.set_auto_vacuum_threshold(Some(1.5)).unwrap_err();
    let mentions_setting = err.to_string().contains("auto_vacuum_threshold");
    assert!(mentions_setting, "expected typed range error, got: {err}");

    let after_rejection = conn.auto_vacuum_threshold();
    assert_eq!(
        after_rejection,
        Some(0.5),
        "rejected setter call must not mutate the threshold"
    );
}
1747
/// Asking a row for a column index past the end returns an error that
/// mentions "out of bounds" rather than panicking.
#[test]
fn index_out_of_bounds_errors_cleanly() {
    let mut db = Connection::open_in_memory().unwrap();
    db.execute("CREATE TABLE t (a INTEGER PRIMARY KEY);")
        .unwrap();
    db.execute("INSERT INTO t (a) VALUES (1);").unwrap();

    let select = db.prepare("SELECT a FROM t;").unwrap();
    let mut cursor = select.query().unwrap();
    let first = cursor.next().unwrap().unwrap();

    // The result set has a single column, so index 99 cannot exist.
    let rejection = first.get::<i64>(99).unwrap_err();
    assert!(rejection.to_string().contains("out of bounds"));
}
1760
1761 // -----------------------------------------------------------------
1762 // SQLR-23 — prepared-statement plan cache + parameter binding
1763 // -----------------------------------------------------------------
1764
/// `Statement::parameter_count` equals the number of `?` placeholders in
/// the prepared SQL (one, two, and zero are covered).
#[test]
fn parameter_count_reflects_question_marks() {
    let mut db = Connection::open_in_memory().unwrap();
    db.execute("CREATE TABLE t (a INTEGER, b TEXT);").unwrap();

    let cases = [
        ("SELECT a, b FROM t WHERE a = ?", 1),
        ("SELECT a, b FROM t WHERE a = ? AND b = ?", 2),
        ("SELECT a FROM t", 0),
    ];
    for (sql, expected) in cases {
        let prepared = db.prepare(sql).unwrap();
        assert_eq!(prepared.parameter_count(), expected);
    }
}
1778
/// Binding an integer to a `?` in a WHERE clause selects exactly the
/// matching row.
#[test]
fn query_with_params_binds_scalars() {
    let mut db = Connection::open_in_memory().unwrap();
    for sql in [
        "CREATE TABLE t (a INTEGER PRIMARY KEY, b TEXT);",
        "INSERT INTO t (a, b) VALUES (1, 'alice');",
        "INSERT INTO t (a, b) VALUES (2, 'bob');",
        "INSERT INTO t (a, b) VALUES (3, 'carol');",
    ] {
        db.execute(sql).unwrap();
    }

    let lookup = db.prepare("SELECT b FROM t WHERE a = ?").unwrap();
    let bound = [Value::Integer(2)];
    let matches = lookup
        .query_with_params(&bound)
        .unwrap()
        .collect_all()
        .unwrap();

    assert_eq!(matches.len(), 1);
    let name = matches[0].get::<String>(0).unwrap();
    assert_eq!(name, "bob");
}
1800
/// A prepared INSERT with two placeholders can be executed repeatedly
/// with different bound values; both rows are readable afterwards.
#[test]
fn execute_with_params_binds_insert_values() {
    let mut db = Connection::open_in_memory().unwrap();
    db.execute("CREATE TABLE t (a INTEGER, b TEXT);").unwrap();

    // One prepared statement, executed once per (a, b) pair.
    let mut insert = db.prepare("INSERT INTO t (a, b) VALUES (?, ?)").unwrap();
    for (a, b) in [(7, "hi"), (8, "yo")] {
        insert
            .execute_with_params(&[Value::Integer(a), Value::Text(b.into())])
            .unwrap();
    }

    let stored = db
        .prepare("SELECT a, b FROM t")
        .unwrap()
        .query()
        .unwrap()
        .collect_all()
        .unwrap();
    assert_eq!(stored.len(), 2);

    let has_row = |a: i64, b: &str| {
        stored
            .iter()
            .any(|r| r.get::<i64>(0).unwrap() == a && r.get::<String>(1).unwrap() == b)
    };
    assert!(has_row(7, "hi"));
    assert!(has_row(8, "yo"));
}
1824
/// Supplying one value to a statement with two placeholders yields an
/// error mentioning "expected 2 parameter".
#[test]
fn arity_mismatch_returns_clean_error() {
    let mut db = Connection::open_in_memory().unwrap();
    db.execute("CREATE TABLE t (a INTEGER, b TEXT);").unwrap();

    let two_params = db.prepare("SELECT * FROM t WHERE a = ? AND b = ?").unwrap();
    let too_few = [Value::Integer(1)];
    let rejection = two_params.query_with_params(&too_few).unwrap_err();
    assert!(rejection.to_string().contains("expected 2 parameter"));
}
1835
/// The parameterless entry points refuse a statement that contains
/// placeholders, and each error names the bound variant to use instead
/// (`query_with_params` for `query`, `execute_with_params` for `run`).
#[test]
fn run_and_query_reject_when_placeholders_present() {
    let mut db = Connection::open_in_memory().unwrap();
    db.execute("CREATE TABLE t (a INTEGER);").unwrap();
    let mut stmt_select = db.prepare("SELECT a FROM t WHERE a = ?").unwrap();

    let query_rejection = stmt_select.query().unwrap_err();
    assert!(query_rejection.to_string().contains("query_with_params"));

    let run_rejection = stmt_select.run().unwrap_err();
    assert!(run_rejection.to_string().contains("execute_with_params"));
}
1846
/// Binding `Value::Null` to `a = ?` must match no rows.
#[test]
fn null_param_compares_against_null() {
    // Under SQL three-valued logic `a = NULL` evaluates to NULL
    // (unknown), never true, so the WHERE clause filters out every row.
    // Binding NULL must behave like SQLite here so callers can rely on
    // the same semantics.
    let mut db = Connection::open_in_memory().unwrap();
    db.execute("CREATE TABLE t (a INTEGER);").unwrap();
    db.execute("INSERT INTO t (a) VALUES (1);").unwrap();

    let lookup = db.prepare("SELECT a FROM t WHERE a = ?").unwrap();
    let bound = [Value::Null];
    let matches = lookup
        .query_with_params(&bound)
        .unwrap()
        .collect_all()
        .unwrap();
    assert_eq!(matches.len(), 0);
}
1863
/// A bound `Value::Vector` works as the second argument of
/// `vec_distance_l2` inside ORDER BY when no HNSW index exists.
#[test]
fn vector_param_substitutes_through_select() {
    // Non-HNSW path: a small VECTOR table with a brute-force ORDER BY.
    // This exercises substitution into the ORDER BY expression and the
    // bracket-array shape eval_expr_scope expects.
    let mut db = Connection::open_in_memory().unwrap();
    for sql in [
        "CREATE TABLE v (id INTEGER PRIMARY KEY, e VECTOR(3));",
        "INSERT INTO v (id, e) VALUES (1, [1.0, 0.0, 0.0]);",
        "INSERT INTO v (id, e) VALUES (2, [0.0, 1.0, 0.0]);",
        "INSERT INTO v (id, e) VALUES (3, [0.0, 0.0, 1.0]);",
    ] {
        db.execute(sql).unwrap();
    }

    let nearest = db
        .prepare("SELECT id FROM v ORDER BY vec_distance_l2(e, ?) ASC LIMIT 1")
        .unwrap();
    // Probe with id=1's own vector; it must come back as the closest.
    let probe = [Value::Vector(vec![1.0, 0.0, 0.0])];
    let hits = nearest
        .query_with_params(&probe)
        .unwrap()
        .collect_all()
        .unwrap();

    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0].get::<i64>(0).unwrap(), 1);
}
1890
/// `prepare_cached` keeps one cache entry per distinct SQL string:
/// preparing the same text twice leaves the cache at one entry, and a
/// different text grows it to two.
#[test]
fn prepare_cached_reuses_plans() {
    let mut db = Connection::open_in_memory().unwrap();
    db.execute("CREATE TABLE t (a INTEGER);").unwrap();
    for n in 1..=3 {
        let insert = format!("INSERT INTO t (a) VALUES ({n});");
        db.execute(&insert).unwrap();
    }

    // Same SQL twice: the first call populates the cache, the second
    // hits the same entry.
    let filtered = "SELECT a FROM t WHERE a = ?";
    for _ in 0..2 {
        let _ = db.prepare_cached(filtered).unwrap();
    }
    assert_eq!(db.prepared_cache_len(), 1);

    // Distinct SQL widens the cache.
    let _ = db.prepare_cached("SELECT a FROM t").unwrap();
    assert_eq!(db.prepared_cache_len(), 2);
}
1909
/// With the cache capacity set to 2, preparing a third distinct SQL
/// string leaves the cache size at 2.
#[test]
fn prepare_cached_evicts_when_over_capacity() {
    let mut db = Connection::open_in_memory().unwrap();
    db.execute("CREATE TABLE t (a INTEGER);").unwrap();
    db.set_prepared_cache_capacity(2);

    // Fill the cache to capacity with two distinct statements.
    let _ = db.prepare_cached("SELECT a FROM t").unwrap();
    let _ = db.prepare_cached("SELECT a FROM t WHERE a = ?").unwrap();
    let filled = db.prepared_cache_len();
    assert_eq!(filled, 2);

    // Third distinct SQL evicts the oldest entry (the FROM-only SELECT).
    let _ = db.prepare_cached("SELECT a FROM t WHERE a > ?").unwrap();
    let after_overflow = db.prepared_cache_len();
    assert_eq!(after_overflow, 2);
}
1922
/// SQLR-23 — the headline VECTOR-binding case. With an HNSW index
/// attached, the optimizer hook recognizes
/// `ORDER BY vec_distance_l2(col, ?) LIMIT k` even when the second
/// argument is a bound parameter, because substitution lowers
/// `Value::Vector` into the same bracket-array shape an inline `[…]`
/// literal produces. Self-query: probing with one of the corpus's own
/// vectors must return that vector's id as the nearest.
#[test]
fn vector_bind_through_hnsw_optimizer() {
    let mut db = Connection::open_in_memory().unwrap();
    db.execute("CREATE TABLE v (id INTEGER PRIMARY KEY, e VECTOR(4));")
        .unwrap();

    // Four unit axes plus one diagonal vector.
    let corpus: [(i64, [f32; 4]); 5] = [
        (1, [1.0, 0.0, 0.0, 0.0]),
        (2, [0.0, 1.0, 0.0, 0.0]),
        (3, [0.0, 0.0, 1.0, 0.0]),
        (4, [0.0, 0.0, 0.0, 1.0]),
        (5, [0.5, 0.5, 0.5, 0.5]),
    ];
    for (id, [x, y, z, w]) in corpus {
        let insert = format!(
            "INSERT INTO v (id, e) VALUES ({id}, [{}, {}, {}, {}]);",
            x, y, z, w
        );
        db.execute(&insert).unwrap();
    }
    db.execute("CREATE INDEX v_hnsw ON v USING hnsw (e);")
        .unwrap();

    let nearest = db
        .prepare("SELECT id FROM v ORDER BY vec_distance_l2(e, ?) ASC LIMIT 1")
        .unwrap();

    // Probe with id=3's vector first, then id=1's; each probe must
    // return its own id.
    let probes = [
        (vec![0.0, 0.0, 1.0, 0.0], 3),
        (vec![1.0, 0.0, 0.0, 0.0], 1),
    ];
    for (probe, expected_id) in probes {
        let hits = nearest
            .query_with_params(&[Value::Vector(probe)])
            .unwrap()
            .collect_all()
            .unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].get::<i64>(0).unwrap(), expected_id);
    }
}
1973
/// SQLR-28 — cosine probe: an HNSW index built
/// `WITH (metric = 'cosine')` must serve
/// `ORDER BY vec_distance_cosine(col, [...])` from the graph.
/// Self-query: probing with one of the corpus's own vectors must come
/// back as the nearest under cosine distance.
#[test]
fn cosine_self_query_through_hnsw_optimizer() {
    let mut db = Connection::open_in_memory().unwrap();
    db.execute("CREATE TABLE v (id INTEGER PRIMARY KEY, e VECTOR(4));")
        .unwrap();

    let corpus: [(i64, [f32; 4]); 5] = [
        (1, [1.0, 0.0, 0.0, 0.0]),
        (2, [0.0, 1.0, 0.0, 0.0]),
        (3, [0.0, 0.0, 1.0, 0.0]),
        (4, [0.0, 0.0, 0.0, 1.0]),
        (5, [0.5, 0.5, 0.5, 0.5]),
    ];
    for (id, [x, y, z, w]) in corpus {
        let insert = format!(
            "INSERT INTO v (id, e) VALUES ({id}, [{}, {}, {}, {}]);",
            x, y, z, w
        );
        db.execute(&insert).unwrap();
    }
    db.execute("CREATE INDEX v_hnsw ON v USING hnsw (e) WITH (metric = 'cosine');")
        .unwrap();

    // Self-query for id=2's vector — the expected nearest under cosine
    // distance is id=2 itself (cos distance 0).
    let sql =
        "SELECT id FROM v ORDER BY vec_distance_cosine(e, [0.0, 1.0, 0.0, 0.0]) ASC LIMIT 1";
    let nearest = db.prepare(sql).unwrap();
    let hits = nearest
        .query_with_params(&[])
        .unwrap()
        .collect_all()
        .unwrap();

    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0].get::<i64>(0).unwrap(), 2);
}
2013
/// SQLR-28 — dot probe: same shape as the cosine test, but the index is
/// built `WITH (metric = 'dot')` and the query uses `vec_distance_dot`.
/// Confirms the third metric variant lights up the graph shortcut, not
/// just l2 / cosine.
#[test]
fn dot_self_query_through_hnsw_optimizer() {
    let mut db = Connection::open_in_memory().unwrap();
    db.execute("CREATE TABLE v (id INTEGER PRIMARY KEY, e VECTOR(3));")
        .unwrap();

    // Distinguishable magnitudes so the dot metric resolves a clear
    // winner. `vec_distance_dot(a, b) = -(a·b)` — smaller (more
    // negative) is closer.
    let corpus: [(i64, [f32; 3]); 4] = [
        (1, [1.0, 0.0, 0.0]),
        (2, [2.0, 0.0, 0.0]),
        (3, [0.0, 1.0, 0.0]),
        (4, [0.0, 0.0, 1.0]),
    ];
    for (id, [x, y, z]) in corpus {
        let insert = format!(
            "INSERT INTO v (id, e) VALUES ({id}, [{}, {}, {}]);",
            x, y, z
        );
        db.execute(&insert).unwrap();
    }
    db.execute("CREATE INDEX v_hnsw ON v USING hnsw (e) WITH (metric = 'dot');")
        .unwrap();

    // Probe [3, 0, 0]: dot products are 3, 6, 0, 0 → distances
    // -3, -6, 0, 0. id=2 has the smallest (most negative) distance.
    let sql = "SELECT id FROM v ORDER BY vec_distance_dot(e, [3.0, 0.0, 0.0]) ASC LIMIT 1";
    let nearest = db.prepare(sql).unwrap();
    let hits = nearest
        .query_with_params(&[])
        .unwrap()
        .collect_all()
        .unwrap();

    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0].get::<i64>(0).unwrap(), 2);
}
2054
/// SQLR-28 — a metric mismatch must NOT take the graph shortcut. An
/// L2-built index queried with `vec_distance_cosine` falls through to
/// brute-force, which still returns the correct answer. Only the answer
/// is checked here; the slow path itself is implicit (no error, no
/// panic, no wrong result), which is the user-visible contract.
#[test]
fn metric_mismatch_falls_back_to_brute_force() {
    let mut db = Connection::open_in_memory().unwrap();
    db.execute("CREATE TABLE v (id INTEGER PRIMARY KEY, e VECTOR(2));")
        .unwrap();

    let diag = std::f32::consts::FRAC_1_SQRT_2;
    let corpus: [(i64, [f32; 2]); 3] = [(1, [1.0, 0.0]), (2, [diag, diag]), (3, [0.0, 1.0])];
    for (id, [x, y]) in corpus {
        let insert = format!("INSERT INTO v (id, e) VALUES ({id}, [{}, {}]);", x, y);
        db.execute(&insert).unwrap();
    }

    // Default L2 index — no WITH clause.
    db.execute("CREATE INDEX v_hnsw_l2 ON v USING hnsw (e);")
        .unwrap();

    // Query with cosine. The index can't help; brute-force still
    // returns the correct nearest by cosine: id=1 (cos dist 0).
    let sql = "SELECT id FROM v ORDER BY vec_distance_cosine(e, [1.0, 0.0]) ASC LIMIT 1";
    let nearest = db.prepare(sql).unwrap();
    let hits = nearest
        .query_with_params(&[])
        .unwrap()
        .collect_all()
        .unwrap();

    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0].get::<i64>(0).unwrap(), 1);
}
2095
/// SQLR-28 — a misspelled metric name must fail at CREATE INDEX time.
/// Silently falling back to L2 is the bug being fixed, not behaviour to
/// preserve.
#[test]
fn unknown_metric_name_is_rejected() {
    let mut db = Connection::open_in_memory().unwrap();
    db.execute("CREATE TABLE v (id INTEGER PRIMARY KEY, e VECTOR(2));")
        .unwrap();

    // 'cosin' is a deliberate typo for 'cosine'.
    let outcome = db.execute("CREATE INDEX bad ON v USING hnsw (e) WITH (metric = 'cosin');");
    let msg = outcome.unwrap_err().to_string();
    assert!(msg.contains("unknown HNSW metric"), "got: {msg}");
}
2110
/// SQLR-28 — WITH options on a non-HNSW index must produce an error
/// instead of being silently ignored; an option that has no effect on
/// the resulting index is a footgun.
#[test]
fn with_metric_on_btree_is_rejected() {
    let mut db = Connection::open_in_memory().unwrap();
    db.execute("CREATE TABLE t (a INTEGER PRIMARY KEY, b TEXT);")
        .unwrap();

    // No `USING hnsw` here, so the WITH clause has nothing to apply to.
    let outcome = db.execute("CREATE INDEX bad ON t (b) WITH (metric = 'cosine');");
    let msg = outcome.unwrap_err().to_string();
    assert!(msg.contains("doesn't support any options"), "got: {msg}");
}
2125
2126 // -----------------------------------------------------------------
2127 // Phase 10.1 — multi-connection foundation
2128 // -----------------------------------------------------------------
2129
/// `connect()` mints a sibling handle that shares the backing
/// `Database`: rows written through either handle are visible through
/// the other — the headline behavioural change for Phase 10.1.
#[test]
fn connect_shares_underlying_database() {
    let mut first = Connection::open_in_memory().unwrap();
    let mut second = first.connect();
    assert_eq!(first.handle_count(), 2);

    // Schema and one row via the first handle, a second row via the sibling.
    first
        .execute("CREATE TABLE shared (id INTEGER PRIMARY KEY, label TEXT);")
        .unwrap();
    first
        .execute("INSERT INTO shared (label) VALUES ('via-a');")
        .unwrap();
    second
        .execute("INSERT INTO shared (label) VALUES ('via-b');")
        .unwrap();

    // Read everything back through the sibling.
    let select = second.prepare("SELECT label FROM shared;").unwrap();
    let mut labels: Vec<String> = Vec::new();
    for row in select.query().unwrap().collect_all().unwrap() {
        labels.push(row.get::<String>(0).unwrap());
    }
    labels.sort();
    assert_eq!(labels, vec![String::from("via-a"), String::from("via-b")]);
}
2158
/// `handle_count()` tracks the number of live handles: it rises with
/// each `connect()` and falls as siblings are dropped, while the
/// primary stays usable throughout.
#[test]
fn handle_count_reflects_live_handles() {
    let root = Connection::open_in_memory().unwrap();
    assert_eq!(root.handle_count(), 1);

    let sibling_one = root.connect();
    let sibling_two = root.connect();
    assert_eq!(root.handle_count(), 3);

    // Release the siblings one at a time and watch the count step down.
    drop(sibling_one);
    assert_eq!(root.handle_count(), 2);
    drop(sibling_two);
    assert_eq!(root.handle_count(), 1);
}
2173
/// Multi-thread INSERT/COMMIT against the same in-memory DB. Today the
/// per-`Database` mutex serializes commits — this test proves the
/// locking holds without panics or data loss when N threads race for
/// the writer. Phase 10.4's `BEGIN CONCURRENT` will lift the
/// serialization for disjoint-row workloads; until then the guarantee
/// is "no panic, every commit lands."
#[test]
fn threaded_writers_serialize_cleanly() {
    use std::thread;

    const THREADS: usize = 8;
    const PER_THREAD: usize = 25;

    let primary = Connection::open_in_memory().unwrap();

    // Create the shared schema up front, through a short-lived sibling,
    // so every worker sees the table.
    {
        let mut setup = primary.connect();
        setup
            .execute("CREATE TABLE log (id INTEGER PRIMARY KEY, who TEXT, n INTEGER);")
            .unwrap();
    }

    // Spawn every worker before joining any of them; each owns its own
    // sibling handle.
    let mut workers = Vec::with_capacity(THREADS);
    for tid in 0..THREADS {
        let mut conn = primary.connect();
        workers.push(thread::spawn(move || {
            for n in 0..PER_THREAD {
                let sql = format!("INSERT INTO log (who, n) VALUES ('t{tid}', {n});");
                conn.execute(&sql).expect("insert under contention");
            }
        }));
    }
    for worker in workers {
        worker.join().expect("worker panicked");
    }

    // Every write must have landed exactly once — count rows by probing
    // the table directly so we don't depend on a SELECT COUNT(*)
    // implementation.
    let database = primary.database();
    let log_table = database.get_table(String::from("log")).unwrap();
    let row_count = log_table.rowids().len();
    assert_eq!(
        row_count,
        THREADS * PER_THREAD,
        "expected every threaded INSERT to commit",
    );
}
2223
/// `connect()` over a file-backed database produces sibling handles
/// that hit the same on-disk pager. A write auto-saved through one must
/// be visible through the other without a re-open.
#[test]
fn connect_shares_file_backed_database() {
    let db_file = tmp_path("connect_file");
    let mut owner = Connection::open(&db_file).unwrap();
    owner
        .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v TEXT);")
        .unwrap();

    // Write through the sibling...
    let mut peer = owner.connect();
    peer.execute("INSERT INTO t (v) VALUES ('hi');").unwrap();

    // ...and read it back through the original handle.
    let found = owner
        .prepare("SELECT v FROM t;")
        .unwrap()
        .query()
        .unwrap()
        .collect_all()
        .unwrap();
    assert_eq!(found.len(), 1);
    assert_eq!(found[0].get::<String>(0).unwrap(), "hi");

    // Close both handles before removing the files.
    drop(peer);
    drop(owner);
    cleanup(&db_file);
}
2247
/// Prepared-statement caches are per-handle, by design — sharing a
/// mutable LRU across threads would require an extra lock for no real
/// win (each worker prepares its own hot SQL).
#[test]
fn prep_cache_is_per_handle() {
    let sql = "SELECT a FROM t";

    let mut first = Connection::open_in_memory().unwrap();
    first.execute("CREATE TABLE t (a INTEGER);").unwrap();
    let mut second = first.connect();

    // Warm only the first handle's cache (same SQL twice → one entry).
    for _ in 0..2 {
        let _ = first.prepare_cached(sql).unwrap();
    }
    assert_eq!(first.prepared_cache_len(), 1);

    // The sibling's cache is untouched until it prepares something itself.
    assert_eq!(second.prepared_cache_len(), 0);
    let _ = second.prepare_cached(sql).unwrap();
    assert_eq!(second.prepared_cache_len(), 1);
}
2265
/// Static check: `Connection` is `Send + Sync`. Required so it can be
/// moved across threads (or wrapped in `Arc`) without a typestate
/// adapter — the headline contract Phase 10.1 puts in place. The test
/// passes by compiling; nothing happens at run time.
#[test]
fn connection_is_send_and_sync() {
    fn require_send_and_sync<T: Send + Sync>() {}
    require_send_and_sync::<Connection>();
}
2276
2277 // -----------------------------------------------------------------
2278 // Phase 11.3 — `PRAGMA journal_mode` round-trip
2279 // -----------------------------------------------------------------
2280
/// Fresh connections default to `wal` mode, and the read form of
/// `PRAGMA journal_mode` reports a one-row result through its status
/// string.
#[test]
fn journal_mode_defaults_to_wal_and_renders_through_pragma() {
    let mut db = Connection::open_in_memory().unwrap();
    let default_mode = db.journal_mode();
    assert_eq!(default_mode, crate::mvcc::JournalMode::Wal);

    // The read form returns a "1 row returned." status (matching
    // `auto_vacuum`'s shape).
    let status = db.execute("PRAGMA journal_mode;").unwrap();
    let reports_one_row = status.contains("1 row returned");
    assert!(reports_one_row, "unexpected status: {status}");
}
2297
/// `PRAGMA journal_mode = mvcc;` flips the per-database mode and is
/// observable through every sibling handle — the headline per-database
/// contract for Phase 11.3.
#[test]
fn journal_mode_set_to_mvcc_propagates_to_siblings() {
    let mut owner = Connection::open_in_memory().unwrap();
    let peer = owner.connect();
    assert_eq!(peer.journal_mode(), crate::mvcc::JournalMode::Wal);

    // Flip to MVCC through one handle; both must report it. The peer
    // seeing the change proves the setting lives on the shared
    // `Database`, not on the per-handle Connection.
    owner.execute("PRAGMA journal_mode = mvcc;").unwrap();
    for handle in [&owner, &peer] {
        assert_eq!(handle.journal_mode(), crate::mvcc::JournalMode::Mvcc);
    }

    // Switching back is allowed because no MVCC versions exist yet
    // (11.4 will populate the store).
    owner.execute("PRAGMA journal_mode = wal;").unwrap();
    for handle in [&owner, &peer] {
        assert_eq!(handle.journal_mode(), crate::mvcc::JournalMode::Wal);
    }
}
2319
/// The set form is case-insensitive on both the pragma name and the
/// value (matching SQLite), and a quoted value is accepted too.
#[test]
fn journal_mode_pragma_is_case_insensitive() {
    let mut db = Connection::open_in_memory().unwrap();

    // All upper-case name and value.
    db.execute("PRAGMA JOURNAL_MODE = MVCC;").unwrap();
    let after_upper = db.journal_mode();
    assert_eq!(after_upper, crate::mvcc::JournalMode::Mvcc);

    // Lower-case keyword with a single-quoted value.
    db.execute("pragma journal_mode = 'wal';").unwrap();
    let after_quoted = db.journal_mode();
    assert_eq!(after_quoted, crate::mvcc::JournalMode::Wal);
}
2330
/// An unrecognized mode name yields a typed error and leaves the
/// current setting untouched.
#[test]
fn journal_mode_rejects_unknown_value() {
    let mut db = Connection::open_in_memory().unwrap();

    let outcome = db.execute("PRAGMA journal_mode = delete;");
    let msg = outcome.expect_err("unknown mode must error").to_string();
    let names_the_mode = msg.contains("unknown mode 'delete'");
    assert!(names_the_mode, "unexpected error: {msg}");

    // The setting wasn't disturbed by the failed PRAGMA.
    assert_eq!(db.journal_mode(), crate::mvcc::JournalMode::Wal);
}
2347
/// Numeric values are rejected — `journal_mode` is enum-shaped. SQLite
/// historically accepts e.g. `journal_mode = 0` for OFF; SQLRite stays
/// explicit.
#[test]
fn journal_mode_rejects_numeric_value() {
    let mut db = Connection::open_in_memory().unwrap();
    let outcome = db.execute("PRAGMA journal_mode = 0;");
    let msg = outcome.expect_err("numeric mode must error").to_string();
    assert!(msg.contains("numeric"), "unexpected error: {msg}");
}
2360
2361 // -----------------------------------------------------------------
2362 // Phase 11.4 — `BEGIN CONCURRENT` end-to-end
2363 // -----------------------------------------------------------------
2364
/// `BEGIN CONCURRENT` requires `PRAGMA journal_mode = mvcc;` first. v0
/// doesn't auto-enable MVCC mode; users opt in explicitly so the
/// implications (in-memory MvStore growth, `Busy` errors becoming
/// possible) aren't a surprise.
#[test]
fn begin_concurrent_requires_mvcc_journal_mode() {
    // Fresh connection, no PRAGMA issued beforehand.
    let mut db = Connection::open_in_memory().unwrap();
    let outcome = db.execute("BEGIN CONCURRENT;");
    let msg = outcome
        .expect_err("must require MVCC journal mode")
        .to_string();

    // The error has to point the user at the PRAGMA they need to run.
    let points_at_pragma = msg.contains("PRAGMA journal_mode = mvcc");
    assert!(points_at_pragma, "unexpected error: {msg}");
}
2381
/// Round-trip: enable MVCC, `BEGIN CONCURRENT`, no writes, `COMMIT`.
/// The simplest control-flow check — proves the parser-intent and
/// lifecycle hooks all line up.
#[test]
fn begin_concurrent_then_empty_commit_round_trips() {
    let mut db = Connection::open_in_memory().unwrap();
    db.execute("PRAGMA journal_mode = mvcc;").unwrap();
    db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);")
        .unwrap();

    // Each statement reports its own keyword as the status string.
    let on_begin = db.execute("BEGIN CONCURRENT;").unwrap();
    assert_eq!(on_begin, "BEGIN");
    let on_commit = db.execute("COMMIT;").unwrap();
    assert_eq!(on_commit, "COMMIT");
}
2396
/// Plan test #1: two concurrent transactions on **disjoint rowids**
/// must both commit. There is no write-write conflict to detect, so
/// validation passes for both.
#[test]
fn two_concurrent_inserts_on_disjoint_rows_both_commit() {
    let mut first = Connection::open_in_memory().unwrap();
    first.execute("PRAGMA journal_mode = mvcc;").unwrap();
    first
        .execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER);")
        .unwrap();
    let mut second = first.connect();

    // Each handle opens its own concurrent transaction and inserts a
    // row with a different id.
    first.execute("BEGIN CONCURRENT;").unwrap();
    first
        .execute("INSERT INTO accounts (id, balance) VALUES (1, 100);")
        .unwrap();
    second.execute("BEGIN CONCURRENT;").unwrap();
    second
        .execute("INSERT INTO accounts (id, balance) VALUES (2, 200);")
        .unwrap();

    // Both commit cleanly — disjoint rowids, no conflict.
    first.execute("COMMIT;").unwrap();
    second.execute("COMMIT;").unwrap();

    // Both rows are visible through the legacy read path.
    let select = first.prepare("SELECT id, balance FROM accounts;").unwrap();
    let mut balances: Vec<(i64, i64)> = Vec::new();
    for row in select.query().unwrap().collect_all().unwrap() {
        balances.push((row.get::<i64>(0).unwrap(), row.get::<i64>(1).unwrap()));
    }
    balances.sort();
    assert_eq!(balances, vec![(1, 100), (2, 200)]);
}
2433
/// Plan test #2: two concurrent transactions on the **same row** — the
/// first to commit wins, the other aborts with a retryable `Busy`.
#[test]
fn two_concurrent_updates_same_row_one_aborts_with_busy() {
    let mut winner = Connection::open_in_memory().unwrap();
    winner.execute("PRAGMA journal_mode = mvcc;").unwrap();
    winner
        .execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER);")
        .unwrap();
    winner
        .execute("INSERT INTO accounts (id, balance) VALUES (1, 100);")
        .unwrap();
    let mut loser = winner.connect();

    // Both BEGIN before either UPDATE — that's the snapshot the
    // validation checks against.
    winner.execute("BEGIN CONCURRENT;").unwrap();
    loser.execute("BEGIN CONCURRENT;").unwrap();

    winner
        .execute("UPDATE accounts SET balance = 200 WHERE id = 1;")
        .unwrap();
    loser
        .execute("UPDATE accounts SET balance = 300 WHERE id = 1;")
        .unwrap();

    // First commit wins.
    winner.execute("COMMIT;").unwrap();

    // Second commit hits the validation pass and aborts.
    let err = loser
        .execute("COMMIT;")
        .expect_err("second commit must abort with Busy");
    assert!(matches!(err, SQLRiteError::Busy(_)));
    assert!(err.is_retryable(), "Busy must be retryable");
    let msg = err.to_string();
    let is_conflict = msg.contains("write-write conflict");
    assert!(is_conflict, "unexpected error: {msg}");

    // The winning value is what's persisted.
    let persisted = winner
        .prepare("SELECT balance FROM accounts WHERE id = 1;")
        .unwrap()
        .query()
        .unwrap()
        .collect_all()
        .unwrap();
    assert_eq!(persisted.len(), 1);
    assert_eq!(persisted[0].get::<i64>(0).unwrap(), 200);
}
2479
/// Plan test #3: an aborted transaction's writes must never
/// become visible. After ROLLBACK (explicit, or implicit when
/// COMMIT fails with a retryable `Busy` error), the row keeps
/// the value written by whoever actually committed.
#[test]
fn aborted_transactions_writes_never_become_visible() {
    let mut conn = Connection::open_in_memory().unwrap();
    conn.execute("PRAGMA journal_mode = mvcc;").unwrap();
    conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);")
        .unwrap();
    conn.execute("INSERT INTO t (id, v) VALUES (1, 100);")
        .unwrap();

    // Explicit ROLLBACK: the staged UPDATE is discarded.
    conn.execute("BEGIN CONCURRENT;").unwrap();
    conn.execute("UPDATE t SET v = 999 WHERE id = 1;").unwrap();
    conn.execute("ROLLBACK;").unwrap();

    let stmt = conn.prepare("SELECT v FROM t WHERE id = 1;").unwrap();
    let rows = stmt.query().unwrap().collect_all().unwrap();
    assert_eq!(rows[0].get::<i64>(0).unwrap(), 100);

    // Implicit rollback via Busy: another connection commits a
    // newer version under us.
    let mut other = conn.connect();
    conn.execute("BEGIN CONCURRENT;").unwrap();
    other.execute("BEGIN CONCURRENT;").unwrap();
    conn.execute("UPDATE t SET v = 7 WHERE id = 1;").unwrap();
    other.execute("UPDATE t SET v = 13 WHERE id = 1;").unwrap();
    conn.execute("COMMIT;").unwrap();
    let err = other.execute("COMMIT;").expect_err("must abort with Busy");
    // Check the failure really is the retryable conflict class.
    // Discarding the error would let any unrelated COMMIT failure
    // satisfy this branch of the test.
    assert!(
        err.is_retryable(),
        "losing commit must fail with a retryable Busy error, got: {err}"
    );

    // The losing writer's value (13) never lands. The winner
    // (7) is what's visible.
    let rows = conn
        .prepare("SELECT v FROM t WHERE id = 1;")
        .unwrap()
        .query()
        .unwrap()
        .collect_all()
        .unwrap();
    assert_eq!(rows[0].get::<i64>(0).unwrap(), 7);
}
2522
/// Plan test #4: a transaction that lost with `Busy` can be
/// retried. The loser opens a fresh `BEGIN CONCURRENT`, replays
/// the same UPDATE, and this time the commit goes through.
#[test]
fn retry_after_busy_succeeds() {
    let mut winner = Connection::open_in_memory().unwrap();
    for setup in [
        "PRAGMA journal_mode = mvcc;",
        "CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);",
        "INSERT INTO t (id, v) VALUES (1, 1);",
    ] {
        winner.execute(setup).unwrap();
    }
    let mut retrier = winner.connect();

    // Provoke the conflict: both write row 1, `winner` commits first.
    winner.execute("BEGIN CONCURRENT;").unwrap();
    retrier.execute("BEGIN CONCURRENT;").unwrap();
    winner.execute("UPDATE t SET v = 100 WHERE id = 1;").unwrap();
    retrier.execute("UPDATE t SET v = 200 WHERE id = 1;").unwrap();
    winner.execute("COMMIT;").unwrap();
    let first_attempt = retrier
        .execute("COMMIT;")
        .expect_err("first attempt must Busy");
    assert!(first_attempt.is_retryable());

    // Second attempt: brand-new transaction, identical UPDATE.
    for sql in ["BEGIN CONCURRENT;", "UPDATE t SET v = 200 WHERE id = 1;"] {
        retrier.execute(sql).unwrap();
    }
    retrier.execute("COMMIT;").expect("retry must succeed");

    // The retried write is now visible through the other handle.
    let stmt = winner.prepare("SELECT v FROM t WHERE id = 1;").unwrap();
    let rows = stmt.query().unwrap().collect_all().unwrap();
    assert_eq!(rows[0].get::<i64>(0).unwrap(), 200);
}
2558
/// A second `BEGIN CONCURRENT` on a connection that already has
/// one open must fail, and the error text must mention that a
/// transaction is "already open".
#[test]
fn nested_begin_concurrent_is_rejected() {
    let mut conn = Connection::open_in_memory().unwrap();
    for sql in [
        "PRAGMA journal_mode = mvcc;",
        "CREATE TABLE t (id INTEGER PRIMARY KEY);",
        "BEGIN CONCURRENT;",
    ] {
        conn.execute(sql).unwrap();
    }

    let rejection = conn
        .execute("BEGIN CONCURRENT;")
        .expect_err("nested BEGIN CONCURRENT must error");
    let text = rejection.to_string();
    assert!(text.contains("already open"));
}
2574
/// A plain `BEGIN` issued while a `BEGIN CONCURRENT` transaction
/// is open must fail with an error mentioning that a concurrent
/// transaction is already open. Mixing the two transaction kinds
/// is not supported.
#[test]
fn legacy_begin_inside_concurrent_is_rejected() {
    let mut conn = Connection::open_in_memory().unwrap();
    for sql in [
        "PRAGMA journal_mode = mvcc;",
        "CREATE TABLE t (id INTEGER PRIMARY KEY);",
        "BEGIN CONCURRENT;",
    ] {
        conn.execute(sql).unwrap();
    }

    let rejection = conn
        .execute("BEGIN;")
        .expect_err("legacy BEGIN inside concurrent tx must error");
    let text = rejection.to_string();
    assert!(text.contains("concurrent transaction is already open"));
}
2591
/// Schema changes are refused inside `BEGIN CONCURRENT`: a
/// `CREATE TABLE` must fail with a "DDL is not supported"
/// message, and the transaction must remain open so the caller
/// can still `ROLLBACK` it.
#[test]
fn ddl_inside_begin_concurrent_is_rejected() {
    let mut conn = Connection::open_in_memory().unwrap();
    for sql in ["PRAGMA journal_mode = mvcc;", "BEGIN CONCURRENT;"] {
        conn.execute(sql).unwrap();
    }

    let rejection = conn
        .execute("CREATE TABLE t (id INTEGER PRIMARY KEY);")
        .expect_err("DDL inside concurrent tx must error");
    let msg = rejection.to_string();
    assert!(msg.contains("DDL is not supported"), "unexpected: {msg}");

    // ROLLBACK succeeding shows the transaction was not torn down
    // by the rejected statement.
    conn.execute("ROLLBACK;").unwrap();
}
2609
/// A concurrent transaction that wrote nothing must commit
/// successfully even when a sibling committed a change while it
/// was open.
#[test]
fn empty_concurrent_commit_never_busies() {
    let mut idle = Connection::open_in_memory().unwrap();
    for setup in [
        "PRAGMA journal_mode = mvcc;",
        "CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);",
        "INSERT INTO t (id, v) VALUES (1, 1);",
    ] {
        idle.execute(setup).unwrap();
    }
    let mut sibling = idle.connect();

    idle.execute("BEGIN CONCURRENT;").unwrap();

    // While `idle`'s transaction is open, the sibling runs a full
    // transaction that changes row 1.
    for sql in [
        "BEGIN CONCURRENT;",
        "UPDATE t SET v = 999 WHERE id = 1;",
        "COMMIT;",
    ] {
        sibling.execute(sql).unwrap();
    }

    // `idle` issued no writes, so its COMMIT must not report a
    // conflict.
    idle.execute("COMMIT;")
        .expect("empty commit must succeed even if siblings committed");
}
2635
2636 // -----------------------------------------------------------------
2637 // Phase 11.5 — snapshot-isolated reads via Statement::query
2638 // -----------------------------------------------------------------
2639
/// Headline Phase 11.5 contract: a SELECT run through
/// `prepare(...).query()` while a `BEGIN CONCURRENT` is open
/// observes the state as of BEGIN, not whatever siblings have
/// committed since. Once the transaction ends, the same handle
/// reads the live state again.
#[test]
fn query_inside_concurrent_tx_sees_begin_time_snapshot() {
    /// Reads `v` for row 1 through the prepare + query path.
    fn read_v(conn: &mut Connection) -> i64 {
        let rows = conn
            .prepare("SELECT v FROM t WHERE id = 1;")
            .unwrap()
            .query()
            .unwrap()
            .collect_all()
            .unwrap();
        rows[0].get::<i64>(0).unwrap()
    }

    let mut a = Connection::open_in_memory().unwrap();
    for setup in [
        "PRAGMA journal_mode = mvcc;",
        "CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);",
        "INSERT INTO t (id, v) VALUES (1, 1);",
    ] {
        a.execute(setup).unwrap();
    }
    let mut b = a.connect();

    a.execute("BEGIN CONCURRENT;").unwrap();

    // A sibling transaction rewrites row 1 and commits.
    for sql in [
        "BEGIN CONCURRENT;",
        "UPDATE t SET v = 999 WHERE id = 1;",
        "COMMIT;",
    ] {
        b.execute(sql).unwrap();
    }

    // Still inside a's transaction: the read must return the
    // BEGIN-time value (1), not b's committed 999.
    assert_eq!(
        read_v(&mut a),
        1,
        "Statement::query inside BEGIN CONCURRENT must see the snapshot, not the live db"
    );

    // After a's (empty) commit, the same handle sees b's value.
    a.execute("COMMIT;").unwrap();
    assert_eq!(read_v(&mut a), 999);
}
2689
/// Read-your-writes: a SELECT issued via `query()` inside a
/// concurrent transaction reflects that transaction's own
/// earlier UPDATE, and after ROLLBACK the original value is
/// what a fresh read returns.
#[test]
fn query_inside_concurrent_tx_sees_own_writes() {
    let mut conn = Connection::open_in_memory().unwrap();
    for setup in [
        "PRAGMA journal_mode = mvcc;",
        "CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);",
        "INSERT INTO t (id, v) VALUES (1, 100);",
    ] {
        conn.execute(setup).unwrap();
    }

    conn.execute("BEGIN CONCURRENT;").unwrap();
    conn.execute("UPDATE t SET v = 200 WHERE id = 1;").unwrap();

    // Within the transaction our own write (200) is visible.
    let in_tx_stmt = conn.prepare("SELECT v FROM t WHERE id = 1;").unwrap();
    let in_tx_rows = in_tx_stmt.query().unwrap().collect_all().unwrap();
    assert_eq!(in_tx_rows[0].get::<i64>(0).unwrap(), 200);

    // Rolling back discards the write; the row reads 100 again.
    conn.execute("ROLLBACK;").unwrap();
    let after_stmt = conn.prepare("SELECT v FROM t WHERE id = 1;").unwrap();
    let after_rows = after_stmt.query().unwrap().collect_all().unwrap();
    assert_eq!(after_rows[0].get::<i64>(0).unwrap(), 100);
}
2727
/// The parameter-bound read path (`query_with_params`) honours
/// the transaction snapshot too: a sibling's commit made after
/// our BEGIN is not visible to a bound SELECT inside the tx.
#[test]
fn query_with_params_inside_concurrent_tx_sees_snapshot() {
    let mut a = Connection::open_in_memory().unwrap();
    for setup in [
        "PRAGMA journal_mode = mvcc;",
        "CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);",
        "INSERT INTO t (id, v) VALUES (1, 7);",
    ] {
        a.execute(setup).unwrap();
    }
    let mut b = a.connect();

    a.execute("BEGIN CONCURRENT;").unwrap();

    // Sibling changes row 1 to 42 and commits while a's tx is open.
    for sql in [
        "BEGIN CONCURRENT;",
        "UPDATE t SET v = 42 WHERE id = 1;",
        "COMMIT;",
    ] {
        b.execute(sql).unwrap();
    }

    // The bound SELECT still returns the BEGIN-time value.
    let bound = a.prepare("SELECT v FROM t WHERE id = ?").unwrap();
    let rows = bound
        .query_with_params(&[Value::Integer(1)])
        .unwrap()
        .collect_all()
        .unwrap();
    assert_eq!(rows[0].get::<i64>(0).unwrap(), 7);

    a.execute("COMMIT;").unwrap();
}
2756
/// With no concurrent transaction open on the reading handle,
/// `query()` returns the live state: a sibling's committed
/// change is visible immediately. Confirms snapshot routing
/// only applies inside `BEGIN CONCURRENT`.
#[test]
fn query_outside_concurrent_tx_sees_live_database() {
    let mut a = Connection::open_in_memory().unwrap();
    for setup in [
        "PRAGMA journal_mode = mvcc;",
        "CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);",
        "INSERT INTO t (id, v) VALUES (1, 1);",
    ] {
        a.execute(setup).unwrap();
    }
    let mut b = a.connect();

    // The sibling commits v = 100. `a` never opened a transaction.
    for sql in [
        "BEGIN CONCURRENT;",
        "UPDATE t SET v = 100 WHERE id = 1;",
        "COMMIT;",
    ] {
        b.execute(sql).unwrap();
    }

    // `a` must therefore observe the post-commit value.
    let stmt = a.prepare("SELECT v FROM t WHERE id = 1;").unwrap();
    let rows = stmt.query().unwrap().collect_all().unwrap();
    assert_eq!(rows[0].get::<i64>(0).unwrap(), 100);
}
2784
/// A reader inside its own `BEGIN CONCURRENT` keeps seeing the
/// same value no matter how many times a sibling writer commits
/// in the meantime. The read is repeated after every writer
/// commit to catch any point at which the snapshot leaks.
#[test]
fn snapshot_stays_consistent_across_sibling_commits() {
    /// Reads `v` for row 1 through the prepare + query path.
    fn read_v(conn: &mut Connection) -> i64 {
        let rows = conn
            .prepare("SELECT v FROM t WHERE id = 1;")
            .unwrap()
            .query()
            .unwrap()
            .collect_all()
            .unwrap();
        rows[0].get::<i64>(0).unwrap()
    }

    let mut reader = Connection::open_in_memory().unwrap();
    for setup in [
        "PRAGMA journal_mode = mvcc;",
        "CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);",
        "INSERT INTO t (id, v) VALUES (1, 1);",
    ] {
        reader.execute(setup).unwrap();
    }
    let mut writer = reader.connect();

    reader.execute("BEGIN CONCURRENT;").unwrap();

    // Baseline read inside the reader's transaction.
    assert_eq!(read_v(&mut reader), 1);

    // The writer commits one change per iteration; the reader
    // re-reads after each and must keep getting 1.
    for new_value in [10, 20, 30, 40] {
        writer.execute("BEGIN CONCURRENT;").unwrap();
        let update = format!("UPDATE t SET v = {new_value} WHERE id = 1;");
        writer.execute(&update).unwrap();
        writer.execute("COMMIT;").unwrap();

        assert_eq!(
            read_v(&mut reader),
            1,
            "snapshot regressed after writer committed v={new_value}",
        );
    }

    reader.execute("COMMIT;").unwrap();
}
2841
2842 // -----------------------------------------------------------------
2843 // Phase 11.6 — MVCC garbage collection
2844 // -----------------------------------------------------------------
2845
/// With no reader holding a snapshot, repeatedly updating one
/// row through concurrent transactions must not grow the
/// version store: after 50 commits it holds a single version
/// for a single tracked row.
#[test]
fn repeated_updates_keep_chain_bounded_when_no_readers() {
    let mut conn = Connection::open_in_memory().unwrap();
    for setup in [
        "PRAGMA journal_mode = mvcc;",
        "CREATE TABLE counters (id INTEGER PRIMARY KEY, n INTEGER);",
        "INSERT INTO counters (id, n) VALUES (1, 0);",
    ] {
        conn.execute(setup).unwrap();
    }

    // Fifty back-to-back transactions, each bumping the same row.
    // No transaction overlaps another, so nothing needs the
    // superseded versions.
    for n in 1..=50 {
        conn.execute("BEGIN CONCURRENT;").unwrap();
        let bump = format!("UPDATE counters SET n = {n} WHERE id = 1;");
        conn.execute(&bump).unwrap();
        conn.execute("COMMIT;").unwrap();
    }

    // Sample the store inside a scope so the database handle is
    // released before asserting.
    let (store_size, tracked) = {
        let db = conn.database();
        let versions = db.mv_store().total_versions();
        let rows = db.mv_store().tracked_rows();
        (versions, rows)
    };
    assert_eq!(
        store_size, 1,
        "expected 1 version after 50 GC'd updates, got {store_size}",
    );
    assert_eq!(tracked, 1);
}
2884
/// Garbage collection must not take away versions an open
/// reader still needs. A reader that began before a burst of
/// writer commits keeps reading the pre-burst value; once the
/// reader has committed, an explicit vacuum shrinks the store
/// to at most one version.
#[test]
fn gc_preserves_versions_visible_to_active_reader() {
    let mut writer = Connection::open_in_memory().unwrap();
    for setup in [
        "PRAGMA journal_mode = mvcc;",
        "CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);",
        "INSERT INTO t (id, v) VALUES (1, 0);",
    ] {
        writer.execute(setup).unwrap();
    }
    let mut reader = writer.connect();

    // The reader begins before any writer transaction, so its
    // snapshot predates every update below.
    reader.execute("BEGIN CONCURRENT;").unwrap();

    // Five committed updates from the writer while the reader's
    // transaction stays open.
    for n in 1..=5 {
        writer.execute("BEGIN CONCURRENT;").unwrap();
        let update = format!("UPDATE t SET v = {n} WHERE id = 1;");
        writer.execute(&update).unwrap();
        writer.execute("COMMIT;").unwrap();
    }

    // The reader must still resolve row 1 to v = 0. The chain
    // length is deliberately not asserted here; only the
    // visible value is part of the contract.
    let snapshot_stmt = reader.prepare("SELECT v FROM t WHERE id = 1;").unwrap();
    let snapshot_rows = snapshot_stmt.query().unwrap().collect_all().unwrap();
    assert_eq!(snapshot_rows[0].get::<i64>(0).unwrap(), 0);

    reader.execute("COMMIT;").unwrap();

    // With the reader finished, an explicit vacuum may reclaim
    // the old versions. The exact reclaim count is not checked,
    // only the resulting upper bound on the store size.
    writer.vacuum_mvcc();
    let store_size = {
        let db = writer.database();
        let versions = db.mv_store().total_versions();
        versions
    };
    assert!(
        store_size <= 1,
        "after reader closed and vacuum ran, expected ≤1 version, got {store_size}",
    );
}
2952
/// Calling `Connection::vacuum_mvcc` on a freshly opened
/// in-memory database that never switched to MVCC reports zero
/// reclaimed versions, so it is safe to call regardless of
/// journal mode.
#[test]
fn vacuum_mvcc_is_a_noop_on_wal_database() {
    // No `PRAGMA journal_mode = mvcc;` is issued here, so the
    // connection stays in its default journal mode.
    let conn = Connection::open_in_memory().unwrap();
    let reclaimed = conn.vacuum_mvcc();
    assert_eq!(reclaimed, 0);
}
2963
/// After a couple of committed updates and with no transaction
/// open, an explicit `vacuum_mvcc` leaves the version store
/// holding at most one version.
#[test]
fn vacuum_mvcc_reclaims_everything_with_no_active_readers() {
    let mut conn = Connection::open_in_memory().unwrap();
    for sql in [
        "PRAGMA journal_mode = mvcc;",
        "CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);",
        // Seed row, then two committed concurrent updates to
        // accumulate versions.
        "INSERT INTO t (id, v) VALUES (1, 0);",
        "BEGIN CONCURRENT;",
        "UPDATE t SET v = 1 WHERE id = 1;",
        "COMMIT;",
        "BEGIN CONCURRENT;",
        "UPDATE t SET v = 2 WHERE id = 1;",
        "COMMIT;",
    ] {
        conn.execute(sql).unwrap();
    }

    // The reclaim count is intentionally ignored: earlier
    // per-commit sweeps may already have removed some or all of
    // the superseded versions.
    let _ = conn.vacuum_mvcc();

    let store_size = {
        let db = conn.database();
        let versions = db.mv_store().total_versions();
        versions
    };
    assert!(store_size <= 1);
}
2991
/// `is_retryable()` is true for both busy-style variants
/// (`Busy`, `BusySnapshot`) and false for `General`, so retry
/// helpers don't need to match on individual variants.
#[test]
fn is_retryable_covers_busy_variants() {
    let retryable = [
        SQLRiteError::Busy("x".into()),
        SQLRiteError::BusySnapshot("x".into()),
    ];
    for err in retryable {
        assert!(err.is_retryable());
    }

    let not_retryable = SQLRiteError::General("x".into());
    assert!(!not_retryable.is_retryable());
}
3001
/// Phase 11.9 — a committed `BEGIN CONCURRENT` transaction on a
/// file-backed database is recoverable after reopen: the pager's
/// `recovered_mvcc_commits` yields one batch containing the
/// single row that was written, with its table, rowid and
/// column payload intact.
#[test]
fn mvcc_commit_persists_a_log_record_into_wal() {
    let path = tmp_path("mvcc_log_record");
    {
        // Scoped so the writing connection is dropped before the
        // file is reopened.
        let mut c = Connection::open(&path).unwrap();
        for sql in [
            "PRAGMA journal_mode = mvcc;",
            "CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);",
            "BEGIN CONCURRENT;",
            "INSERT INTO t (id, v) VALUES (1, 42);",
            "COMMIT;",
        ] {
            c.execute(sql).unwrap();
        }
    }

    // Reopen the file and inspect what the pager recovered.
    let c2 = Connection::open(&path).unwrap();
    let db = c2.database();
    let pager = db.pager.as_ref().expect("file-backed db carries a pager");
    let batches = pager.recovered_mvcc_commits();
    assert_eq!(batches.len(), 1, "one BEGIN CONCURRENT commit -> one batch");

    let only_batch = &batches[0];
    assert_eq!(only_batch.records.len(), 1, "one row written");

    let rec = &only_batch.records[0];
    assert_eq!(rec.row.table, "t");
    assert_eq!(rec.row.rowid, 1);
    match &rec.payload {
        VersionPayload::Present(cols) => {
            // The recovered columns must include v = 42.
            let has_v_42 = cols.iter().any(|(name, value)| {
                name == "v" && matches!(value, crate::sql::db::table::Value::Integer(42))
            });
            assert!(has_v_42);
        }
        other => panic!("unexpected payload: {other:?}"),
    }

    // Release the handles before removing the test file.
    drop(db);
    drop(c2);
    cleanup(&path);
}
3039
/// Phase 11.9 — after reopening a file that holds committed
/// concurrent transactions, the `MvStore` knows the affected
/// row again and the MVCC clock is at or beyond that row's
/// latest committed begin timestamp.
#[test]
fn mvcc_reopen_restores_mv_store_and_clock() {
    let path = tmp_path("mvcc_reopen");
    {
        // Two committed concurrent transactions against row 1:
        // an INSERT followed by an UPDATE.
        let mut c = Connection::open(&path).unwrap();
        for sql in [
            "PRAGMA journal_mode = mvcc;",
            "CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);",
            "BEGIN CONCURRENT;",
            "INSERT INTO t (id, v) VALUES (1, 10);",
            "COMMIT;",
            "BEGIN CONCURRENT;",
            "UPDATE t SET v = 20 WHERE id = 1;",
            "COMMIT;",
        ] {
            c.execute(sql).unwrap();
        }
    }

    let c2 = Connection::open(&path).unwrap();
    let db = c2.database();
    let store = db.mv_store();
    let row = RowID::new("t", 1);

    // The store must have an entry for row t/1 once the file
    // has been reopened.
    let last_commit_ts = store
        .latest_committed_begin(&row)
        .expect("MvStore should know about row t/1 after reopen");

    // The clock must not lag behind the replayed timestamp.
    let clock_now = db.mvcc_clock().now();
    assert!(
        clock_now >= last_commit_ts,
        "clock {} must be >= last replayed commit_ts {}",
        clock_now,
        last_commit_ts,
    );

    drop(db);
    drop(c2);
    cleanup(&path);
}
3082
/// Phase 11.9 — a single concurrent commit that touched three
/// rows comes back from reopen as one batch whose records cover
/// exactly rowids 1, 2 and 3.
#[test]
fn mvcc_multi_row_batch_replays_intact() {
    let path = tmp_path("mvcc_multi_row");
    {
        let mut c = Connection::open(&path).unwrap();
        for sql in [
            "PRAGMA journal_mode = mvcc;",
            "CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);",
            // Rows are seeded outside any concurrent transaction
            // so the concurrent one below only issues UPDATEs.
            "INSERT INTO t (id, v) VALUES (1, 1);",
            "INSERT INTO t (id, v) VALUES (2, 2);",
            "INSERT INTO t (id, v) VALUES (3, 3);",
            "BEGIN CONCURRENT;",
            "UPDATE t SET v = 100 WHERE id = 1;",
            "UPDATE t SET v = 200 WHERE id = 2;",
            "UPDATE t SET v = 300 WHERE id = 3;",
            "COMMIT;",
        ] {
            c.execute(sql).unwrap();
        }
    }

    let c2 = Connection::open(&path).unwrap();
    let db = c2.database();
    let pager = db.pager.as_ref().unwrap();
    let batches = pager.recovered_mvcc_commits();
    assert_eq!(batches.len(), 1, "single COMMIT -> single batch");

    // Record order within the batch is not part of the contract,
    // so compare the sorted rowids.
    let mut rowids: Vec<i64> = batches[0].records.iter().map(|r| r.row.rowid).collect();
    rowids.sort_unstable();
    assert_eq!(rowids, vec![1, 2, 3]);

    drop(db);
    drop(c2);
    cleanup(&path);
}
3120
/// Phase 11.9 — a `BEGIN CONCURRENT` that ends in ROLLBACK
/// contributes nothing to recovery: after reopen there are no
/// recovered MVCC batches and the version store is empty.
#[test]
fn mvcc_rolled_back_tx_leaves_no_wal_record() {
    let path = tmp_path("mvcc_rollback");
    {
        let mut c = Connection::open(&path).unwrap();
        for sql in [
            "PRAGMA journal_mode = mvcc;",
            "CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);",
            "BEGIN CONCURRENT;",
            "INSERT INTO t (id, v) VALUES (1, 999);",
            "ROLLBACK;",
        ] {
            c.execute(sql).unwrap();
        }
    }

    let c2 = Connection::open(&path).unwrap();
    let db = c2.database();
    let pager = db.pager.as_ref().unwrap();
    let recovered = pager.recovered_mvcc_commits();
    assert!(recovered.is_empty(), "ROLLBACK must not append MVCC frames");

    // Nothing was replayed, so the version store holds no versions.
    let store = db.mv_store();
    assert_eq!(store.total_versions(), 0);

    drop(db);
    drop(c2);
    cleanup(&path);
}
3150
/// Phase 11.9 — writes made outside `BEGIN CONCURRENT` do
/// **not** show up as recovered MVCC commits, even with the
/// journal mode set to mvcc. MVCC persistence is opt-in per
/// transaction.
#[test]
fn legacy_commit_does_not_emit_mvcc_frame() {
    let path = tmp_path("mvcc_legacy_no_frame");
    {
        // Only plain (auto-committed) statements here, with no
        // concurrent transaction at any point.
        let mut c = Connection::open(&path).unwrap();
        for sql in [
            "PRAGMA journal_mode = mvcc;",
            "CREATE TABLE t (id INTEGER PRIMARY KEY);",
            "INSERT INTO t (id) VALUES (1);",
        ] {
            c.execute(sql).unwrap();
        }
    }

    let c2 = Connection::open(&path).unwrap();
    let db = c2.database();
    let pager = db.pager.as_ref().unwrap();
    let recovered = pager.recovered_mvcc_commits();
    assert!(recovered.is_empty(), "legacy writes never produce MVCC frames");

    drop(db);
    drop(c2);
    cleanup(&path);
}
3175
/// Phase 11.9 — crash-recovery sketch. Five concurrent commits
/// are made and the connection is dropped without an explicit
/// checkpoint. A fresh open must recover all five batches, with
/// strictly increasing `commit_ts` values.
#[test]
fn mvcc_replays_multiple_commits_after_unclean_close() {
    let path = tmp_path("mvcc_unclean_close");
    {
        let mut c = Connection::open(&path).unwrap();
        c.execute("PRAGMA journal_mode = mvcc;").unwrap();
        c.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);")
            .unwrap();

        // Commit #1 creates the row.
        c.execute("BEGIN CONCURRENT;").unwrap();
        c.execute("INSERT INTO t (id, v) VALUES (1, 0);").unwrap();
        c.execute("COMMIT;").unwrap();

        // Commits #2 to #5 each update it.
        for v in 1..5 {
            c.execute("BEGIN CONCURRENT;").unwrap();
            let update = format!("UPDATE t SET v = {v} WHERE id = 1;");
            c.execute(&update).unwrap();
            c.execute("COMMIT;").unwrap();
        }
        // `c` goes out of scope here with no checkpoint call.
        // NOTE(review): this presumes five commits stay below any
        // auto-checkpoint threshold; confirm against the pager.
    }

    let c2 = Connection::open(&path).unwrap();
    let db = c2.database();
    let pager = db.pager.as_ref().unwrap();
    let batches = pager.recovered_mvcc_commits();
    assert_eq!(batches.len(), 5, "every COMMIT must show up after reopen");

    // Adjacent batches must be ordered by strictly increasing
    // commit timestamp.
    for pair in batches.windows(2) {
        let (earlier, later) = (&pair[0], &pair[1]);
        assert!(earlier.commit_ts < later.commit_ts);
    }

    drop(db);
    drop(c2);
    cleanup(&path);
}
3215
/// Statements obtained from `prepare_cached` can be executed
/// with bound parameters: two inserts through one cached
/// statement, then a cached, parameterised SELECT that returns
/// the expected row.
#[test]
fn prepare_cached_executes_the_same_as_prepare() {
    let mut conn = Connection::open_in_memory().unwrap();
    conn.execute("CREATE TABLE t (a INTEGER PRIMARY KEY, b TEXT);")
        .unwrap();

    // One cached INSERT, executed once per (a, b) pair.
    let mut insert = conn
        .prepare_cached("INSERT INTO t (a, b) VALUES (?, ?)")
        .unwrap();
    for (a, b) in [(1, "alpha"), (2, "beta")] {
        insert
            .execute_with_params(&[Value::Integer(a), Value::Text(b.into())])
            .unwrap();
    }

    // Look up the second row through a cached SELECT.
    let select = conn.prepare_cached("SELECT b FROM t WHERE a = ?").unwrap();
    let found = select
        .query_with_params(&[Value::Integer(2)])
        .unwrap()
        .collect_all()
        .unwrap();
    assert_eq!(found.len(), 1);
    assert_eq!(found[0].get::<String>(0).unwrap(), "beta");
}
3238}