Skip to main content

sqlrite/sql/pager/
mod.rs

1//! On-disk persistence for a `Database`, using fixed-size paged files.
2//!
3//! The file is a sequence of 4 KiB pages. Page 0 holds the header
4//! (magic, version, page count, schema-root pointer). Every other page carries
5//! a small per-page header (type tag + next-page pointer + payload length)
6//! followed by a payload of up to 4089 bytes.
7//!
8//! **Storage strategy (format version 2, Phase 3c.5).**
9//!
10//! - Each `Table`'s rows live as **cells** in a chain of `TableLeaf` pages.
11//!   Cell layout and slot directory are in `cell.rs` / `table_page.rs`;
12//!   cells that exceed the inline threshold spill into an overflow chain
13//!   via `overflow.rs`.
14//! - The schema catalog is itself a regular table named `sqlrite_master`,
15//!   with one row per user table:
16//!       `(name TEXT PRIMARY KEY, sql TEXT NOT NULL,
17//!         rootpage INTEGER NOT NULL, last_rowid INTEGER NOT NULL)`
18//!   This is the SQLite-style approach: the schema of `sqlrite_master`
19//!   itself is hardcoded into the engine so the open path can bootstrap.
20//! - Page 0's `schema_root_page` field points at the first leaf of
21//!   `sqlrite_master`.
22//!
23//! **Format version.** Version 2 is not compatible with files produced by
24//! earlier commits. Opening a v1 file returns a clean error — users on
25//! old files have to regenerate them from CREATE/INSERT, as there's no
26//! production data to migrate yet.
27
28// Data-layer modules. Not every helper in these modules is used by save/open
29// yet — some exist for tests, some for future maintenance operations.
30// Module-level #[allow(dead_code)] keeps the build quiet without dotting
31// the modules with per-item attributes.
32#[allow(dead_code)]
33pub mod allocator;
34#[allow(dead_code)]
35pub mod cell;
36pub mod file;
37#[allow(dead_code)]
38pub mod freelist;
39#[allow(dead_code)]
40pub mod fts_cell;
41pub mod header;
42#[allow(dead_code)]
43pub mod hnsw_cell;
44#[allow(dead_code)]
45pub mod index_cell;
46#[allow(dead_code)]
47pub mod interior_page;
48pub mod overflow;
49pub mod page;
50pub mod pager;
51#[allow(dead_code)]
52pub mod table_page;
53#[allow(dead_code)]
54pub mod varint;
55#[allow(dead_code)]
56pub mod wal;
57
58use std::collections::{BTreeMap, HashMap};
59use std::path::Path;
60use std::sync::{Arc, Mutex};
61
62use crate::sql::dialect::SqlriteDialect;
63use sqlparser::parser::Parser;
64
65use crate::error::{Result, SQLRiteError};
66use crate::sql::db::database::Database;
67use crate::sql::db::secondary_index::{IndexOrigin, SecondaryIndex};
68use crate::sql::db::table::{Column, DataType, Row, Table, Value};
69use crate::sql::hnsw::DistanceMetric;
70use crate::sql::pager::cell::Cell;
71use crate::sql::pager::header::DbHeader;
72use crate::sql::pager::index_cell::IndexCell;
73use crate::sql::pager::interior_page::{InteriorCell, InteriorPage};
74use crate::sql::pager::overflow::{
75    OVERFLOW_THRESHOLD, OverflowRef, PagedEntry, read_overflow_chain, write_overflow_chain,
76};
77use crate::sql::pager::page::{PAGE_HEADER_SIZE, PAGE_SIZE, PAYLOAD_PER_PAGE, PageType};
78use crate::sql::pager::pager::Pager;
79use crate::sql::pager::table_page::TablePage;
80use crate::sql::parser::create::CreateQuery;
81
82// Re-export so callers can spell `sql::pager::AccessMode` without
83// reaching into the `pager::pager::pager` submodule path.
84pub use crate::sql::pager::pager::AccessMode;
85
/// Name of the internal catalog table. Reserved — user CREATEs of this
/// name must be rejected upstream.
///
/// The save path in this module also refuses to persist a user table
/// carrying this name and returns an `Internal` error instead.
pub const MASTER_TABLE_NAME: &str = "sqlrite_master";
89
90/// Opens a database file in read-write mode. Shorthand for
91/// [`open_database_with_mode`] with [`AccessMode::ReadWrite`].
92pub fn open_database(path: &Path, db_name: String) -> Result<Database> {
93    open_database_with_mode(path, db_name, AccessMode::ReadWrite)
94}
95
96/// Opens a database file in read-only mode. Acquires a shared OS-level
97/// advisory lock, so other read-only openers coexist but any writer is
98/// excluded. Attempts to mutate the returned `Database` (e.g. an
99/// `INSERT`, or a `save_database` call against it) bottom out in a
100/// `cannot commit: database is opened read-only` error from the Pager.
101pub fn open_database_read_only(path: &Path, db_name: String) -> Result<Database> {
102    open_database_with_mode(path, db_name, AccessMode::ReadOnly)
103}
104
105/// Opens a database file and reconstructs the in-memory `Database`,
106/// leaving the long-lived `Pager` attached for subsequent auto-save
107/// (read-write) or consistent-snapshot reads (read-only).
108pub fn open_database_with_mode(path: &Path, db_name: String, mode: AccessMode) -> Result<Database> {
109    let pager = Pager::open_with_mode(path, mode)?;
110
111    // 1. Load sqlrite_master from the tree at header.schema_root_page.
112    let mut master = build_empty_master_table();
113    load_table_rows(&pager, &mut master, pager.header().schema_root_page)?;
114
115    // 2. Two passes over master rows: first build every user table, then
116    //    attach secondary indexes. Indexes need their base table to exist
117    //    before we can populate them. Auto-indexes are created at table
118    //    build time so we only have to load explicit indexes from disk
119    //    (but we also reload the auto-index CONTENT because Table::new
120    //    built it empty).
121    let mut db = Database::new(db_name);
122    let mut index_rows: Vec<IndexCatalogRow> = Vec::new();
123
124    for rowid in master.rowids() {
125        let ty = take_text(&master, "type", rowid)?;
126        let name = take_text(&master, "name", rowid)?;
127        let sql = take_text(&master, "sql", rowid)?;
128        let rootpage = take_integer(&master, "rootpage", rowid)? as u32;
129        let last_rowid = take_integer(&master, "last_rowid", rowid)?;
130
131        match ty.as_str() {
132            "table" => {
133                let (parsed_name, columns) = parse_create_sql(&sql)?;
134                if parsed_name != name {
135                    return Err(SQLRiteError::Internal(format!(
136                        "sqlrite_master row '{name}' carries SQL for '{parsed_name}' — corrupt catalog?"
137                    )));
138                }
139                let mut table = build_empty_table(&name, columns, last_rowid);
140                if rootpage != 0 {
141                    load_table_rows(&pager, &mut table, rootpage)?;
142                }
143                if last_rowid > table.last_rowid {
144                    table.last_rowid = last_rowid;
145                }
146                db.tables.insert(name, table);
147            }
148            "index" => {
149                index_rows.push(IndexCatalogRow {
150                    name,
151                    sql,
152                    rootpage,
153                });
154            }
155            other => {
156                return Err(SQLRiteError::Internal(format!(
157                    "sqlrite_master row '{name}' has unknown type '{other}'"
158                )));
159            }
160        }
161    }
162
163    // Second pass: attach each index to its table. HNSW indexes
164    // (Phase 7d.2) take a different code path because their persisted
165    // form is just the CREATE INDEX SQL — the graph itself isn't
166    // persisted yet (Phase 7d.3). Detect HNSW via the SQL's USING clause
167    // and route to a graph-rebuild instead of the B-Tree-cell load.
168    //
169    // Phase 8b — same shape for FTS indexes. The posting lists aren't
170    // persisted yet (Phase 8c), so we replay the CREATE INDEX SQL on
171    // open and let `execute_create_index` walk current rows.
172    for row in index_rows {
173        if create_index_sql_uses_hnsw(&row.sql) {
174            rebuild_hnsw_index(&mut db, &pager, &row)?;
175        } else if create_index_sql_uses_fts(&row.sql) {
176            rebuild_fts_index(&mut db, &pager, &row)?;
177        } else {
178            attach_index(&mut db, &pager, row)?;
179        }
180    }
181
182    // Phase 11.9 — replay any MVCC commit batches recovered from
183    // the WAL into the freshly-built `MvStore`, and seed the
184    // `MvccClock` past the highest persisted timestamp. Without
185    // this step the in-memory MVCC state would always start blank
186    // on reopen — fine for legacy single-session workloads, but a
187    // correctness gap once `BEGIN CONCURRENT` is in play (a
188    // second process could hand out a `begin_ts` below an
189    // already-committed version's `end`, breaking the visibility
190    // rule).
191    //
192    // The clock seed is the larger of (header.clock_high_water,
193    // max(commit_ts among replayed batches)) so a crash between
194    // commits and the next checkpoint — where the header's
195    // high-water lags reality — still produces a clock that
196    // doesn't regress.
197    replay_mvcc_into_db(&mut db, &pager)?;
198
199    db.source_path = Some(path.to_path_buf());
200    db.pager = Some(pager);
201    Ok(db)
202}
203
204/// Phase 11.9 — drains every MVCC commit batch the Pager recovered
205/// from the WAL into `db.mv_store`, and advances `db.mvcc_clock`
206/// to at least the highest observed timestamp.
207///
208/// Batches are replayed in WAL order, which matches commit order
209/// (the WAL appends sequentially). Each record's `commit_ts`
210/// becomes the version's `begin`, with the previous latest
211/// version's `end` capped at the same timestamp — identical to
212/// the live-commit path's `MvStore::push_committed`.
213fn replay_mvcc_into_db(db: &mut Database, pager: &Pager) -> Result<()> {
214    use crate::mvcc::RowVersion;
215
216    let mut clock_seed = pager.clock_high_water();
217    for batch in pager.recovered_mvcc_commits() {
218        if batch.commit_ts > clock_seed {
219            clock_seed = batch.commit_ts;
220        }
221        for rec in &batch.records {
222            let version = RowVersion::committed(batch.commit_ts, rec.payload.clone());
223            db.mv_store
224                .push_committed(rec.row.clone(), version)
225                .map_err(|e| {
226                    SQLRiteError::Internal(format!(
227                        "WAL MVCC replay: push_committed failed for {}/{}: {e}",
228                        rec.row.table, rec.row.rowid,
229                    ))
230                })?;
231        }
232    }
233    if clock_seed > 0 {
234        db.mvcc_clock.observe(clock_seed);
235    }
236    Ok(())
237}
238
/// Catalog row for a secondary index — deferred until after every table is
/// loaded so the index's base table exists by the time we populate it.
///
/// Populated from `sqlrite_master` rows whose `type` column is `"index"`.
struct IndexCatalogRow {
    /// Value of the catalog row's `name` column.
    name: String,
    /// Value of the catalog row's `sql` column; inspected on open to
    /// decide between the HNSW, FTS and plain attach paths.
    sql: String,
    /// Value of the catalog row's `rootpage` column, as a page number.
    rootpage: u32,
}
246
247/// Persists `db` to disk. Diff-pager skips writing pages whose bytes
248/// haven't changed; the [`PageAllocator`] preserves per-table page
249/// numbers across saves so unchanged tables produce zero dirty frames.
250///
251/// Pages that were live before this save but aren't restaged this round
252/// (e.g., the leaves of a dropped table) move onto a persisted free
253/// list rooted at `header.freelist_head`; subsequent saves draw from
254/// the freelist before extending the file. `VACUUM` (see
255/// [`vacuum_database`]) compacts the file by ignoring the freelist and
256/// allocating linearly from page 1.
257///
258/// [`PageAllocator`]: crate::sql::pager::allocator::PageAllocator
259pub fn save_database(db: &mut Database, path: &Path) -> Result<()> {
260    save_database_with_mode(db, path, /*compact=*/ false)
261}
262
263/// Reclaims space by rewriting every live B-Tree contiguously from
264/// page 1, with no freelist. Equivalent to `save_database` but ignores
265/// the existing freelist and per-table preferred pools — every page is
266/// allocated by extending the high-water mark — so the resulting file
267/// is tightly packed and the freelist is empty.
268///
269/// Used by the SQL-level `VACUUM;` statement.
270pub fn vacuum_database(db: &mut Database, path: &Path) -> Result<()> {
271    save_database_with_mode(db, path, /*compact=*/ true)
272}
273
274/// Shared save core. `compact = false` is the normal save path (uses
275/// the existing freelist + per-table preferred pools). `compact = true`
276/// is the VACUUM path (empty freelist, empty preferred pools, linear
277/// allocation from page 1).
278fn save_database_with_mode(db: &mut Database, path: &Path, compact: bool) -> Result<()> {
279    // Phase 7d.3 — rebuild any HNSW index that DELETE / UPDATE-on-vector
280    // marked dirty. Done up front under the &mut Database borrow we
281    // already hold, before the immutable iteration loops below need
282    // their own borrow.
283    rebuild_dirty_hnsw_indexes(db)?;
284    // Phase 8b — same drill for FTS indexes flagged by DELETE / UPDATE.
285    rebuild_dirty_fts_indexes(db);
286
287    let same_path = db.source_path.as_deref() == Some(path);
288    let mut pager = if same_path {
289        match db.pager.take() {
290            Some(p) => p,
291            None if path.exists() => Pager::open(path)?,
292            None => Pager::create(path)?,
293        }
294    } else if path.exists() {
295        Pager::open(path)?
296    } else {
297        Pager::create(path)?
298    };
299
300    // Snapshot what was live BEFORE we reset staged. Used to compute the
301    // newly-freed set after staging completes. Page 0 (the header) is
302    // never on the freelist — it's always live.
303    let old_header = pager.header();
304    let old_live: std::collections::HashSet<u32> = (1..old_header.page_count).collect();
305
306    // Read the previously-persisted freelist so its leaf pages can be
307    // reused as preferred allocations and its trunk pages don't leak.
308    let (old_free_leaves, old_free_trunks) = if compact || old_header.freelist_head == 0 {
309        (Vec::new(), Vec::new())
310    } else {
311        crate::sql::pager::freelist::read_freelist(&pager, old_header.freelist_head)?
312    };
313
314    // Snapshot the previous rootpages of each table/index so we can
315    // seed per-table preferred pools (the unchanged-table case stages
316    // byte-identical pages → diff pager skips every write for it).
317    let old_rootpages = if compact {
318        HashMap::new()
319    } else {
320        read_old_rootpages(&pager, old_header.schema_root_page)?
321    };
322
323    // SQLR-1 — snapshot every prior B-Tree's page set NOW, before any
324    // staging starts. `Pager::read_page` shadows on-disk bytes with the
325    // current `staged` buffer, so if we deferred these walks until each
326    // object's turn in the staging loop, a *new* index added in this
327    // save would extend past the old high-water and overwrite the
328    // pages of any later-staged object whose old root sits in that
329    // range — including `sqlrite_master`, which is always staged last.
330    // The follow-up walk would then read the wrong B-Tree's bytes and
331    // either hand the allocator a bogus preferred pool or panic
332    // dispatching cells (a table-cell decoder vs. an index leaf, the
333    // shape of the original SQLR-1 panic). Walking up front pins each
334    // map to the committed bytes that were on disk before this save
335    // touched anything.
336    let old_preferred_pages: HashMap<(String, String), Vec<u32>> = if compact {
337        HashMap::new()
338    } else {
339        let mut map: HashMap<(String, String), Vec<u32>> = HashMap::new();
340        for ((kind, name), &root) in &old_rootpages {
341            // Tables can carry overflow chains; index/HNSW/FTS leaves
342            // never overflow in the current encoding, so the cheaper
343            // walk suffices for them.
344            let follow = kind == "table";
345            let pages = collect_pages_for_btree(&pager, root, follow)?;
346            map.insert((kind.clone(), name.clone()), pages);
347        }
348        map
349    };
350    let old_master_pages: Vec<u32> = if compact || old_header.schema_root_page == 0 {
351        Vec::new()
352    } else {
353        collect_pages_for_btree(
354            &pager,
355            old_header.schema_root_page,
356            /*follow_overflow=*/ true,
357        )?
358    };
359
360    pager.clear_staged();
361
362    // Allocator: in normal mode, seed with the old freelist; in compact
363    // mode, start empty so allocation extends linearly from page 1.
364    use std::collections::VecDeque;
365    let initial_freelist: VecDeque<u32> = if compact {
366        VecDeque::new()
367    } else {
368        crate::sql::pager::freelist::freelist_to_deque(old_free_leaves.clone())
369    };
370    let mut alloc = crate::sql::pager::allocator::PageAllocator::new(initial_freelist, 1);
371
372    // 1. Stage each user table's B-Tree, collecting master-row info.
373    //    `kind` is "table" or "index" — master has one row per each.
374    let mut master_rows: Vec<CatalogEntry> = Vec::new();
375
376    let mut table_names: Vec<&String> = db.tables.keys().collect();
377    table_names.sort();
378    for name in table_names {
379        if name == MASTER_TABLE_NAME {
380            return Err(SQLRiteError::Internal(format!(
381                "user table cannot be named '{MASTER_TABLE_NAME}' (reserved)"
382            )));
383        }
384        if !compact {
385            if let Some(prev) = old_preferred_pages.get(&("table".to_string(), name.to_string())) {
386                alloc.set_preferred(prev.clone());
387            }
388        }
389        let table = &db.tables[name];
390        let rootpage = stage_table_btree(&mut pager, table, &mut alloc)?;
391        alloc.finish_preferred();
392        master_rows.push(CatalogEntry {
393            kind: "table".into(),
394            name: name.clone(),
395            sql: table_to_create_sql(table),
396            rootpage,
397            last_rowid: table.last_rowid,
398        });
399    }
400
401    // 2. Stage each secondary index's B-Tree. Indexes persist in a
402    //    deterministic order: sorted by (owning_table, index_name).
403    let mut index_entries: Vec<(&Table, &SecondaryIndex)> = Vec::new();
404    for table in db.tables.values() {
405        for idx in &table.secondary_indexes {
406            index_entries.push((table, idx));
407        }
408    }
409    index_entries
410        .sort_by(|(ta, ia), (tb, ib)| ta.tb_name.cmp(&tb.tb_name).then(ia.name.cmp(&ib.name)));
411    for (_table, idx) in index_entries {
412        if !compact {
413            if let Some(prev) =
414                old_preferred_pages.get(&("index".to_string(), idx.name.to_string()))
415            {
416                alloc.set_preferred(prev.clone());
417            }
418        }
419        let rootpage = stage_index_btree(&mut pager, idx, &mut alloc)?;
420        alloc.finish_preferred();
421        master_rows.push(CatalogEntry {
422            kind: "index".into(),
423            name: idx.name.clone(),
424            sql: idx.synthesized_sql(),
425            rootpage,
426            last_rowid: 0,
427        });
428    }
429
430    // 2b. Phase 7d.3: persist HNSW indexes as their own cell-encoded
431    //     page trees, with the rootpage recorded in sqlrite_master.
432    //     Reopen loads the graph back from cells (fast, exact match)
433    //     instead of rebuilding from rows.
434    //
435    //     Dirty indexes (set by DELETE / UPDATE-on-vector-col) are
436    //     rebuilt from current rows BEFORE staging, so the on-disk
437    //     graph reflects the current row set.
438    let mut hnsw_entries: Vec<(&Table, &crate::sql::db::table::HnswIndexEntry)> = Vec::new();
439    for table in db.tables.values() {
440        for entry in &table.hnsw_indexes {
441            hnsw_entries.push((table, entry));
442        }
443    }
444    hnsw_entries
445        .sort_by(|(ta, ea), (tb, eb)| ta.tb_name.cmp(&tb.tb_name).then(ea.name.cmp(&eb.name)));
446    for (table, entry) in hnsw_entries {
447        if !compact {
448            if let Some(prev) =
449                old_preferred_pages.get(&("index".to_string(), entry.name.to_string()))
450            {
451                alloc.set_preferred(prev.clone());
452            }
453        }
454        let rootpage = stage_hnsw_btree(&mut pager, &entry.index, &mut alloc)?;
455        alloc.finish_preferred();
456        master_rows.push(CatalogEntry {
457            kind: "index".into(),
458            name: entry.name.clone(),
459            sql: synthesize_hnsw_create_index_sql(
460                &entry.name,
461                &table.tb_name,
462                &entry.column_name,
463                entry.metric,
464            ),
465            rootpage,
466            last_rowid: 0,
467        });
468    }
469
470    // 2c. Phase 8c — persist FTS posting lists as their own
471    //     cell-encoded page trees, with the rootpage recorded in
472    //     sqlrite_master. Reopen loads the postings back from cells
473    //     (fast, exact match) instead of re-tokenizing rows.
474    //
475    //     Dirty indexes (set by DELETE / UPDATE-on-text-col) are
476    //     rebuilt from current rows BEFORE staging by
477    //     `rebuild_dirty_fts_indexes`, so the on-disk tree reflects
478    //     the current row set.
479    let mut fts_entries: Vec<(&Table, &crate::sql::db::table::FtsIndexEntry)> = Vec::new();
480    for table in db.tables.values() {
481        for entry in &table.fts_indexes {
482            fts_entries.push((table, entry));
483        }
484    }
485    fts_entries
486        .sort_by(|(ta, ea), (tb, eb)| ta.tb_name.cmp(&tb.tb_name).then(ea.name.cmp(&eb.name)));
487    let any_fts = !fts_entries.is_empty();
488    for (table, entry) in fts_entries {
489        if !compact {
490            if let Some(prev) =
491                old_preferred_pages.get(&("index".to_string(), entry.name.to_string()))
492            {
493                alloc.set_preferred(prev.clone());
494            }
495        }
496        let rootpage = stage_fts_btree(&mut pager, &entry.index, &mut alloc)?;
497        alloc.finish_preferred();
498        master_rows.push(CatalogEntry {
499            kind: "index".into(),
500            name: entry.name.clone(),
501            sql: format!(
502                "CREATE INDEX {} ON {} USING fts ({})",
503                entry.name, table.tb_name, entry.column_name
504            ),
505            rootpage,
506            last_rowid: 0,
507        });
508    }
509
510    // 3. Build an in-memory sqlrite_master with one row per table or index,
511    //    then stage it via the same tree-build path. Seed master's
512    //    preferred pool with the previous master tree's pages so the
513    //    catalog page numbers stay stable across saves whenever the
514    //    catalog content didn't change.
515    let mut master = build_empty_master_table();
516    for (i, entry) in master_rows.into_iter().enumerate() {
517        let rowid = (i as i64) + 1;
518        master.restore_row(
519            rowid,
520            vec![
521                Some(Value::Text(entry.kind)),
522                Some(Value::Text(entry.name)),
523                Some(Value::Text(entry.sql)),
524                Some(Value::Integer(entry.rootpage as i64)),
525                Some(Value::Integer(entry.last_rowid)),
526            ],
527        )?;
528    }
529    if !compact && !old_master_pages.is_empty() {
530        // Use the page list snapshotted before any staging touched
531        // disk; re-walking here would read whatever a new index
532        // already restaged on top of master's old root (SQLR-1).
533        alloc.set_preferred(old_master_pages.clone());
534    }
535    let master_root = stage_table_btree(&mut pager, &master, &mut alloc)?;
536    alloc.finish_preferred();
537
538    // 4. Compute newly-freed pages: the previously-live set minus what
539    //    we just restaged. The previous freelist's trunk pages get
540    //    re-encoded too — they're in `old_live`, weren't restaged, so
541    //    the filter naturally moves them to the new freelist.
542    //
543    // In `compact` mode (VACUUM), we *discard* newly_freed instead of
544    // routing it onto the new freelist. The whole point of VACUUM is
545    // to let the file truncate to the new high-water mark, so any page
546    // past it gets dropped at the next checkpoint.
547    if !compact {
548        let used = alloc.used().clone();
549        let mut newly_freed: Vec<u32> = old_live
550            .iter()
551            .copied()
552            .filter(|p| !used.contains(p))
553            .collect();
554        let _ = &old_free_trunks; // silenced — handled by the old_live filter
555        alloc.add_to_freelist(newly_freed.drain(..));
556    }
557
558    // 5. Encode the new freelist into trunk pages. `stage_freelist`
559    //    consumes some of the free pages AS the trunk pages themselves —
560    //    a trunk is just a free page borrowed for metadata. Pages that
561    //    were on the freelist but become trunks no longer need to be
562    //    "extension" pages; the high-water mark from the staging loop
563    //    above is already correct.
564    let new_free_pages = alloc.drain_freelist();
565    let new_freelist_head =
566        crate::sql::pager::freelist::stage_freelist(&mut pager, new_free_pages)?;
567
568    // 6. Pick the format version. v6 is on demand: only bumps when the
569    //    new freelist is non-empty. FTS-bearing files keep their v5
570    //    promotion; v6 is a strict superset (v6 readers handle v4/v5/v6).
571    use crate::sql::pager::header::{FORMAT_VERSION_V5, FORMAT_VERSION_V6};
572    let format_version = if new_freelist_head != 0 {
573        FORMAT_VERSION_V6
574    } else if any_fts {
575        // Preserve a v6 file at v6 (don't downgrade) but otherwise
576        // bump v4 → v5 for FTS like Phase 8c does.
577        std::cmp::max(FORMAT_VERSION_V5, old_header.format_version)
578    } else {
579        // Preserve whatever the file already was.
580        old_header.format_version
581    };
582
583    pager.commit(DbHeader {
584        page_count: alloc.high_water(),
585        schema_root_page: master_root,
586        format_version,
587        freelist_head: new_freelist_head,
588    })?;
589
590    if same_path {
591        db.pager = Some(pager);
592    }
593    Ok(())
594}
595
/// Build material for a single row in sqlrite_master.
///
/// Fields map one-to-one onto the catalog columns `type`, `name`, `sql`,
/// `rootpage` and `last_rowid`, in that order.
struct CatalogEntry {
    /// Stored in the `type` column.
    kind: String, // "table" or "index"
    /// Table or index name; stored in the `name` column.
    name: String,
    /// Synthesized CREATE statement; stored in the `sql` column.
    sql: String,
    /// Page number recorded in the `rootpage` column.
    rootpage: u32,
    /// Value for the `last_rowid` column; index rows are written with 0.
    last_rowid: i64,
}
604
605// -------------------------------------------------------------------------
606// sqlrite_master — hardcoded catalog table schema
607
608fn build_empty_master_table() -> Table {
609    // Phase 3e: `type` is the first column, matching SQLite's convention.
610    // It distinguishes `'table'` rows from `'index'` rows.
611    let columns = vec![
612        Column::new("type".into(), "text".into(), false, true, false),
613        Column::new("name".into(), "text".into(), true, true, true),
614        Column::new("sql".into(), "text".into(), false, true, false),
615        Column::new("rootpage".into(), "integer".into(), false, true, false),
616        Column::new("last_rowid".into(), "integer".into(), false, true, false),
617    ];
618    build_empty_table(MASTER_TABLE_NAME, columns, 0)
619}
620
621/// SQLR-10 — builds an in-memory, read-only snapshot of `sqlrite_master`
622/// from the live `Database`, so `SELECT … FROM sqlrite_master` can
623/// introspect the catalog without a save round-trip. One row per user
624/// table and per index (B-Tree / HNSW / FTS), reusing the exact same SQL
625/// synthesis the persistence path uses — so the `name` / `type` / `sql`
626/// columns match byte-for-byte what a saved-then-dumped catalog shows.
627///
628/// Rows are ordered deterministically: tables sorted by name, then
629/// indexes sorted by `(owning_table, index_name)` — mirroring
630/// [`save_database_with_mode`]'s staging order.
631///
632/// The `rootpage` column is `0` for every row: page assignment only
633/// happens at save time, so a live/in-memory view has no meaningful page
634/// number. The column is kept for schema parity with the persisted
635/// catalog (and SQLite's `sqlite_master`); callers needing introspection
636/// use `type` / `name` / `sql`. `last_rowid` carries the table's current
637/// auto-increment high-water mark (0 for index rows).
638pub(crate) fn build_master_table_snapshot(db: &Database) -> Result<Table> {
639    let mut master = build_empty_master_table();
640
641    let mut entries: Vec<CatalogEntry> = Vec::new();
642
643    // Tables, sorted by name.
644    let mut table_names: Vec<&String> = db.tables.keys().collect();
645    table_names.sort();
646    for name in &table_names {
647        let table = &db.tables[*name];
648        entries.push(CatalogEntry {
649            kind: "table".into(),
650            name: (*name).clone(),
651            sql: table_to_create_sql(table),
652            rootpage: 0,
653            last_rowid: table.last_rowid,
654        });
655    }
656
657    // Indexes across all three families, sorted by (table, index name) —
658    // collected into one list so the final order matches how a reopened
659    // catalog enumerates them.
660    let mut index_entries: Vec<(String, String, String)> = Vec::new(); // (table, index_name, sql)
661    for table in db.tables.values() {
662        for idx in &table.secondary_indexes {
663            index_entries.push((
664                table.tb_name.clone(),
665                idx.name.clone(),
666                idx.synthesized_sql(),
667            ));
668        }
669        for entry in &table.hnsw_indexes {
670            index_entries.push((
671                table.tb_name.clone(),
672                entry.name.clone(),
673                synthesize_hnsw_create_index_sql(
674                    &entry.name,
675                    &table.tb_name,
676                    &entry.column_name,
677                    entry.metric,
678                ),
679            ));
680        }
681        for entry in &table.fts_indexes {
682            index_entries.push((
683                table.tb_name.clone(),
684                entry.name.clone(),
685                format!(
686                    "CREATE INDEX {} ON {} USING fts ({})",
687                    entry.name, table.tb_name, entry.column_name
688                ),
689            ));
690        }
691    }
692    index_entries.sort_by(|(ta, ia, _), (tb, ib, _)| ta.cmp(tb).then(ia.cmp(ib)));
693    for (_table, name, sql) in index_entries {
694        entries.push(CatalogEntry {
695            kind: "index".into(),
696            name,
697            sql,
698            rootpage: 0,
699            last_rowid: 0,
700        });
701    }
702
703    for (i, entry) in entries.into_iter().enumerate() {
704        let rowid = (i as i64) + 1;
705        master.restore_row(
706            rowid,
707            vec![
708                Some(Value::Text(entry.kind)),
709                Some(Value::Text(entry.name)),
710                Some(Value::Text(entry.sql)),
711                Some(Value::Integer(entry.rootpage as i64)),
712                Some(Value::Integer(entry.last_rowid)),
713            ],
714        )?;
715    }
716
717    Ok(master)
718}
719
720/// Reads a required Text column from a known-good catalog row.
721fn take_text(table: &Table, col: &str, rowid: i64) -> Result<String> {
722    match table.get_value(col, rowid) {
723        Some(Value::Text(s)) => Ok(s),
724        other => Err(SQLRiteError::Internal(format!(
725            "sqlrite_master column '{col}' at rowid {rowid}: expected Text, got {other:?}"
726        ))),
727    }
728}
729
730/// Reads a required Integer column from a known-good catalog row.
731fn take_integer(table: &Table, col: &str, rowid: i64) -> Result<i64> {
732    match table.get_value(col, rowid) {
733        Some(Value::Integer(v)) => Ok(v),
734        other => Err(SQLRiteError::Internal(format!(
735            "sqlrite_master column '{col}' at rowid {rowid}: expected Integer, got {other:?}"
736        ))),
737    }
738}
739
740// -------------------------------------------------------------------------
741// CREATE-TABLE SQL synthesis and re-parsing
742
743/// Synthesizes a CREATE TABLE SQL string that recreates the table's schema.
744/// Deterministic: same schema → same SQL, so diffing commits stay stable.
745fn table_to_create_sql(table: &Table) -> String {
746    let mut parts = Vec::with_capacity(table.columns.len());
747    for c in &table.columns {
748        // Render the SQL type literally so the round-trip through
749        // CREATE TABLE re-parsing recreates the same schema. Vector
750        // carries its dimension inline.
751        let ty: String = match &c.datatype {
752            DataType::Integer => "INTEGER".to_string(),
753            DataType::Text => "TEXT".to_string(),
754            DataType::Real => "REAL".to_string(),
755            DataType::Bool => "BOOLEAN".to_string(),
756            DataType::Vector(dim) => format!("VECTOR({dim})"),
757            DataType::Json => "JSON".to_string(),
758            DataType::None | DataType::Invalid => "TEXT".to_string(),
759        };
760        let mut piece = format!("{} {}", c.column_name, ty);
761        if c.is_pk {
762            piece.push_str(" PRIMARY KEY");
763        } else {
764            if c.is_unique {
765                piece.push_str(" UNIQUE");
766            }
767            if c.not_null {
768                piece.push_str(" NOT NULL");
769            }
770        }
771        if let Some(default) = &c.default {
772            piece.push_str(" DEFAULT ");
773            piece.push_str(&render_default_literal(default));
774        }
775        parts.push(piece);
776    }
777    format!("CREATE TABLE {} ({});", table.tb_name, parts.join(", "))
778}
779
780/// Renders a DEFAULT value back to SQL-literal form so the synthesized
781/// CREATE TABLE round-trips through `parse_create_sql`. Text values get
782/// single-quoted with single-quote doubling for escaping. Vector defaults
783/// are not currently expressible at CREATE TABLE time, so we render them
784/// as their bracket-array form (matches the INSERT literal grammar).
785fn render_default_literal(value: &Value) -> String {
786    match value {
787        Value::Integer(i) => i.to_string(),
788        Value::Real(f) => f.to_string(),
789        Value::Bool(b) => {
790            if *b {
791                "TRUE".to_string()
792            } else {
793                "FALSE".to_string()
794            }
795        }
796        Value::Text(s) => format!("'{}'", s.replace('\'', "''")),
797        Value::Null => "NULL".to_string(),
798        Value::Vector(_) => value.to_display_string(),
799    }
800}
801
802/// Reverses `table_to_create_sql`: feeds the SQL back through `sqlparser`
803/// and produces our internal column list. Returns `(table_name, columns)`.
804fn parse_create_sql(sql: &str) -> Result<(String, Vec<Column>)> {
805    let dialect = SqlriteDialect::new();
806    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
807    let stmt = ast.pop().ok_or_else(|| {
808        SQLRiteError::Internal("sqlrite_master row held an empty SQL string".to_string())
809    })?;
810    let create = CreateQuery::new(&stmt)?;
811    let columns = create
812        .columns
813        .into_iter()
814        .map(|pc| {
815            Column::with_default(
816                pc.name,
817                pc.datatype,
818                pc.is_pk,
819                pc.not_null,
820                pc.is_unique,
821                pc.default,
822            )
823        })
824        .collect();
825    Ok((create.table_name, columns))
826}
827
828// -------------------------------------------------------------------------
829// In-memory table (re)construction
830
831/// Builds an empty in-memory `Table` given the declared columns.
832fn build_empty_table(name: &str, columns: Vec<Column>, last_rowid: i64) -> Table {
833    let rows: Arc<Mutex<HashMap<String, Row>>> = Arc::new(Mutex::new(HashMap::new()));
834    let mut secondary_indexes: Vec<SecondaryIndex> = Vec::new();
835    {
836        let mut map = rows.lock().expect("rows mutex poisoned");
837        for col in &columns {
838            // Mirror the dispatch in `Table::new` so the reconstructed
839            // table has the same shape it'd have if it were built fresh
840            // from SQL. Phase 7a adds the Vector arm — without it,
841            // VECTOR columns silently restore as Row::None and every
842            // restore_row hits a "storage None vs value Some(Vector(...))"
843            // type mismatch.
844            let row = match &col.datatype {
845                DataType::Integer => Row::Integer(BTreeMap::new()),
846                DataType::Text => Row::Text(BTreeMap::new()),
847                DataType::Real => Row::Real(BTreeMap::new()),
848                DataType::Bool => Row::Bool(BTreeMap::new()),
849                DataType::Vector(_dim) => Row::Vector(BTreeMap::new()),
850                // JSON columns reuse Text storage — see Table::new and
851                // Phase 7e's scope-correction note.
852                DataType::Json => Row::Text(BTreeMap::new()),
853                DataType::None | DataType::Invalid => Row::None,
854            };
855            map.insert(col.column_name.clone(), row);
856
857            // Auto-create UNIQUE/PK indexes so the restored table has the
858            // same shape Table::new would have built from fresh SQL.
859            if (col.is_pk || col.is_unique)
860                && matches!(col.datatype, DataType::Integer | DataType::Text)
861            {
862                if let Ok(idx) = SecondaryIndex::new(
863                    SecondaryIndex::auto_name(name, &col.column_name),
864                    name.to_string(),
865                    col.column_name.clone(),
866                    &col.datatype,
867                    true,
868                    IndexOrigin::Auto,
869                ) {
870                    secondary_indexes.push(idx);
871                }
872            }
873        }
874    }
875
876    let primary_key = columns
877        .iter()
878        .find(|c| c.is_pk)
879        .map(|c| c.column_name.clone())
880        .unwrap_or_else(|| "-1".to_string());
881
882    Table {
883        tb_name: name.to_string(),
884        columns,
885        rows,
886        secondary_indexes,
887        // HNSW indexes (Phase 7d.2) are reconstructed on open by re-
888        // executing each `CREATE INDEX … USING hnsw` SQL stored in
889        // `sqlrite_master`. This builder produces the empty shell;
890        // `replay_create_index_for_hnsw` (in this same module) walks
891        // sqlrite_master after every table is loaded and rebuilds the
892        // graph from current row data. Persistence of the graph itself
893        // (avoiding the on-open rebuild cost) is Phase 7d.3.
894        hnsw_indexes: Vec::new(),
895        // FTS indexes (Phase 8b) follow the same pattern — the
896        // CREATE INDEX … USING fts SQL is the source of truth on open
897        // and the in-memory posting list gets rebuilt from current
898        // rows. Cell-encoded persistence of the postings is Phase 8c.
899        fts_indexes: Vec::new(),
900        last_rowid,
901        primary_key,
902    }
903}
904
905// -------------------------------------------------------------------------
906// Leaf-chain read / write
907
908/// Walks a table's B-Tree from `root_page`, following the leftmost-child
909/// chain down to the first leaf, then iterating leaves via their sibling
910/// `next_page` pointers. Every cell is decoded and replayed into `table`.
911///
912/// Open-path note: we eagerly materialize the entire table into `Table`'s
913/// in-memory maps. Phase 5 will introduce a `Cursor` that hits the pager
914/// on demand so queries can stream through the tree without a full upfront
915/// load.
916/// Re-parses `CREATE INDEX` SQL from sqlrite_master and restores the
917/// index on its base table by walking the tree of index cells at
918/// `rootpage`. The base table is expected to already be in `db.tables`.
919fn attach_index(db: &mut Database, pager: &Pager, row: IndexCatalogRow) -> Result<()> {
920    let (table_name, column_name, is_unique) = parse_create_index_sql(&row.sql)?;
921
922    let table = db.get_table_mut(table_name.clone()).map_err(|_| {
923        SQLRiteError::Internal(format!(
924            "index '{}' references unknown table '{table_name}' (sqlrite_master out of sync?)",
925            row.name
926        ))
927    })?;
928    let datatype = table
929        .columns
930        .iter()
931        .find(|c| c.column_name == column_name)
932        .map(|c| clone_datatype(&c.datatype))
933        .ok_or_else(|| {
934            SQLRiteError::Internal(format!(
935                "index '{}' references unknown column '{column_name}' on '{table_name}'",
936                row.name
937            ))
938        })?;
939
940    // An auto-index on this column may already exist (built by
941    // build_empty_table for UNIQUE/PK columns). If the names match, reuse
942    // the slot instead of adding a duplicate entry.
943    let existing_slot = table
944        .secondary_indexes
945        .iter()
946        .position(|i| i.name == row.name);
947    let idx = match existing_slot {
948        Some(i) => {
949            // Drain any entries that may have been populated during table
950            // restore_row calls — we're about to repopulate from the
951            // persisted tree.
952            table.secondary_indexes.remove(i)
953        }
954        None => SecondaryIndex::new(
955            row.name.clone(),
956            table_name.clone(),
957            column_name.clone(),
958            &datatype,
959            is_unique,
960            IndexOrigin::Explicit,
961        )?,
962    };
963    let mut idx = idx;
964    // Wipe any stale entries from the auto path so the load is idempotent.
965    let is_unique_flag = idx.is_unique;
966    let origin = idx.origin;
967    idx = SecondaryIndex::new(
968        idx.name,
969        idx.table_name,
970        idx.column_name,
971        &datatype,
972        is_unique_flag,
973        origin,
974    )?;
975
976    // Populate from the index tree's cells.
977    load_index_rows(pager, &mut idx, row.rootpage)?;
978
979    table.secondary_indexes.push(idx);
980    Ok(())
981}
982
983/// Walks the leaves of an index B-Tree rooted at `root_page` and inserts
984/// every `(value, rowid)` pair into `idx`.
985fn load_index_rows(pager: &Pager, idx: &mut SecondaryIndex, root_page: u32) -> Result<()> {
986    if root_page == 0 {
987        return Ok(());
988    }
989    let first_leaf = find_leftmost_leaf(pager, root_page)?;
990    let mut current = first_leaf;
991    while current != 0 {
992        let page_buf = pager
993            .read_page(current)
994            .ok_or_else(|| SQLRiteError::Internal(format!("missing index leaf page {current}")))?;
995        if page_buf[0] != PageType::TableLeaf as u8 {
996            return Err(SQLRiteError::Internal(format!(
997                "page {current} tagged {} but expected TableLeaf (index)",
998                page_buf[0]
999            )));
1000        }
1001        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
1002        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
1003            .try_into()
1004            .map_err(|_| SQLRiteError::Internal("index leaf payload size".to_string()))?;
1005        let leaf = TablePage::from_bytes(payload);
1006
1007        for slot in 0..leaf.slot_count() {
1008            // Slots on an index page hold KIND_INDEX cells; decode directly.
1009            let offset = leaf.slot_offset_raw(slot)?;
1010            let (ic, _) = IndexCell::decode(leaf.as_bytes(), offset)?;
1011            idx.insert(&ic.value, ic.rowid)?;
1012        }
1013        current = next_leaf;
1014    }
1015    Ok(())
1016}
1017
1018/// Minimal recognizer for the synthesized-or-user `CREATE INDEX` SQL we
1019/// store in sqlrite_master. Returns `(table_name, column_name, is_unique)`.
1020///
1021/// Uses sqlparser so user-supplied SQL with extra whitespace, case, etc.
1022/// still works; the only shape we accept is single-column indexes.
1023fn parse_create_index_sql(sql: &str) -> Result<(String, String, bool)> {
1024    use sqlparser::ast::{CreateIndex, Expr, Statement};
1025
1026    let dialect = SqlriteDialect::new();
1027    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
1028    let Some(Statement::CreateIndex(CreateIndex {
1029        table_name,
1030        columns,
1031        unique,
1032        ..
1033    })) = ast.pop()
1034    else {
1035        return Err(SQLRiteError::Internal(format!(
1036            "sqlrite_master index row's SQL isn't a CREATE INDEX: {sql}"
1037        )));
1038    };
1039    if columns.len() != 1 {
1040        return Err(SQLRiteError::NotImplemented(
1041            "multi-column indexes aren't supported yet".to_string(),
1042        ));
1043    }
1044    let col = match &columns[0].column.expr {
1045        Expr::Identifier(ident) => ident.value.clone(),
1046        Expr::CompoundIdentifier(parts) => {
1047            parts.last().map(|p| p.value.clone()).unwrap_or_default()
1048        }
1049        other => {
1050            return Err(SQLRiteError::Internal(format!(
1051                "unsupported indexed column expression: {other:?}"
1052            )));
1053        }
1054    };
1055    Ok((table_name.to_string(), col, unique))
1056}
1057
1058/// True iff a CREATE INDEX SQL string uses `USING hnsw` (case-insensitive).
1059/// Used by the open path to route HNSW indexes to the graph-rebuild path
1060/// instead of the standard B-Tree cell-load. Pre-Phase-7d.2 indexes
1061/// don't have a USING clause, so they all return false and continue
1062/// taking the existing path.
1063fn create_index_sql_uses_hnsw(sql: &str) -> bool {
1064    use sqlparser::ast::{CreateIndex, IndexType, Statement};
1065
1066    let dialect = SqlriteDialect::new();
1067    let Ok(mut ast) = Parser::parse_sql(&dialect, sql) else {
1068        return false;
1069    };
1070    let Some(Statement::CreateIndex(CreateIndex { using, .. })) = ast.pop() else {
1071        return false;
1072    };
1073    matches!(using, Some(IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("hnsw"))
1074}
1075
1076/// Phase 8b — peeks at a CREATE INDEX SQL to detect `USING fts(...)`.
1077/// Mirrors [`create_index_sql_uses_hnsw`].
1078fn create_index_sql_uses_fts(sql: &str) -> bool {
1079    use sqlparser::ast::{CreateIndex, IndexType, Statement};
1080
1081    let dialect = SqlriteDialect::new();
1082    let Ok(mut ast) = Parser::parse_sql(&dialect, sql) else {
1083        return false;
1084    };
1085    let Some(Statement::CreateIndex(CreateIndex { using, .. })) = ast.pop() else {
1086        return false;
1087    };
1088    matches!(using, Some(IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("fts"))
1089}
1090
1091/// Phase 8c — loads (or rebuilds) an FTS index on database open. Two
1092/// paths mirror [`rebuild_hnsw_index`]:
1093///
1094///   - **rootpage != 0** (Phase 8c default): the posting list is
1095///     persisted as cell-encoded pages. Read every cell directly via
1096///     [`load_fts_postings`] and reconstruct the index — no
1097///     re-tokenization, exact bit-for-bit reproduction.
1098///
1099///   - **rootpage == 0** (compatibility): no on-disk postings, e.g.
1100///     for files saved by Phase 8b before persistence landed. Replay
1101///     the CREATE INDEX SQL through `execute_create_index`, which
1102///     walks the table's current rows and tokenizes them fresh.
1103fn rebuild_fts_index(db: &mut Database, pager: &Pager, row: &IndexCatalogRow) -> Result<()> {
1104    use crate::sql::db::table::FtsIndexEntry;
1105    use crate::sql::executor::execute_create_index;
1106    use crate::sql::fts::PostingList;
1107    use sqlparser::ast::Statement;
1108
1109    let dialect = SqlriteDialect::new();
1110    let mut ast = Parser::parse_sql(&dialect, &row.sql).map_err(SQLRiteError::from)?;
1111    let Some(stmt @ Statement::CreateIndex(_)) = ast.pop() else {
1112        return Err(SQLRiteError::Internal(format!(
1113            "sqlrite_master FTS row's SQL isn't a CREATE INDEX: {}",
1114            row.sql
1115        )));
1116    };
1117
1118    if row.rootpage == 0 {
1119        // Compatibility path — no persisted postings; replay rows.
1120        execute_create_index(&stmt, db)?;
1121        return Ok(());
1122    }
1123
1124    let (doc_lengths, postings) = load_fts_postings(pager, row.rootpage)?;
1125    let index = PostingList::from_persisted_postings(doc_lengths, postings);
1126    let (tbl_name, col_name) = parse_fts_create_index_sql(&row.sql)?;
1127    let table_mut = db.get_table_mut(tbl_name.clone()).map_err(|_| {
1128        SQLRiteError::Internal(format!(
1129            "FTS index '{}' references unknown table '{tbl_name}'",
1130            row.name
1131        ))
1132    })?;
1133    table_mut.fts_indexes.push(FtsIndexEntry {
1134        name: row.name.clone(),
1135        column_name: col_name,
1136        index,
1137        needs_rebuild: false,
1138    });
1139    Ok(())
1140}
1141
1142/// Pulls (table_name, column_name) out of a `CREATE INDEX … USING fts(col)`
1143/// SQL string. Same shape as `parse_hnsw_create_index_sql`.
1144fn parse_fts_create_index_sql(sql: &str) -> Result<(String, String)> {
1145    use sqlparser::ast::{CreateIndex, Expr, Statement};
1146
1147    let dialect = SqlriteDialect::new();
1148    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
1149    let Some(Statement::CreateIndex(CreateIndex {
1150        table_name,
1151        columns,
1152        ..
1153    })) = ast.pop()
1154    else {
1155        return Err(SQLRiteError::Internal(format!(
1156            "sqlrite_master FTS row's SQL isn't a CREATE INDEX: {sql}"
1157        )));
1158    };
1159    if columns.len() != 1 {
1160        return Err(SQLRiteError::NotImplemented(
1161            "multi-column FTS indexes aren't supported yet".to_string(),
1162        ));
1163    }
1164    let col = match &columns[0].column.expr {
1165        Expr::Identifier(ident) => ident.value.clone(),
1166        Expr::CompoundIdentifier(parts) => {
1167            parts.last().map(|p| p.value.clone()).unwrap_or_default()
1168        }
1169        other => {
1170            return Err(SQLRiteError::Internal(format!(
1171                "FTS CREATE INDEX has unexpected column expr: {other:?}"
1172            )));
1173        }
1174    };
1175    Ok((table_name.to_string(), col))
1176}
1177
1178/// Loads (or rebuilds) an HNSW index on database open. Two paths:
1179///
1180///   - **rootpage != 0** (Phase 7d.3 default): the graph is persisted
1181///     as cell-encoded pages. Read every node directly via
1182///     `load_hnsw_nodes` and reconstruct the index — fast, zero
1183///     algorithm runs, exact bit-for-bit reproduction of what was saved.
1184///
1185///   - **rootpage == 0** (compatibility): no on-disk graph, e.g. for
1186///     files saved by Phase 7d.2 before persistence landed. Replay the
1187///     CREATE INDEX SQL through `execute_create_index`, which walks the
1188///     table's current rows and populates a fresh graph. Slower but
1189///     correctness-equivalent on the first save with the new code.
1190fn rebuild_hnsw_index(db: &mut Database, pager: &Pager, row: &IndexCatalogRow) -> Result<()> {
1191    use crate::sql::db::table::HnswIndexEntry;
1192    use crate::sql::executor::execute_create_index;
1193    use crate::sql::hnsw::HnswIndex;
1194    use sqlparser::ast::Statement;
1195
1196    let dialect = SqlriteDialect::new();
1197    let mut ast = Parser::parse_sql(&dialect, &row.sql).map_err(SQLRiteError::from)?;
1198    let Some(stmt @ Statement::CreateIndex(_)) = ast.pop() else {
1199        return Err(SQLRiteError::Internal(format!(
1200            "sqlrite_master HNSW row's SQL isn't a CREATE INDEX: {}",
1201            row.sql
1202        )));
1203    };
1204
1205    if row.rootpage == 0 {
1206        // Compatibility path — no persisted graph; walk current rows.
1207        execute_create_index(&stmt, db)?;
1208        return Ok(());
1209    }
1210
1211    // Persistence path — read the cell tree, deserialize. The metric
1212    // travels through the synthesized CREATE INDEX SQL stored in
1213    // `sqlrite_master`; pre-SQLR-28 rows omit the WITH clause and
1214    // decode as L2, which matches what those graphs were built with.
1215    let (tbl_name, col_name, metric) = parse_hnsw_create_index_sql(&row.sql)?;
1216    let nodes = load_hnsw_nodes(pager, row.rootpage)?;
1217    let index = HnswIndex::from_persisted_nodes(metric, 0xC0FFEE, nodes);
1218
1219    // Parse the CREATE INDEX to know which table + column to attach to
1220    // — same shape as the row-walk path; we just don't execute it.
1221    let table_mut = db.get_table_mut(tbl_name.clone()).map_err(|_| {
1222        SQLRiteError::Internal(format!(
1223            "HNSW index '{}' references unknown table '{tbl_name}'",
1224            row.name
1225        ))
1226    })?;
1227    table_mut.hnsw_indexes.push(HnswIndexEntry {
1228        name: row.name.clone(),
1229        column_name: col_name,
1230        metric,
1231        index,
1232        needs_rebuild: false,
1233    });
1234    Ok(())
1235}
1236
1237/// Phase 7d.3 — Phase-7d.3-side helper: walk every leaf in the HNSW
1238/// page tree at `root_page` and decode each cell as a node. Returns
1239/// the (node_id, layers) tuples in slot-order (already ascending by
1240/// node_id since they were staged that way). The caller hands them to
1241/// `HnswIndex::from_persisted_nodes`.
1242fn load_hnsw_nodes(pager: &Pager, root_page: u32) -> Result<Vec<(i64, Vec<Vec<i64>>)>> {
1243    use crate::sql::pager::hnsw_cell::HnswNodeCell;
1244
1245    let mut nodes: Vec<(i64, Vec<Vec<i64>>)> = Vec::new();
1246    let first_leaf = find_leftmost_leaf(pager, root_page)?;
1247    let mut current = first_leaf;
1248    while current != 0 {
1249        let page_buf = pager
1250            .read_page(current)
1251            .ok_or_else(|| SQLRiteError::Internal(format!("missing HNSW leaf page {current}")))?;
1252        if page_buf[0] != PageType::TableLeaf as u8 {
1253            return Err(SQLRiteError::Internal(format!(
1254                "page {current} tagged {} but expected TableLeaf (HNSW)",
1255                page_buf[0]
1256            )));
1257        }
1258        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
1259        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
1260            .try_into()
1261            .map_err(|_| SQLRiteError::Internal("HNSW leaf payload size".to_string()))?;
1262        let leaf = TablePage::from_bytes(payload);
1263        for slot in 0..leaf.slot_count() {
1264            let offset = leaf.slot_offset_raw(slot)?;
1265            let (cell, _) = HnswNodeCell::decode(leaf.as_bytes(), offset)?;
1266            nodes.push((cell.node_id, cell.layers));
1267        }
1268        current = next_leaf;
1269    }
1270    Ok(nodes)
1271}
1272
1273/// Pulls `(table_name, column_name, metric)` out of a CREATE INDEX
1274/// SQL string of the form `CREATE INDEX … USING hnsw (col) [WITH
1275/// (metric = '<m>')]`. Used by the persistence path on open to know
1276/// where to attach the loaded graph and which distance metric to
1277/// rebuild it under. Pre-SQLR-28 rows omit the WITH clause and
1278/// default to L2.
1279fn parse_hnsw_create_index_sql(sql: &str) -> Result<(String, String, DistanceMetric)> {
1280    use crate::sql::hnsw::DistanceMetric;
1281    use sqlparser::ast::{BinaryOperator, CreateIndex, Expr, Statement, Value as AstValue};
1282
1283    let dialect = SqlriteDialect::new();
1284    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
1285    let Some(Statement::CreateIndex(CreateIndex {
1286        table_name,
1287        columns,
1288        with,
1289        ..
1290    })) = ast.pop()
1291    else {
1292        return Err(SQLRiteError::Internal(format!(
1293            "sqlrite_master HNSW row's SQL isn't a CREATE INDEX: {sql}"
1294        )));
1295    };
1296    if columns.len() != 1 {
1297        return Err(SQLRiteError::NotImplemented(
1298            "multi-column HNSW indexes aren't supported yet".to_string(),
1299        ));
1300    }
1301    let col = match &columns[0].column.expr {
1302        Expr::Identifier(ident) => ident.value.clone(),
1303        Expr::CompoundIdentifier(parts) => {
1304            parts.last().map(|p| p.value.clone()).unwrap_or_default()
1305        }
1306        other => {
1307            return Err(SQLRiteError::Internal(format!(
1308                "unsupported HNSW indexed column expression: {other:?}"
1309            )));
1310        }
1311    };
1312
1313    // Pull the metric off the parsed WITH (...) bag. The user-facing
1314    // CREATE INDEX path validates this in detail (typo'd metric names,
1315    // unknown keys, etc.); here on the persistence read-path we trust
1316    // what we previously wrote and surface a clean Internal error if
1317    // it ever doesn't match.
1318    let mut metric = DistanceMetric::L2;
1319    for opt in &with {
1320        if let Expr::BinaryOp { left, op, right } = opt {
1321            if matches!(op, BinaryOperator::Eq) {
1322                if let (Expr::Identifier(key), Expr::Value(v)) = (left.as_ref(), right.as_ref())
1323                    && key.value.eq_ignore_ascii_case("metric")
1324                {
1325                    if let AstValue::SingleQuotedString(s) | AstValue::DoubleQuotedString(s) =
1326                        &v.value
1327                    {
1328                        metric = DistanceMetric::from_sql_name(s).ok_or_else(|| {
1329                            SQLRiteError::Internal(format!(
1330                                "sqlrite_master HNSW row carries unknown metric '{s}'"
1331                            ))
1332                        })?;
1333                    }
1334                }
1335            }
1336        }
1337    }
1338
1339    Ok((table_name.to_string(), col, metric))
1340}
1341
1342/// Phase 7d.3 — rebuilds in-place any HnswIndexEntry whose
1343/// `needs_rebuild` flag is set (DELETE / UPDATE-on-vector marked it).
1344/// Walks the table's current Vec<f32> column storage and runs the
1345/// HNSW algorithm fresh. Called at the top of `save_database` before
1346/// any immutable borrows of `db` start.
1347///
1348/// Cost: O(N · ef_construction · log N) per dirty index. Fine for
1349/// small tables, expensive for ≥100k-row tables — matches the
1350/// trade-off SQLite makes for FTS5: dirtying-and-rebuilding is the
1351/// MVP, more sophisticated incremental delete strategies (soft-delete
1352/// + tombstones, neighbor reconnection) are future polish.
1353fn rebuild_dirty_hnsw_indexes(db: &mut Database) -> Result<()> {
1354    for table in db.tables.values_mut() {
1355        table.rebuild_dirty_hnsw_indexes()?;
1356    }
1357    Ok(())
1358}
1359
1360/// Synthesises the CREATE INDEX SQL stored back into `sqlrite_master`
1361/// for an HNSW index. The metric travels through the SQL via an
1362/// optional `WITH (metric = '<m>')` clause; L2 indexes omit the clause
1363/// for byte-identical round-trip with pre-SQLR-28 catalogs.
1364fn synthesize_hnsw_create_index_sql(
1365    index_name: &str,
1366    table_name: &str,
1367    column_name: &str,
1368    metric: DistanceMetric,
1369) -> String {
1370    if matches!(metric, DistanceMetric::L2) {
1371        format!("CREATE INDEX {index_name} ON {table_name} USING hnsw ({column_name})")
1372    } else {
1373        format!(
1374            "CREATE INDEX {index_name} ON {table_name} USING hnsw ({column_name}) WITH (metric = '{}')",
1375            metric.sql_name()
1376        )
1377    }
1378}
1379
1380/// Phase 8b — rebuild every FTS index a DELETE / UPDATE-on-text-col
1381/// marked dirty. Mirrors [`rebuild_dirty_hnsw_indexes`]; runs at save
1382/// time under `&mut Database`. Cheap on a clean DB (the `dirty` snapshot
1383/// is empty so the per-table loop short-circuits).
1384fn rebuild_dirty_fts_indexes(db: &mut Database) {
1385    use crate::sql::fts::PostingList;
1386
1387    for table in db.tables.values_mut() {
1388        let dirty: Vec<(String, String)> = table
1389            .fts_indexes
1390            .iter()
1391            .filter(|e| e.needs_rebuild)
1392            .map(|e| (e.name.clone(), e.column_name.clone()))
1393            .collect();
1394        if dirty.is_empty() {
1395            continue;
1396        }
1397
1398        for (idx_name, col_name) in dirty {
1399            // Snapshot every (rowid, text) pair for this column under
1400            // the row mutex, then drop the lock before re-tokenizing.
1401            let mut docs: Vec<(i64, String)> = Vec::new();
1402            {
1403                let row_data = table.rows.lock().expect("rows mutex poisoned");
1404                if let Some(Row::Text(map)) = row_data.get(&col_name) {
1405                    for (id, v) in map.iter() {
1406                        // "Null" sentinel is the parser's
1407                        // null-marker for TEXT cells; skip those —
1408                        // they'd round-trip as the literal string
1409                        // "Null" otherwise. Aligns with insert_row's
1410                        // typed_value gate.
1411                        if v != "Null" {
1412                            docs.push((*id, v.clone()));
1413                        }
1414                    }
1415                }
1416            }
1417
1418            let mut new_idx = PostingList::new();
1419            // Sort by id so the rebuild is deterministic across runs
1420            // (the BTreeMap inside PostingList is order-stable, but
1421            // doc-length aggregation order doesn't matter — sorting
1422            // here is purely for reproducibility on inspection).
1423            docs.sort_by_key(|(id, _)| *id);
1424            for (id, text) in &docs {
1425                new_idx.insert(*id, text);
1426            }
1427
1428            if let Some(entry) = table.fts_indexes.iter_mut().find(|e| e.name == idx_name) {
1429                entry.index = new_idx;
1430                entry.needs_rebuild = false;
1431            }
1432        }
1433    }
1434}
1435
1436/// Cheap clone helper — `DataType` doesn't derive `Clone` elsewhere.
1437fn clone_datatype(dt: &DataType) -> DataType {
1438    match dt {
1439        DataType::Integer => DataType::Integer,
1440        DataType::Text => DataType::Text,
1441        DataType::Real => DataType::Real,
1442        DataType::Bool => DataType::Bool,
1443        DataType::Vector(dim) => DataType::Vector(*dim),
1444        DataType::Json => DataType::Json,
1445        DataType::None => DataType::None,
1446        DataType::Invalid => DataType::Invalid,
1447    }
1448}
1449
1450/// Stages an index's B-Tree at `start_page`. Each leaf cell is a
1451/// `KIND_INDEX` entry carrying `(original_rowid, value)`. Returns
1452/// `(root_page, next_free_page)`.
1453///
1454/// The tree's shape matches a regular table's — leaves chained via
1455/// `next_page`, optional interior layer above. `Cell::peek_rowid` works
1456/// uniformly for index cells (same prefix as local cells), so the
1457/// existing slot directory and binary search carry over.
1458fn stage_index_btree(
1459    pager: &mut Pager,
1460    idx: &SecondaryIndex,
1461    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1462) -> Result<u32> {
1463    // Build the leaves.
1464    let leaves = stage_index_leaves(pager, idx, alloc)?;
1465    if leaves.len() == 1 {
1466        return Ok(leaves[0].0);
1467    }
1468    let mut level: Vec<(u32, i64)> = leaves;
1469    while level.len() > 1 {
1470        level = stage_interior_level(pager, &level, alloc)?;
1471    }
1472    Ok(level[0].0)
1473}
1474
1475/// Packs the index's (value, rowid) entries into a sibling-chained run
1476/// of `TableLeaf` pages. Iteration order matches `SecondaryIndex::iter_entries`
1477/// (ascending value; rowids in insertion order within a value), which is
1478/// also ascending by the "cell rowid" carried in each IndexCell (the
1479/// original row's rowid) — so Cell::peek_rowid + the slot directory's
1480/// rowid ordering stays consistent.
1481fn stage_index_leaves(
1482    pager: &mut Pager,
1483    idx: &SecondaryIndex,
1484    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1485) -> Result<Vec<(u32, i64)>> {
1486    let mut leaves: Vec<(u32, i64)> = Vec::new();
1487    let mut current_leaf = TablePage::empty();
1488    let mut current_leaf_page = alloc.allocate();
1489    let mut current_max_rowid: Option<i64> = None;
1490
1491    // Sort the entries by original rowid so the in-page slot directory,
1492    // which binary-searches by rowid, stays valid. (iter_entries orders by
1493    // value; we reorder here for B-Tree correctness.)
1494    let mut entries: Vec<(Value, i64)> = idx.iter_entries().collect();
1495    entries.sort_by_key(|(_, r)| *r);
1496
1497    for (value, rowid) in entries {
1498        let cell = IndexCell::new(rowid, value);
1499        let entry_bytes = cell.encode()?;
1500
1501        if !current_leaf.would_fit(entry_bytes.len()) {
1502            let next_leaf_page_num = alloc.allocate();
1503            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1504            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1505            current_leaf = TablePage::empty();
1506            current_leaf_page = next_leaf_page_num;
1507
1508            if !current_leaf.would_fit(entry_bytes.len()) {
1509                return Err(SQLRiteError::Internal(format!(
1510                    "index entry of {} bytes exceeds empty-page capacity {}",
1511                    entry_bytes.len(),
1512                    current_leaf.free_space()
1513                )));
1514            }
1515        }
1516        current_leaf.insert_entry(rowid, &entry_bytes)?;
1517        current_max_rowid = Some(rowid);
1518    }
1519
1520    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1521    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1522    Ok(leaves)
1523}
1524
1525/// Phase 7d.3 — stages an HNSW index's page tree at `start_page`.
1526/// Each leaf cell is a `KIND_HNSW` entry carrying one node's
1527/// (node_id, layers). Returns `(root_page, next_free_page)`.
1528///
1529/// Tree shape is identical to `stage_index_btree` — chained leaves +
1530/// optional interior layers. The slot directory binary-searches by
1531/// node_id (which is the cell's "rowid" in `Cell::peek_rowid` terms),
1532/// so reads can locate any node in O(log N) once 7d.4-or-later
1533/// optimizes the load path to lazy-fetch instead of read-all.
1534/// Today, `load_hnsw_nodes` reads the entire tree on open.
1535fn stage_hnsw_btree(
1536    pager: &mut Pager,
1537    idx: &crate::sql::hnsw::HnswIndex,
1538    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1539) -> Result<u32> {
1540    let leaves = stage_hnsw_leaves(pager, idx, alloc)?;
1541    if leaves.len() == 1 {
1542        return Ok(leaves[0].0);
1543    }
1544    let mut level: Vec<(u32, i64)> = leaves;
1545    while level.len() > 1 {
1546        level = stage_interior_level(pager, &level, alloc)?;
1547    }
1548    Ok(level[0].0)
1549}
1550
1551/// Phase 8c — stage one FTS index as a `TableLeaf`-shaped B-Tree.
1552/// Mirrors `stage_hnsw_btree` (sibling-chained leaves, optional interior
1553/// levels). Returns `(root_page, next_free_page)`. Each leaf is filled
1554/// with `KIND_FTS_POSTING` cells: one sidecar cell holding the
1555/// doc-lengths map, then one cell per term in lexicographic order.
1556fn stage_fts_btree(
1557    pager: &mut Pager,
1558    idx: &crate::sql::fts::PostingList,
1559    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1560) -> Result<u32> {
1561    let leaves = stage_fts_leaves(pager, idx, alloc)?;
1562    if leaves.len() == 1 {
1563        return Ok(leaves[0].0);
1564    }
1565    let mut level: Vec<(u32, i64)> = leaves;
1566    while level.len() > 1 {
1567        level = stage_interior_level(pager, &level, alloc)?;
1568    }
1569    Ok(level[0].0)
1570}
1571
1572/// Packs FTS posting cells into a sibling-chained run of `TableLeaf`
1573/// pages. Cell layout: a single doc-lengths sidecar at `cell_id = 1`,
1574/// followed by one cell per term in lexicographic order with
1575/// `cell_id = 2..=N + 1`. Sequential ids keep the slot directory's
1576/// rowid ordering valid (the `cell_id` field is what `peek_rowid`
1577/// returns).
1578fn stage_fts_leaves(
1579    pager: &mut Pager,
1580    idx: &crate::sql::fts::PostingList,
1581    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1582) -> Result<Vec<(u32, i64)>> {
1583    use crate::sql::pager::fts_cell::FtsPostingCell;
1584
1585    let mut leaves: Vec<(u32, i64)> = Vec::new();
1586    let mut current_leaf = TablePage::empty();
1587    let mut current_leaf_page = alloc.allocate();
1588    let mut current_max_rowid: Option<i64> = None;
1589
1590    // Build the cell sequence: sidecar first, then per-term cells. The
1591    // sidecar always exists (even on an empty index) so reload sees a
1592    // canonical "this index was persisted" marker in slot 0.
1593    let mut cell_id: i64 = 1;
1594    let mut cells: Vec<FtsPostingCell> = Vec::new();
1595    cells.push(FtsPostingCell::doc_lengths(
1596        cell_id,
1597        idx.serialize_doc_lengths(),
1598    ));
1599    for (term, entries) in idx.serialize_postings() {
1600        cell_id += 1;
1601        cells.push(FtsPostingCell::posting(cell_id, term, entries));
1602    }
1603
1604    for cell in cells {
1605        let entry_bytes = cell.encode()?;
1606
1607        if !current_leaf.would_fit(entry_bytes.len()) {
1608            let next_leaf_page_num = alloc.allocate();
1609            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1610            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1611            current_leaf = TablePage::empty();
1612            current_leaf_page = next_leaf_page_num;
1613
1614            if !current_leaf.would_fit(entry_bytes.len()) {
1615                // A single posting cell exceeds page capacity. Phase
1616                // 8c MVP doesn't chain via overflow cells (the plan
1617                // notes this as a stretch goal); surface a clear
1618                // error so users know which term tripped it.
1619                return Err(SQLRiteError::Internal(format!(
1620                    "FTS posting cell {} of {} bytes exceeds empty-page capacity {} \
1621                     (term too long or too many postings; overflow chaining is Phase 8.1)",
1622                    cell.cell_id,
1623                    entry_bytes.len(),
1624                    current_leaf.free_space()
1625                )));
1626            }
1627        }
1628        current_leaf.insert_entry(cell.cell_id, &entry_bytes)?;
1629        current_max_rowid = Some(cell.cell_id);
1630    }
1631
1632    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1633    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1634    Ok(leaves)
1635}
1636
/// `(rowid, value)` pairs as decoded from a single FTS cell — `value` is
/// either a term frequency (posting cell) or a doc length (sidecar cell).
type FtsEntries = Vec<(i64, u32)>;
/// `(term, posting list)` pairs as decoded from non-sidecar FTS cells,
/// where each posting list is an [`FtsEntries`].
type FtsPostings = Vec<(String, FtsEntries)>;
1642
1643/// Phase 8c — read every cell of an FTS index from `root_page` back
1644/// into the `(doc_lengths, postings)` shape `PostingList::from_persisted_postings`
1645/// expects. Mirrors `load_hnsw_nodes`: leftmost-leaf descent, walk the
1646/// sibling chain, decode each slot.
1647fn load_fts_postings(pager: &Pager, root_page: u32) -> Result<(FtsEntries, FtsPostings)> {
1648    use crate::sql::pager::fts_cell::FtsPostingCell;
1649
1650    let mut doc_lengths: Vec<(i64, u32)> = Vec::new();
1651    let mut postings: Vec<(String, Vec<(i64, u32)>)> = Vec::new();
1652    let mut saw_sidecar = false;
1653
1654    let first_leaf = find_leftmost_leaf(pager, root_page)?;
1655    let mut current = first_leaf;
1656    while current != 0 {
1657        let page_buf = pager
1658            .read_page(current)
1659            .ok_or_else(|| SQLRiteError::Internal(format!("missing FTS leaf page {current}")))?;
1660        if page_buf[0] != PageType::TableLeaf as u8 {
1661            return Err(SQLRiteError::Internal(format!(
1662                "page {current} tagged {} but expected TableLeaf (FTS)",
1663                page_buf[0]
1664            )));
1665        }
1666        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
1667        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
1668            .try_into()
1669            .map_err(|_| SQLRiteError::Internal("FTS leaf payload size".to_string()))?;
1670        let leaf = TablePage::from_bytes(payload);
1671        for slot in 0..leaf.slot_count() {
1672            let offset = leaf.slot_offset_raw(slot)?;
1673            let (cell, _) = FtsPostingCell::decode(leaf.as_bytes(), offset)?;
1674            if cell.is_doc_lengths() {
1675                if saw_sidecar {
1676                    return Err(SQLRiteError::Internal(
1677                        "FTS index has more than one doc-lengths sidecar cell".to_string(),
1678                    ));
1679                }
1680                saw_sidecar = true;
1681                doc_lengths = cell.entries;
1682            } else {
1683                postings.push((cell.term, cell.entries));
1684            }
1685        }
1686        current = next_leaf;
1687    }
1688
1689    if !saw_sidecar {
1690        return Err(SQLRiteError::Internal(
1691            "FTS index missing doc-lengths sidecar cell — corrupt or truncated tree".to_string(),
1692        ));
1693    }
1694    Ok((doc_lengths, postings))
1695}
1696
1697/// Packs HNSW nodes into a sibling-chained run of `TableLeaf` pages.
1698/// `serialize_nodes` already returns nodes in ascending node_id order,
1699/// so the slot directory's rowid ordering stays valid.
1700fn stage_hnsw_leaves(
1701    pager: &mut Pager,
1702    idx: &crate::sql::hnsw::HnswIndex,
1703    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1704) -> Result<Vec<(u32, i64)>> {
1705    use crate::sql::pager::hnsw_cell::HnswNodeCell;
1706
1707    let mut leaves: Vec<(u32, i64)> = Vec::new();
1708    let mut current_leaf = TablePage::empty();
1709    let mut current_leaf_page = alloc.allocate();
1710    let mut current_max_rowid: Option<i64> = None;
1711
1712    let serialized = idx.serialize_nodes();
1713
1714    // Empty index → emit a single empty leaf page so the rootpage
1715    // pointer in sqlrite_master stays nonzero (== "graph is persisted,
1716    // it just happens to be empty"). load_hnsw_nodes is fine with an
1717    // empty leaf — slot_count() returns 0.
1718    for (node_id, layers) in serialized {
1719        let cell = HnswNodeCell::new(node_id, layers);
1720        let entry_bytes = cell.encode()?;
1721
1722        if !current_leaf.would_fit(entry_bytes.len()) {
1723            let next_leaf_page_num = alloc.allocate();
1724            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1725            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1726            current_leaf = TablePage::empty();
1727            current_leaf_page = next_leaf_page_num;
1728
1729            if !current_leaf.would_fit(entry_bytes.len()) {
1730                return Err(SQLRiteError::Internal(format!(
1731                    "HNSW node {node_id} cell of {} bytes exceeds empty-page capacity {}",
1732                    entry_bytes.len(),
1733                    current_leaf.free_space()
1734                )));
1735            }
1736        }
1737        current_leaf.insert_entry(node_id, &entry_bytes)?;
1738        current_max_rowid = Some(node_id);
1739    }
1740
1741    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1742    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1743    Ok(leaves)
1744}
1745
1746fn load_table_rows(pager: &Pager, table: &mut Table, root_page: u32) -> Result<()> {
1747    let first_leaf = find_leftmost_leaf(pager, root_page)?;
1748    let mut current = first_leaf;
1749    while current != 0 {
1750        let page_buf = pager
1751            .read_page(current)
1752            .ok_or_else(|| SQLRiteError::Internal(format!("missing leaf page {current}")))?;
1753        if page_buf[0] != PageType::TableLeaf as u8 {
1754            return Err(SQLRiteError::Internal(format!(
1755                "page {current} tagged {} but expected TableLeaf",
1756                page_buf[0]
1757            )));
1758        }
1759        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
1760        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
1761            .try_into()
1762            .map_err(|_| SQLRiteError::Internal("leaf payload slice size".to_string()))?;
1763        let leaf = TablePage::from_bytes(payload);
1764
1765        for slot in 0..leaf.slot_count() {
1766            let entry = leaf.entry_at(slot)?;
1767            let cell = match entry {
1768                PagedEntry::Local(c) => c,
1769                PagedEntry::Overflow(r) => {
1770                    let body_bytes =
1771                        read_overflow_chain(pager, r.first_overflow_page, r.total_body_len)?;
1772                    let (c, _) = Cell::decode(&body_bytes, 0)?;
1773                    c
1774                }
1775            };
1776            table.restore_row(cell.rowid, cell.values)?;
1777        }
1778        current = next_leaf;
1779    }
1780    Ok(())
1781}
1782
1783/// Walks every page reachable from `root_page` and returns their page
1784/// numbers. Includes `root_page`, every interior page, every leaf, and
1785/// — when `follow_overflow` is true — every overflow page chained off
1786/// table-leaf cells. Used by `save_database` to seed each table's
1787/// per-table preferred pool and to compute the newly-freed set.
1788///
1789/// `follow_overflow = true` for table B-Trees (cells may carry
1790/// `OverflowRef`s pointing at chained overflow pages); `false` for
1791/// secondary-index, HNSW, and FTS B-Trees, which never overflow in the
1792/// current encoding.
1793fn collect_pages_for_btree(
1794    pager: &Pager,
1795    root_page: u32,
1796    follow_overflow: bool,
1797) -> Result<Vec<u32>> {
1798    if root_page == 0 {
1799        return Ok(Vec::new());
1800    }
1801    let mut pages: Vec<u32> = Vec::new();
1802    let mut stack: Vec<u32> = vec![root_page];
1803
1804    while let Some(p) = stack.pop() {
1805        let buf = pager.read_page(p).ok_or_else(|| {
1806            SQLRiteError::Internal(format!(
1807                "collect_pages: missing page {p} (rooted at {root_page})"
1808            ))
1809        })?;
1810        pages.push(p);
1811        match buf[0] {
1812            t if t == PageType::InteriorNode as u8 => {
1813                let payload: &[u8; PAYLOAD_PER_PAGE] =
1814                    (&buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
1815                        SQLRiteError::Internal("interior payload slice size".to_string())
1816                    })?;
1817                let interior = InteriorPage::from_bytes(payload);
1818                // Push every divider's child + the rightmost child.
1819                for slot in 0..interior.slot_count() {
1820                    let cell = interior.cell_at(slot)?;
1821                    stack.push(cell.child_page);
1822                }
1823                stack.push(interior.rightmost_child());
1824            }
1825            t if t == PageType::TableLeaf as u8 => {
1826                if follow_overflow {
1827                    let payload: &[u8; PAYLOAD_PER_PAGE] =
1828                        (&buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
1829                            SQLRiteError::Internal("leaf payload slice size".to_string())
1830                        })?;
1831                    let leaf = TablePage::from_bytes(payload);
1832                    for slot in 0..leaf.slot_count() {
1833                        match leaf.entry_at(slot)? {
1834                            PagedEntry::Local(_) => {}
1835                            PagedEntry::Overflow(r) => {
1836                                let mut cur = r.first_overflow_page;
1837                                while cur != 0 {
1838                                    pages.push(cur);
1839                                    let ob = pager.read_page(cur).ok_or_else(|| {
1840                                        SQLRiteError::Internal(format!(
1841                                            "collect_pages: missing overflow page {cur}"
1842                                        ))
1843                                    })?;
1844                                    if ob[0] != PageType::Overflow as u8 {
1845                                        return Err(SQLRiteError::Internal(format!(
1846                                            "collect_pages: page {cur} expected Overflow, got tag {}",
1847                                            ob[0]
1848                                        )));
1849                                    }
1850                                    cur = u32::from_le_bytes(ob[1..5].try_into().unwrap());
1851                                }
1852                            }
1853                        }
1854                    }
1855                }
1856            }
1857            other => {
1858                return Err(SQLRiteError::Internal(format!(
1859                    "collect_pages: unexpected page type {other} at page {p}"
1860                )));
1861            }
1862        }
1863    }
1864    Ok(pages)
1865}
1866
1867/// Reads the previously-persisted `sqlrite_master` and returns a map from
1868/// `(kind, name)` to that object's rootpage. Used by `save_database` to
1869/// seed each table/index's per-table preferred pool with the pages it
1870/// occupied last time round.
1871///
1872/// `kind` is `"table"` or `"index"` (the catalog already disambiguates
1873/// the three index families via the SQL string, but for page-collection
1874/// purposes a "table" tree must follow overflow refs while an "index"
1875/// tree never does — that's the only distinction we need here).
1876fn read_old_rootpages(pager: &Pager, schema_root: u32) -> Result<HashMap<(String, String), u32>> {
1877    let mut out: HashMap<(String, String), u32> = HashMap::new();
1878    if schema_root == 0 {
1879        return Ok(out);
1880    }
1881    let mut master = build_empty_master_table();
1882    load_table_rows(pager, &mut master, schema_root)?;
1883    for rowid in master.rowids() {
1884        let kind = take_text(&master, "type", rowid)?;
1885        let name = take_text(&master, "name", rowid)?;
1886        let rootpage = take_integer(&master, "rootpage", rowid)? as u32;
1887        out.insert((kind, name), rootpage);
1888    }
1889    Ok(out)
1890}
1891
1892/// Descends from `root_page` through `InteriorNode` pages, always taking
1893/// the leftmost child, until a `TableLeaf` is reached. Returns that leaf's
1894/// page number. A root that's already a leaf is returned as-is.
1895fn find_leftmost_leaf(pager: &Pager, root_page: u32) -> Result<u32> {
1896    let mut current = root_page;
1897    loop {
1898        let page_buf = pager.read_page(current).ok_or_else(|| {
1899            SQLRiteError::Internal(format!("missing page {current} during tree descent"))
1900        })?;
1901        match page_buf[0] {
1902            t if t == PageType::TableLeaf as u8 => return Ok(current),
1903            t if t == PageType::InteriorNode as u8 => {
1904                let payload: &[u8; PAYLOAD_PER_PAGE] =
1905                    (&page_buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
1906                        SQLRiteError::Internal("interior payload slice size".to_string())
1907                    })?;
1908                let interior = InteriorPage::from_bytes(payload);
1909                current = interior.leftmost_child()?;
1910            }
1911            other => {
1912                return Err(SQLRiteError::Internal(format!(
1913                    "unexpected page type {other} during tree descent at page {current}"
1914                )));
1915            }
1916        }
1917    }
1918}
1919
1920/// Stages a table's B-Tree, drawing every page number from `alloc`.
1921/// Returns the root page (the topmost interior page, or the single leaf
1922/// when the table fits in one page).
1923///
1924/// Builds bottom-up: pack rows into `TableLeaf` pages chained via
1925/// `next_page`, then if more than one leaf, recursively wrap them in
1926/// `InteriorNode` levels until one root remains.
1927///
1928/// Deterministic: same rows + same allocator handouts → byte-identical
1929/// pages at the same numbers, so the diff pager skips unchanged tables.
1930fn stage_table_btree(
1931    pager: &mut Pager,
1932    table: &Table,
1933    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1934) -> Result<u32> {
1935    let leaves = stage_leaves(pager, table, alloc)?;
1936    if leaves.len() == 1 {
1937        return Ok(leaves[0].0);
1938    }
1939    let mut level: Vec<(u32, i64)> = leaves;
1940    while level.len() > 1 {
1941        level = stage_interior_level(pager, &level, alloc)?;
1942    }
1943    Ok(level[0].0)
1944}
1945
1946/// Packs the table's rows into a sibling-linked chain of `TableLeaf` pages.
1947/// Returns each leaf's `(page_number, max_rowid)` for use by the next
1948/// interior level. Allocates leaf and overflow pages from `alloc`.
1949fn stage_leaves(
1950    pager: &mut Pager,
1951    table: &Table,
1952    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1953) -> Result<Vec<(u32, i64)>> {
1954    let mut leaves: Vec<(u32, i64)> = Vec::new();
1955    let mut current_leaf = TablePage::empty();
1956    let mut current_leaf_page = alloc.allocate();
1957    let mut current_max_rowid: Option<i64> = None;
1958
1959    for rowid in table.rowids() {
1960        let entry_bytes = build_row_entry(pager, table, rowid, alloc)?;
1961
1962        if !current_leaf.would_fit(entry_bytes.len()) {
1963            // The new leaf goes at whatever the allocator hands out
1964            // next. Commit the current leaf with that as its sibling
1965            // pointer.
1966            let next_leaf_page_num = alloc.allocate();
1967            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1968            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1969            current_leaf = TablePage::empty();
1970            current_leaf_page = next_leaf_page_num;
1971            // current_max_rowid is reassigned by the insert below; no need
1972            // to zero it out here.
1973
1974            if !current_leaf.would_fit(entry_bytes.len()) {
1975                return Err(SQLRiteError::Internal(format!(
1976                    "entry of {} bytes exceeds empty-page capacity {}",
1977                    entry_bytes.len(),
1978                    current_leaf.free_space()
1979                )));
1980            }
1981        }
1982        current_leaf.insert_entry(rowid, &entry_bytes)?;
1983        current_max_rowid = Some(rowid);
1984    }
1985
1986    // Final leaf: sibling next_page = 0 (end of chain).
1987    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1988    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1989    Ok(leaves)
1990}
1991
1992/// Encodes a single row's on-leaf entry — either the local cell bytes, or
1993/// an `OverflowRef` pointing at a freshly-allocated overflow chain if the
1994/// encoded cell exceeded the inline threshold. Allocates any overflow
1995/// pages from `alloc`.
1996fn build_row_entry(
1997    pager: &mut Pager,
1998    table: &Table,
1999    rowid: i64,
2000    alloc: &mut crate::sql::pager::allocator::PageAllocator,
2001) -> Result<Vec<u8>> {
2002    let values = table.extract_row(rowid);
2003    let local_cell = Cell::new(rowid, values);
2004    let local_bytes = local_cell.encode()?;
2005    if local_bytes.len() > OVERFLOW_THRESHOLD {
2006        let overflow_start = write_overflow_chain(pager, &local_bytes, alloc)?;
2007        Ok(OverflowRef {
2008            rowid,
2009            total_body_len: local_bytes.len() as u64,
2010            first_overflow_page: overflow_start,
2011        }
2012        .encode())
2013    } else {
2014        Ok(local_bytes)
2015    }
2016}
2017
2018/// Builds one level of `InteriorNode` pages above the given children.
2019/// Each interior packs as many dividers as will fit; the last child
2020/// assigned to an interior becomes its `rightmost_child`. Returns the
2021/// emitted interior pages as `(page_number, max_rowid_in_subtree)`.
2022fn stage_interior_level(
2023    pager: &mut Pager,
2024    children: &[(u32, i64)],
2025    alloc: &mut crate::sql::pager::allocator::PageAllocator,
2026) -> Result<Vec<(u32, i64)>> {
2027    let mut next_level: Vec<(u32, i64)> = Vec::new();
2028    let mut idx = 0usize;
2029
2030    while idx < children.len() {
2031        let interior_page_num = alloc.allocate();
2032
2033        // Seed the interior with the first unassigned child as its
2034        // rightmost. As we add more children, the previous rightmost
2035        // graduates to being a divider and the new arrival takes over
2036        // as rightmost.
2037        let (mut rightmost_child_page, mut rightmost_child_max) = children[idx];
2038        idx += 1;
2039        let mut interior = InteriorPage::empty(rightmost_child_page);
2040
2041        while idx < children.len() {
2042            let new_divider_cell = InteriorCell {
2043                divider_rowid: rightmost_child_max,
2044                child_page: rightmost_child_page,
2045            };
2046            let new_divider_bytes = new_divider_cell.encode();
2047            if !interior.would_fit(new_divider_bytes.len()) {
2048                break;
2049            }
2050            interior.insert_divider(rightmost_child_max, rightmost_child_page)?;
2051            let (next_child_page, next_child_max) = children[idx];
2052            interior.set_rightmost_child(next_child_page);
2053            rightmost_child_page = next_child_page;
2054            rightmost_child_max = next_child_max;
2055            idx += 1;
2056        }
2057
2058        emit_interior(pager, interior_page_num, &interior);
2059        next_level.push((interior_page_num, rightmost_child_max));
2060    }
2061
2062    Ok(next_level)
2063}
2064
2065/// Wraps a `TablePage` in the 7-byte page header and hands it to the pager.
2066fn emit_leaf(pager: &mut Pager, page_num: u32, leaf: &TablePage, next_leaf: u32) {
2067    let mut buf = [0u8; PAGE_SIZE];
2068    buf[0] = PageType::TableLeaf as u8;
2069    buf[1..5].copy_from_slice(&next_leaf.to_le_bytes());
2070    // For leaf pages the legacy `payload_len` field isn't used — the slot
2071    // directory self-describes. Zero it by convention.
2072    buf[5..7].copy_from_slice(&0u16.to_le_bytes());
2073    buf[PAGE_HEADER_SIZE..].copy_from_slice(leaf.as_bytes());
2074    pager.stage_page(page_num, buf);
2075}
2076
2077/// Wraps an `InteriorPage` in the 7-byte page header. Interior pages
2078/// don't use `next_page` (there's no sibling chain between interiors);
2079/// `payload_len` is also unused (the slot directory self-describes).
2080fn emit_interior(pager: &mut Pager, page_num: u32, interior: &InteriorPage) {
2081    let mut buf = [0u8; PAGE_SIZE];
2082    buf[0] = PageType::InteriorNode as u8;
2083    buf[1..5].copy_from_slice(&0u32.to_le_bytes());
2084    buf[5..7].copy_from_slice(&0u16.to_le_bytes());
2085    buf[PAGE_HEADER_SIZE..].copy_from_slice(interior.as_bytes());
2086    pager.stage_page(page_num, buf);
2087}
2088
2089#[cfg(test)]
2090mod tests {
2091    use super::*;
2092    use crate::sql::pager::freelist::MIN_PAGES_FOR_AUTO_VACUUM;
2093    use crate::sql::process_command;
2094
2095    fn seed_db() -> Database {
2096        let mut db = Database::new("test".to_string());
2097        process_command(
2098            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, age INTEGER);",
2099            &mut db,
2100        )
2101        .unwrap();
2102        process_command(
2103            "INSERT INTO users (name, age) VALUES ('alice', 30);",
2104            &mut db,
2105        )
2106        .unwrap();
2107        process_command("INSERT INTO users (name, age) VALUES ('bob', 25);", &mut db).unwrap();
2108        process_command(
2109            "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
2110            &mut db,
2111        )
2112        .unwrap();
2113        process_command("INSERT INTO notes (body) VALUES ('hello');", &mut db).unwrap();
2114        db
2115    }
2116
2117    fn tmp_path(name: &str) -> std::path::PathBuf {
2118        let mut p = std::env::temp_dir();
2119        let pid = std::process::id();
2120        let nanos = std::time::SystemTime::now()
2121            .duration_since(std::time::UNIX_EPOCH)
2122            .map(|d| d.as_nanos())
2123            .unwrap_or(0);
2124        p.push(format!("sqlrite-{pid}-{nanos}-{name}.sqlrite"));
2125        p
2126    }
2127
2128    /// Phase 4c: every .sqlrite has a `-wal` sidecar now. Delete both so
2129    /// `/tmp` doesn't accumulate orphan WALs across test runs.
2130    fn cleanup(path: &std::path::Path) {
2131        let _ = std::fs::remove_file(path);
2132        let mut wal = path.as_os_str().to_owned();
2133        wal.push("-wal");
2134        let _ = std::fs::remove_file(std::path::PathBuf::from(wal));
2135    }
2136
2137    #[test]
2138    fn round_trip_preserves_schema_and_data() {
2139        let path = tmp_path("roundtrip");
2140        let mut db = seed_db();
2141        save_database(&mut db, &path).expect("save");
2142
2143        let loaded = open_database(&path, "test".to_string()).expect("open");
2144        assert_eq!(loaded.tables.len(), 2);
2145
2146        let users = loaded.get_table("users".to_string()).expect("users table");
2147        assert_eq!(users.columns.len(), 3);
2148        let rowids = users.rowids();
2149        assert_eq!(rowids.len(), 2);
2150        let names: Vec<String> = rowids
2151            .iter()
2152            .filter_map(|r| match users.get_value("name", *r) {
2153                Some(Value::Text(s)) => Some(s),
2154                _ => None,
2155            })
2156            .collect();
2157        assert!(names.contains(&"alice".to_string()));
2158        assert!(names.contains(&"bob".to_string()));
2159
2160        let notes = loaded.get_table("notes".to_string()).expect("notes table");
2161        assert_eq!(notes.rowids().len(), 1);
2162
2163        cleanup(&path);
2164    }
2165
2166    // -----------------------------------------------------------------
2167    // Phase 7a — VECTOR(N) save / reopen round-trip
2168    // -----------------------------------------------------------------
2169
2170    #[test]
2171    fn round_trip_preserves_vector_column() {
2172        let path = tmp_path("vec_roundtrip");
2173
2174        // Build, populate, save.
2175        {
2176            let mut db = Database::new("test".to_string());
2177            process_command(
2178                "CREATE TABLE docs (id INTEGER PRIMARY KEY, embedding VECTOR(3));",
2179                &mut db,
2180            )
2181            .unwrap();
2182            process_command(
2183                "INSERT INTO docs (embedding) VALUES ([0.1, 0.2, 0.3]);",
2184                &mut db,
2185            )
2186            .unwrap();
2187            process_command(
2188                "INSERT INTO docs (embedding) VALUES ([1.5, -2.0, 3.5]);",
2189                &mut db,
2190            )
2191            .unwrap();
2192            save_database(&mut db, &path).expect("save");
2193        } // db drops → its exclusive lock releases before reopen.
2194
2195        // Reopen and verify schema + data both round-tripped.
2196        let loaded = open_database(&path, "test".to_string()).expect("open");
2197        let docs = loaded.get_table("docs".to_string()).expect("docs table");
2198
2199        // Schema preserved: column is still VECTOR(3).
2200        let embedding_col = docs
2201            .columns
2202            .iter()
2203            .find(|c| c.column_name == "embedding")
2204            .expect("embedding column");
2205        assert!(
2206            matches!(embedding_col.datatype, DataType::Vector(3)),
2207            "expected DataType::Vector(3) after round-trip, got {:?}",
2208            embedding_col.datatype
2209        );
2210
2211        // Data preserved: both vectors still readable bit-for-bit.
2212        let mut rows: Vec<Vec<f32>> = docs
2213            .rowids()
2214            .iter()
2215            .filter_map(|r| match docs.get_value("embedding", *r) {
2216                Some(Value::Vector(v)) => Some(v),
2217                _ => None,
2218            })
2219            .collect();
2220        rows.sort_by(|a, b| a[0].partial_cmp(&b[0]).unwrap());
2221        assert_eq!(rows.len(), 2);
2222        assert_eq!(rows[0], vec![0.1f32, 0.2, 0.3]);
2223        assert_eq!(rows[1], vec![1.5f32, -2.0, 3.5]);
2224
2225        cleanup(&path);
2226    }
2227
2228    #[test]
2229    fn round_trip_preserves_json_column() {
2230        // Phase 7e — JSON columns are stored as Text under the hood with
2231        // INSERT-time validation. Save + reopen should preserve the
2232        // schema (DataType::Json) and the underlying text bytes; a
2233        // post-reopen json_extract should still resolve paths correctly.
2234        let path = tmp_path("json_roundtrip");
2235
2236        {
2237            let mut db = Database::new("test".to_string());
2238            process_command(
2239                "CREATE TABLE docs (id INTEGER PRIMARY KEY, payload JSON);",
2240                &mut db,
2241            )
2242            .unwrap();
2243            process_command(
2244                r#"INSERT INTO docs (payload) VALUES ('{"name": "alice", "tags": ["rust","sql"]}');"#,
2245                &mut db,
2246            )
2247            .unwrap();
2248            save_database(&mut db, &path).expect("save");
2249        }
2250
2251        let mut loaded = open_database(&path, "test".to_string()).expect("open");
2252        let docs = loaded.get_table("docs".to_string()).expect("docs");
2253
2254        // Schema: column declared as JSON, restored with the same type.
2255        let payload_col = docs
2256            .columns
2257            .iter()
2258            .find(|c| c.column_name == "payload")
2259            .unwrap();
2260        assert!(
2261            matches!(payload_col.datatype, DataType::Json),
2262            "expected DataType::Json, got {:?}",
2263            payload_col.datatype
2264        );
2265
2266        // json_extract works against the reopened data — exercises the
2267        // full Text-storage + serde_json::from_str path post-reopen.
2268        let resp = process_command(
2269            r#"SELECT id FROM docs WHERE json_extract(payload, '$.name') = 'alice';"#,
2270            &mut loaded,
2271        )
2272        .expect("select via json_extract after reopen");
2273        assert!(resp.contains("1 row returned"), "got: {resp}");
2274
2275        cleanup(&path);
2276    }
2277
2278    #[test]
2279    fn round_trip_rebuilds_hnsw_index_from_create_sql() {
2280        // Phase 7d.3: HNSW indexes now persist their graph as cell-encoded
2281        // pages. After save+reopen the index entry reattaches with the
2282        // same column + same node count, loaded directly from disk
2283        // instead of re-walking rows.
2284        let path = tmp_path("hnsw_roundtrip");
2285
2286        // Build, populate, index, save.
2287        {
2288            let mut db = Database::new("test".to_string());
2289            process_command(
2290                "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
2291                &mut db,
2292            )
2293            .unwrap();
2294            for v in &[
2295                "[1.0, 0.0]",
2296                "[2.0, 0.0]",
2297                "[0.0, 3.0]",
2298                "[1.0, 4.0]",
2299                "[10.0, 10.0]",
2300            ] {
2301                process_command(&format!("INSERT INTO docs (e) VALUES ({v});"), &mut db).unwrap();
2302            }
2303            process_command("CREATE INDEX ix_e ON docs USING hnsw (e);", &mut db).unwrap();
2304            save_database(&mut db, &path).expect("save");
2305        } // db drops → exclusive lock releases.
2306
2307        // Reopen and verify the index reattached, with the same name +
2308        // column + populated graph.
2309        let mut loaded = open_database(&path, "test".to_string()).expect("open");
2310        {
2311            let table = loaded.get_table("docs".to_string()).expect("docs");
2312            assert_eq!(table.hnsw_indexes.len(), 1, "HNSW index should reattach");
2313            let entry = &table.hnsw_indexes[0];
2314            assert_eq!(entry.name, "ix_e");
2315            assert_eq!(entry.column_name, "e");
2316            assert_eq!(entry.index.len(), 5, "loaded graph should hold all 5 rows");
2317            assert!(
2318                !entry.needs_rebuild,
2319                "fresh load should not be marked dirty"
2320            );
2321        }
2322
2323        // Quick functional check: KNN query through the loaded index
2324        // returns results.
2325        let resp = process_command(
2326            "SELECT id FROM docs ORDER BY vec_distance_l2(e, [1.0, 0.0]) ASC LIMIT 3;",
2327            &mut loaded,
2328        )
2329        .unwrap();
2330        assert!(resp.contains("3 rows returned"), "got: {resp}");
2331
2332        cleanup(&path);
2333    }
2334
2335    /// SQLR-28 — the HNSW metric must round-trip across save+reopen.
2336    /// Without this, the SQL re-synthesised into `sqlrite_master`
2337    /// would drop the metric and a cosine-built graph would reload
2338    /// as L2, silently breaking subsequent cosine probes.
2339    #[test]
2340    fn round_trip_preserves_hnsw_cosine_metric() {
2341        use crate::sql::hnsw::DistanceMetric;
2342        let path = tmp_path("hnsw_metric_roundtrip");
2343
2344        {
2345            let mut db = Database::new("test".to_string());
2346            process_command(
2347                "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
2348                &mut db,
2349            )
2350            .unwrap();
2351            for v in &["[1.0, 0.0]", "[0.0, 1.0]", "[0.7071, 0.7071]"] {
2352                process_command(&format!("INSERT INTO docs (e) VALUES ({v});"), &mut db).unwrap();
2353            }
2354            process_command(
2355                "CREATE INDEX ix_cos ON docs USING hnsw (e) WITH (metric = 'cosine');",
2356                &mut db,
2357            )
2358            .unwrap();
2359            save_database(&mut db, &path).expect("save");
2360        }
2361
2362        let mut loaded = open_database(&path, "test".to_string()).expect("open");
2363        {
2364            let table = loaded.get_table("docs".to_string()).expect("docs");
2365            assert_eq!(table.hnsw_indexes.len(), 1);
2366            assert_eq!(
2367                table.hnsw_indexes[0].metric,
2368                DistanceMetric::Cosine,
2369                "metric should round-trip through CREATE INDEX SQL"
2370            );
2371            assert_eq!(table.hnsw_indexes[0].index.distance, DistanceMetric::Cosine);
2372        }
2373
2374        // Cosine probe still finds the self-vector after reopen — the
2375        // optimizer's metric gate should match the loaded entry's
2376        // metric, so this should hit the graph shortcut.
2377        let resp = process_command(
2378            "SELECT id FROM docs ORDER BY vec_distance_cosine(e, [1.0, 0.0]) ASC LIMIT 1;",
2379            &mut loaded,
2380        )
2381        .unwrap();
2382        assert!(resp.contains("1 row returned"), "got: {resp}");
2383
2384        cleanup(&path);
2385    }
2386
2387    #[test]
2388    fn round_trip_rebuilds_fts_index_from_create_sql() {
2389        // Phase 8c: FTS indexes now persist their posting lists as
2390        // cell-encoded pages. After save+reopen the index entry
2391        // reattaches with the same column + same posting count, loaded
2392        // directly from disk (no re-tokenization).
2393        let path = tmp_path("fts_roundtrip");
2394
2395        {
2396            let mut db = Database::new("test".to_string());
2397            process_command(
2398                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2399                &mut db,
2400            )
2401            .unwrap();
2402            for body in &[
2403                "rust embedded database",
2404                "rust web framework",
2405                "go embedded systems",
2406                "python web framework",
2407                "rust rust embedded power",
2408            ] {
2409                process_command(
2410                    &format!("INSERT INTO docs (body) VALUES ('{body}');"),
2411                    &mut db,
2412                )
2413                .unwrap();
2414            }
2415            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2416            save_database(&mut db, &path).expect("save");
2417        } // db drops → exclusive lock releases.
2418
2419        let mut loaded = open_database(&path, "test".to_string()).expect("open");
2420        {
2421            let table = loaded.get_table("docs".to_string()).expect("docs");
2422            assert_eq!(table.fts_indexes.len(), 1, "FTS index should reattach");
2423            let entry = &table.fts_indexes[0];
2424            assert_eq!(entry.name, "ix_body");
2425            assert_eq!(entry.column_name, "body");
2426            assert_eq!(
2427                entry.index.len(),
2428                5,
2429                "rebuilt posting list should hold all 5 rows"
2430            );
2431            assert!(!entry.needs_rebuild);
2432        }
2433
2434        // Functional smoke: an FTS query through the reloaded index
2435        // returns the expected hit count.
2436        let resp = process_command(
2437            "SELECT id FROM docs WHERE fts_match(body, 'rust');",
2438            &mut loaded,
2439        )
2440        .unwrap();
2441        assert!(resp.contains("3 rows returned"), "got: {resp}");
2442
2443        cleanup(&path);
2444    }
2445
2446    #[test]
2447    fn delete_then_save_then_reopen_excludes_deleted_node_from_fts() {
2448        // Phase 8b — DELETE marks the FTS index dirty; save rebuilds it
2449        // from current rows; reopen replays the CREATE INDEX SQL against
2450        // the post-delete row set. The deleted rowid must not surface
2451        // in `fts_match` results post-reopen.
2452        let path = tmp_path("fts_delete_rebuild");
2453        let mut db = Database::new("test".to_string());
2454        process_command(
2455            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2456            &mut db,
2457        )
2458        .unwrap();
2459        for body in &[
2460            "rust embedded",
2461            "rust framework",
2462            "go embedded",
2463            "python web",
2464        ] {
2465            process_command(
2466                &format!("INSERT INTO docs (body) VALUES ('{body}');"),
2467                &mut db,
2468            )
2469            .unwrap();
2470        }
2471        process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2472
2473        // Delete row 1 ('rust embedded'); save (rebuild fires); reopen.
2474        process_command("DELETE FROM docs WHERE id = 1;", &mut db).unwrap();
2475        save_database(&mut db, &path).expect("save");
2476        drop(db);
2477
2478        let mut loaded = open_database(&path, "test".to_string()).expect("open");
2479        let resp = process_command(
2480            "SELECT id FROM docs WHERE fts_match(body, 'rust');",
2481            &mut loaded,
2482        )
2483        .unwrap();
2484        // Pre-delete: 2 rows ('rust embedded', 'rust framework') had
2485        // 'rust'. Post-delete: only id=2 remains.
2486        assert!(resp.contains("1 row returned"), "got: {resp}");
2487
2488        cleanup(&path);
2489    }
2490
2491    #[test]
2492    fn fts_roundtrip_uses_persistence_path_not_replay() {
2493        // Phase 8c — assert the reload didn't go through the
2494        // rootpage=0 replay shortcut. We do this by reading the
2495        // sqlrite_master row for the FTS index and confirming its
2496        // rootpage field is non-zero.
2497        let path = tmp_path("fts_persistence_path");
2498
2499        {
2500            let mut db = Database::new("test".to_string());
2501            process_command(
2502                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2503                &mut db,
2504            )
2505            .unwrap();
2506            process_command(
2507                "INSERT INTO docs (body) VALUES ('rust embedded database');",
2508                &mut db,
2509            )
2510            .unwrap();
2511            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2512            save_database(&mut db, &path).expect("save");
2513        }
2514
2515        // Read raw sqlrite_master to find the FTS index row.
2516        let pager = Pager::open(&path).expect("open pager");
2517        let mut master = build_empty_master_table();
2518        load_table_rows(&pager, &mut master, pager.header().schema_root_page).unwrap();
2519        let mut found_rootpage: Option<u32> = None;
2520        for rowid in master.rowids() {
2521            let name = take_text(&master, "name", rowid).unwrap();
2522            if name == "ix_body" {
2523                let rp = take_integer(&master, "rootpage", rowid).unwrap();
2524                found_rootpage = Some(rp as u32);
2525            }
2526        }
2527        let rootpage = found_rootpage.expect("ix_body row in sqlrite_master");
2528        assert!(
2529            rootpage != 0,
2530            "Phase 8c FTS save should set rootpage != 0; got {rootpage}"
2531        );
2532
2533        cleanup(&path);
2534    }
2535
2536    #[test]
2537    fn save_without_fts_keeps_format_v4() {
2538        // Phase 8c on-demand bump — a database with zero FTS indexes
2539        // continues writing the v4 header. Existing v4 users must not
2540        // see their files silently promoted to v5 by an upgrade.
2541        use crate::sql::pager::header::FORMAT_VERSION_V4;
2542
2543        let path = tmp_path("fts_no_bump");
2544        let mut db = Database::new("test".to_string());
2545        process_command(
2546            "CREATE TABLE t (id INTEGER PRIMARY KEY, n INTEGER);",
2547            &mut db,
2548        )
2549        .unwrap();
2550        process_command("INSERT INTO t (n) VALUES (1);", &mut db).unwrap();
2551        save_database(&mut db, &path).unwrap();
2552        drop(db);
2553
2554        let pager = Pager::open(&path).expect("open");
2555        assert_eq!(
2556            pager.header().format_version,
2557            FORMAT_VERSION_V4,
2558            "no-FTS save should keep v4"
2559        );
2560        cleanup(&path);
2561    }
2562
2563    #[test]
2564    fn save_with_fts_bumps_to_v5() {
2565        // Phase 8c on-demand bump — first FTS-bearing save promotes
2566        // the file to v5. v5 readers handle both v4 and v5; v4
2567        // readers correctly refuse a v5 file.
2568        use crate::sql::pager::header::FORMAT_VERSION_V5;
2569
2570        let path = tmp_path("fts_bump_v5");
2571        let mut db = Database::new("test".to_string());
2572        process_command(
2573            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2574            &mut db,
2575        )
2576        .unwrap();
2577        process_command("INSERT INTO docs (body) VALUES ('hello');", &mut db).unwrap();
2578        process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2579        save_database(&mut db, &path).unwrap();
2580        drop(db);
2581
2582        let pager = Pager::open(&path).expect("open");
2583        assert_eq!(
2584            pager.header().format_version,
2585            FORMAT_VERSION_V5,
2586            "FTS save should promote to v5"
2587        );
2588        cleanup(&path);
2589    }
2590
2591    #[test]
2592    fn fts_persistence_handles_empty_and_zero_token_docs() {
2593        // Phase 8c — sidecar cell carries doc-lengths for every doc
2594        // including any with zero tokens (so total_docs is honest
2595        // post-reopen). Empty index also round-trips: a CREATE INDEX
2596        // on an empty table emits a single empty leaf with just the
2597        // (empty) sidecar.
2598        let path = tmp_path("fts_edges");
2599
2600        {
2601            let mut db = Database::new("test".to_string());
2602            process_command(
2603                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2604                &mut db,
2605            )
2606            .unwrap();
2607            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2608            // Mix: real text, then a row that tokenizes to zero tokens
2609            // (only punctuation), then real again.
2610            process_command("INSERT INTO docs (body) VALUES ('rust embedded');", &mut db).unwrap();
2611            process_command("INSERT INTO docs (body) VALUES ('!!!---???');", &mut db).unwrap();
2612            process_command("INSERT INTO docs (body) VALUES ('go embedded');", &mut db).unwrap();
2613            save_database(&mut db, &path).unwrap();
2614        }
2615
2616        let loaded = open_database(&path, "test".to_string()).expect("open");
2617        let table = loaded.get_table("docs".to_string()).unwrap();
2618        let entry = &table.fts_indexes[0];
2619        // All three rows present — including the zero-token row,
2620        // which is critical for total_docs honesty in BM25.
2621        assert_eq!(entry.index.len(), 3);
2622        // 'embedded' appears in 2 rows after reload.
2623        let res = entry
2624            .index
2625            .query("embedded", &crate::sql::fts::Bm25Params::default());
2626        assert_eq!(res.len(), 2);
2627
2628        cleanup(&path);
2629    }
2630
2631    #[test]
2632    fn fts_persistence_round_trips_large_corpus() {
2633        // Phase 8c — exercise multi-leaf staging. ~500 docs with
2634        // single-token bodies generates enough cells to overflow a
2635        // single 4 KiB leaf (each posting cell averages ~8 bytes).
2636        let path = tmp_path("fts_large_corpus");
2637
2638        let mut expected_terms: std::collections::BTreeSet<String> =
2639            std::collections::BTreeSet::new();
2640        {
2641            let mut db = Database::new("test".to_string());
2642            process_command(
2643                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2644                &mut db,
2645            )
2646            .unwrap();
2647            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2648            // 500 docs, each one a unique term — drives unique-term
2649            // count up so multiple leaves are required.
2650            for i in 0..500 {
2651                let term = format!("term{i:04}");
2652                process_command(
2653                    &format!("INSERT INTO docs (body) VALUES ('{term}');"),
2654                    &mut db,
2655                )
2656                .unwrap();
2657                expected_terms.insert(term);
2658            }
2659            save_database(&mut db, &path).unwrap();
2660        }
2661
2662        let loaded = open_database(&path, "test".to_string()).expect("open");
2663        let table = loaded.get_table("docs".to_string()).unwrap();
2664        let entry = &table.fts_indexes[0];
2665        assert_eq!(entry.index.len(), 500);
2666
2667        // Spot-check a handful of terms come back with their original
2668        // single-row posting list.
2669        for &i in &[0_i64, 137, 248, 391, 499] {
2670            let term = format!("term{i:04}");
2671            let res = entry
2672                .index
2673                .query(&term, &crate::sql::fts::Bm25Params::default());
2674            assert_eq!(res.len(), 1, "term {term} should match exactly 1 row");
2675            // PrimaryKey rowids start at 1; doc i was inserted at
2676            // rowid i+1.
2677            assert_eq!(res[0].0, i + 1);
2678        }
2679
2680        cleanup(&path);
2681    }
2682
2683    #[test]
2684    fn delete_then_save_then_reopen_excludes_deleted_node_from_hnsw() {
2685        // Phase 7d.3 — DELETE marks HNSW dirty; save rebuilds it from
2686        // current rows + serializes; reopen loads the post-delete graph.
2687        // After all that, the deleted rowid must NOT come back from a
2688        // KNN query.
2689        let path = tmp_path("hnsw_delete_rebuild");
2690        let mut db = Database::new("test".to_string());
2691        process_command(
2692            "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
2693            &mut db,
2694        )
2695        .unwrap();
2696        for v in &["[1.0, 0.0]", "[2.0, 0.0]", "[3.0, 0.0]", "[4.0, 0.0]"] {
2697            process_command(&format!("INSERT INTO docs (e) VALUES ({v});"), &mut db).unwrap();
2698        }
2699        process_command("CREATE INDEX ix_e ON docs USING hnsw (e);", &mut db).unwrap();
2700
2701        // Delete row 1 (the closest match to [0.5, 0.0]).
2702        process_command("DELETE FROM docs WHERE id = 1;", &mut db).unwrap();
2703        // Confirm it marked dirty.
2704        let dirty_before_save = db.tables["docs"].hnsw_indexes[0].needs_rebuild;
2705        assert!(dirty_before_save, "DELETE should mark dirty");
2706
2707        save_database(&mut db, &path).expect("save");
2708        // Confirm save cleared the dirty flag.
2709        let dirty_after_save = db.tables["docs"].hnsw_indexes[0].needs_rebuild;
2710        assert!(!dirty_after_save, "save should clear dirty");
2711        drop(db);
2712
2713        // Reopen, query for the closest match. Row 1 is gone; row 2
2714        // (id=2, vector [2.0, 0.0]) should now be the nearest.
2715        let loaded = open_database(&path, "test".to_string()).expect("open");
2716        let docs = loaded.get_table("docs".to_string()).expect("docs");
2717
2718        // Row 1 must not appear in any storage anymore.
2719        assert!(
2720            !docs.rowids().contains(&1),
2721            "deleted row 1 should not be in row storage"
2722        );
2723        assert_eq!(docs.rowids().len(), 3, "should have 3 surviving rows");
2724
2725        // The HNSW index must also have shed the deleted node.
2726        assert_eq!(
2727            docs.hnsw_indexes[0].index.len(),
2728            3,
2729            "HNSW graph should have shed the deleted node"
2730        );
2731
2732        cleanup(&path);
2733    }
2734
2735    #[test]
2736    fn round_trip_survives_writes_after_load() {
2737        let path = tmp_path("after_load");
2738        save_database(&mut seed_db(), &path).unwrap();
2739
2740        {
2741            let mut db = open_database(&path, "test".to_string()).unwrap();
2742            process_command(
2743                "INSERT INTO users (name, age) VALUES ('carol', 40);",
2744                &mut db,
2745            )
2746            .unwrap();
2747            save_database(&mut db, &path).unwrap();
2748        } // db drops → its exclusive lock releases before we reopen below.
2749
2750        let db2 = open_database(&path, "test".to_string()).unwrap();
2751        let users = db2.get_table("users".to_string()).unwrap();
2752        assert_eq!(users.rowids().len(), 3);
2753
2754        cleanup(&path);
2755    }
2756
2757    #[test]
2758    fn open_rejects_garbage_file() {
2759        let path = tmp_path("bad");
2760        std::fs::write(&path, b"not a sqlrite database, just bytes").unwrap();
2761        let result = open_database(&path, "x".to_string());
2762        assert!(result.is_err());
2763        cleanup(&path);
2764    }
2765
2766    #[test]
2767    fn many_small_rows_spread_across_leaves() {
2768        let path = tmp_path("many_rows");
2769        let mut db = Database::new("big".to_string());
2770        process_command(
2771            "CREATE TABLE things (id INTEGER PRIMARY KEY, data TEXT);",
2772            &mut db,
2773        )
2774        .unwrap();
2775        for i in 0..200 {
2776            let body = "x".repeat(200);
2777            let q = format!("INSERT INTO things (data) VALUES ('row-{i}-{body}');");
2778            process_command(&q, &mut db).unwrap();
2779        }
2780        save_database(&mut db, &path).unwrap();
2781        let loaded = open_database(&path, "big".to_string()).unwrap();
2782        let things = loaded.get_table("things".to_string()).unwrap();
2783        assert_eq!(things.rowids().len(), 200);
2784        cleanup(&path);
2785    }
2786
2787    #[test]
2788    fn huge_row_goes_through_overflow() {
2789        let path = tmp_path("overflow_row");
2790        let mut db = Database::new("big".to_string());
2791        process_command(
2792            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2793            &mut db,
2794        )
2795        .unwrap();
2796        let body = "A".repeat(10_000);
2797        process_command(
2798            &format!("INSERT INTO docs (body) VALUES ('{body}');"),
2799            &mut db,
2800        )
2801        .unwrap();
2802        save_database(&mut db, &path).unwrap();
2803
2804        let loaded = open_database(&path, "big".to_string()).unwrap();
2805        let docs = loaded.get_table("docs".to_string()).unwrap();
2806        let rowids = docs.rowids();
2807        assert_eq!(rowids.len(), 1);
2808        let stored = docs.get_value("body", rowids[0]);
2809        match stored {
2810            Some(Value::Text(s)) => assert_eq!(s.len(), 10_000),
2811            other => panic!("expected Text, got {other:?}"),
2812        }
2813        cleanup(&path);
2814    }
2815
2816    #[test]
2817    fn create_sql_synthesis_round_trips() {
2818        // Build a table via CREATE, then verify table_to_create_sql +
2819        // parse_create_sql reproduce an equivalent column list.
2820        let mut db = Database::new("x".to_string());
2821        process_command(
2822            "CREATE TABLE t (id INTEGER PRIMARY KEY, tag TEXT UNIQUE, note TEXT NOT NULL);",
2823            &mut db,
2824        )
2825        .unwrap();
2826        let t = db.get_table("t".to_string()).unwrap();
2827        let sql = table_to_create_sql(t);
2828        let (name, cols) = parse_create_sql(&sql).unwrap();
2829        assert_eq!(name, "t");
2830        assert_eq!(cols.len(), 3);
2831        assert!(cols[0].is_pk);
2832        assert!(cols[1].is_unique);
2833        assert!(cols[2].not_null);
2834    }
2835
2836    #[test]
2837    fn sqlrite_master_is_not_exposed_as_a_user_table() {
2838        // After open, the public db.tables map should not list the master.
2839        let path = tmp_path("no_master");
2840        save_database(&mut seed_db(), &path).unwrap();
2841        let loaded = open_database(&path, "x".to_string()).unwrap();
2842        assert!(!loaded.tables.contains_key(MASTER_TABLE_NAME));
2843        cleanup(&path);
2844    }
2845
2846    #[test]
2847    fn multi_leaf_table_produces_an_interior_root() {
2848        // 200 fat rows force the table into multiple leaves, which means
2849        // save_database must build at least one InteriorNode above them.
2850        // The test verifies the round-trip works and confirms the root is
2851        // indeed an interior page (not a leaf) by reading the page type
2852        // directly out of the open pager.
2853        let path = tmp_path("multi_leaf_interior");
2854        let mut db = Database::new("big".to_string());
2855        process_command(
2856            "CREATE TABLE things (id INTEGER PRIMARY KEY, data TEXT);",
2857            &mut db,
2858        )
2859        .unwrap();
2860        for i in 0..200 {
2861            let body = "x".repeat(200);
2862            let q = format!("INSERT INTO things (data) VALUES ('row-{i}-{body}');");
2863            process_command(&q, &mut db).unwrap();
2864        }
2865        save_database(&mut db, &path).unwrap();
2866
2867        // Confirm the round-trip preserved all 200 rows.
2868        let loaded = open_database(&path, "big".to_string()).unwrap();
2869        let things = loaded.get_table("things".to_string()).unwrap();
2870        assert_eq!(things.rowids().len(), 200);
2871
2872        // Peek at `things`'s root page via the pager attached to the
2873        // loaded DB and check it's an InteriorNode, not a leaf.
2874        let pager = loaded
2875            .pager
2876            .as_ref()
2877            .expect("loaded DB should have a pager");
2878        // sqlrite_master's row for `things` holds its root page. Easiest
2879        // way to find it: walk the leaf chain by using find_leftmost_leaf
2880        // and then hop one level up. Simpler: read the master, scan for
2881        // the "things" row, look up rootpage.
2882        let mut master = build_empty_master_table();
2883        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
2884        let things_root = master
2885            .rowids()
2886            .into_iter()
2887            .find_map(|r| match master.get_value("name", r) {
2888                Some(Value::Text(s)) if s == "things" => match master.get_value("rootpage", r) {
2889                    Some(Value::Integer(p)) => Some(p as u32),
2890                    _ => None,
2891                },
2892                _ => None,
2893            })
2894            .expect("things should appear in sqlrite_master");
2895        let root_buf = pager.read_page(things_root).unwrap();
2896        assert_eq!(
2897            root_buf[0],
2898            PageType::InteriorNode as u8,
2899            "expected a multi-leaf table to have an interior root, got tag {}",
2900            root_buf[0]
2901        );
2902
2903        cleanup(&path);
2904    }
2905
2906    #[test]
2907    fn explicit_index_persists_across_save_and_open() {
2908        let path = tmp_path("idx_persist");
2909        let mut db = Database::new("idx".to_string());
2910        process_command(
2911            "CREATE TABLE users (id INTEGER PRIMARY KEY, tag TEXT);",
2912            &mut db,
2913        )
2914        .unwrap();
2915        for i in 1..=5 {
2916            let tag = if i % 2 == 0 { "odd" } else { "even" };
2917            process_command(
2918                &format!("INSERT INTO users (tag) VALUES ('{tag}');"),
2919                &mut db,
2920            )
2921            .unwrap();
2922        }
2923        process_command("CREATE INDEX users_tag_idx ON users (tag);", &mut db).unwrap();
2924        save_database(&mut db, &path).unwrap();
2925
2926        let loaded = open_database(&path, "idx".to_string()).unwrap();
2927        let users = loaded.get_table("users".to_string()).unwrap();
2928        let idx = users
2929            .index_by_name("users_tag_idx")
2930            .expect("explicit index should survive save/open");
2931        assert_eq!(idx.column_name, "tag");
2932        assert!(!idx.is_unique);
2933        // 5 rows: rowids 2, 4 are "odd" (i % 2 == 0 when i is 2 or 4) — 2 entries;
2934        // rowids 1, 3, 5 are "even" (i % 2 != 0) — 3 entries.
2935        let even_rowids = idx.lookup(&Value::Text("even".into()));
2936        let odd_rowids = idx.lookup(&Value::Text("odd".into()));
2937        assert_eq!(even_rowids.len(), 3);
2938        assert_eq!(odd_rowids.len(), 2);
2939
2940        cleanup(&path);
2941    }
2942
2943    #[test]
2944    fn auto_indexes_for_unique_columns_survive_save_open() {
2945        let path = tmp_path("auto_idx_persist");
2946        let mut db = Database::new("a".to_string());
2947        process_command(
2948            "CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL UNIQUE);",
2949            &mut db,
2950        )
2951        .unwrap();
2952        process_command("INSERT INTO users (email) VALUES ('a@x');", &mut db).unwrap();
2953        process_command("INSERT INTO users (email) VALUES ('b@x');", &mut db).unwrap();
2954        save_database(&mut db, &path).unwrap();
2955
2956        let loaded = open_database(&path, "a".to_string()).unwrap();
2957        let users = loaded.get_table("users".to_string()).unwrap();
2958        // Every UNIQUE column auto-creates an index; the load path populated
2959        // it from the persisted entries.
2960        let auto_name = SecondaryIndex::auto_name("users", "email");
2961        let idx = users
2962            .index_by_name(&auto_name)
2963            .expect("auto index should be restored");
2964        assert!(idx.is_unique);
2965        assert_eq!(idx.lookup(&Value::Text("a@x".into())).len(), 1);
2966        assert_eq!(idx.lookup(&Value::Text("b@x".into())).len(), 1);
2967
2968        cleanup(&path);
2969    }
2970
2971    /// SQLR-1 — `CREATE INDEX` on a wide table must round-trip when the
2972    /// index B-tree grows past one leaf and needs an interior level.
2973    /// Before the fix, the post-DDL auto-save panicked with
2974    /// `Internal("unknown paged-entry kind tag 0x4 …")` because a
2975    /// table-cell decoder was being run against an index leaf
2976    /// (`KIND_INDEX = 0x04`).
2977    ///
2978    /// 5 000 rows mirror the original repro from the issue and exceed
2979    /// every leaf-fanout cliff for the small `(rowid, value)` cells in
2980    /// a TEXT-keyed secondary index.
2981    #[test]
2982    fn secondary_index_with_interior_level_round_trips() {
2983        let path = tmp_path("sqlr1_wide_index");
2984        let mut db = Database::new("idx".to_string());
2985        db.source_path = Some(path.clone());
2986
2987        process_command(
2988            "CREATE TABLE bloat (id INTEGER PRIMARY KEY, payload TEXT);",
2989            &mut db,
2990        )
2991        .unwrap();
2992        // BEGIN/COMMIT collapses 5 000 inserts into one save (matches
2993        // `auto_vacuum_setup` and the issue's repro shape).
2994        process_command("BEGIN;", &mut db).unwrap();
2995        for i in 0..5000 {
2996            process_command(
2997                &format!("INSERT INTO bloat (payload) VALUES ('p-{i:08}');"),
2998                &mut db,
2999            )
3000            .unwrap();
3001        }
3002        process_command("COMMIT;", &mut db).unwrap();
3003
3004        // The DDL that used to panic.
3005        process_command("CREATE INDEX idx_p ON bloat (payload);", &mut db).unwrap();
3006
3007        // Reopen and verify lookups, plus that the index tree actually
3008        // grew an interior layer (otherwise this test wouldn't cover the
3009        // regression).
3010        drop(db);
3011        let loaded = open_database(&path, "idx".to_string()).unwrap();
3012        let bloat = loaded.get_table("bloat".to_string()).unwrap();
3013        let idx = bloat
3014            .index_by_name("idx_p")
3015            .expect("idx_p should survive close/reopen");
3016        assert!(!idx.is_unique);
3017
3018        // Spot-check the keyspace: first, middle, last value each map
3019        // back to exactly the row that carried them.
3020        for &(probe_i, expected_rowid) in &[(0i64, 1i64), (2500, 2501), (4999, 5000)] {
3021            let value = Value::Text(format!("p-{probe_i:08}"));
3022            let hits = idx.lookup(&value);
3023            assert_eq!(
3024                hits,
3025                vec![expected_rowid],
3026                "lookup({value:?}) should yield rowid {expected_rowid}",
3027            );
3028        }
3029
3030        // Confirm the index tree is multi-level (the regression's
3031        // necessary condition) — root must be an `InteriorNode` and
3032        // `find_leftmost_leaf` must reach a `TableLeaf` through it.
3033        let pager = loaded.pager.as_ref().unwrap();
3034        let mut master = build_empty_master_table();
3035        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
3036        let idx_root = master
3037            .rowids()
3038            .into_iter()
3039            .find_map(
3040                |r| match (master.get_value("name", r), master.get_value("type", r)) {
3041                    (Some(Value::Text(name)), Some(Value::Text(kind)))
3042                        if name == "idx_p" && kind == "index" =>
3043                    {
3044                        match master.get_value("rootpage", r) {
3045                            Some(Value::Integer(p)) => Some(p as u32),
3046                            _ => None,
3047                        }
3048                    }
3049                    _ => None,
3050                },
3051            )
3052            .expect("idx_p should appear in sqlrite_master");
3053        let root_buf = pager.read_page(idx_root).unwrap();
3054        assert_eq!(
3055            root_buf[0],
3056            PageType::InteriorNode as u8,
3057            "5 000-entry index must have an interior root — without one this test wouldn't cover SQLR-1",
3058        );
3059        let leaf = find_leftmost_leaf(pager, idx_root).unwrap();
3060        let leaf_buf = pager.read_page(leaf).unwrap();
3061        assert_eq!(leaf_buf[0], PageType::TableLeaf as u8);
3062
3063        cleanup(&path);
3064    }
3065
3066    /// SQLR-1 follow-on — the page-recycling path between two large
3067    /// versions of the same index name must not corrupt cell decoding.
3068    /// `DROP INDEX` returns its pages to the freelist; the next
3069    /// `CREATE INDEX` is free to reuse them. If the allocator hands an
3070    /// old index leaf to a *table* without zeroing it, an upstream
3071    /// table walk would see KIND_INDEX cells and panic.
3072    #[test]
3073    fn drop_then_recreate_wide_index_does_not_panic() {
3074        let path = tmp_path("sqlr1_drop_recreate");
3075        let mut db = Database::new("idx".to_string());
3076        db.source_path = Some(path.clone());
3077
3078        process_command(
3079            "CREATE TABLE bloat (id INTEGER PRIMARY KEY, payload TEXT);",
3080            &mut db,
3081        )
3082        .unwrap();
3083        process_command("BEGIN;", &mut db).unwrap();
3084        for i in 0..5000 {
3085            process_command(
3086                &format!("INSERT INTO bloat (payload) VALUES ('p-{i:08}');"),
3087                &mut db,
3088            )
3089            .unwrap();
3090        }
3091        process_command("COMMIT;", &mut db).unwrap();
3092
3093        process_command("CREATE INDEX idx_p ON bloat (payload);", &mut db).unwrap();
3094        process_command("DROP INDEX idx_p;", &mut db).unwrap();
3095        // Recreate from scratch — exercises the recycle path.
3096        process_command("CREATE INDEX idx_p ON bloat (payload);", &mut db).unwrap();
3097
3098        drop(db);
3099        let loaded = open_database(&path, "idx".to_string()).unwrap();
3100        let bloat = loaded.get_table("bloat".to_string()).unwrap();
3101        let idx = bloat
3102            .index_by_name("idx_p")
3103            .expect("idx_p should survive drop+recreate+reopen");
3104        assert_eq!(
3105            idx.lookup(&Value::Text("p-00002500".into())),
3106            vec![2501],
3107            "post-recycle lookup must still resolve correctly",
3108        );
3109
3110        cleanup(&path);
3111    }
3112
3113    #[test]
3114    fn deep_tree_round_trips() {
3115        // Force a 3-level tree by bypassing process_command (which prints
3116        // the full table on every INSERT, making large bulk loads O(N^2)
3117        // in I/O). We build the Table directly via restore_row.
3118        use crate::sql::db::table::Column as TableColumn;
3119
3120        let path = tmp_path("deep_tree");
3121        let mut db = Database::new("deep".to_string());
3122        let columns = vec![
3123            TableColumn::new("id".into(), "integer".into(), true, true, true),
3124            TableColumn::new("s".into(), "text".into(), false, true, false),
3125        ];
3126        let mut table = build_empty_table("t", columns, 0);
3127        // ~900-byte rows → ~4 rows per leaf. 6000 rows → ~1500 leaves,
3128        // which with interior fanout ~400 needs 2 interior levels (3-level
3129        // tree total, counting leaves).
3130        for i in 1..=6_000i64 {
3131            let body = "q".repeat(900);
3132            table
3133                .restore_row(
3134                    i,
3135                    vec![
3136                        Some(Value::Integer(i)),
3137                        Some(Value::Text(format!("r-{i}-{body}"))),
3138                    ],
3139                )
3140                .unwrap();
3141        }
3142        db.tables.insert("t".to_string(), table);
3143        save_database(&mut db, &path).unwrap();
3144
3145        let loaded = open_database(&path, "deep".to_string()).unwrap();
3146        let t = loaded.get_table("t".to_string()).unwrap();
3147        assert_eq!(t.rowids().len(), 6_000);
3148
3149        // Confirm the tree actually grew past 2 levels — i.e., the root's
3150        // leftmost child is itself an interior page, not a leaf.
3151        let pager = loaded.pager.as_ref().unwrap();
3152        let mut master = build_empty_master_table();
3153        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
3154        let t_root = master
3155            .rowids()
3156            .into_iter()
3157            .find_map(|r| match master.get_value("name", r) {
3158                Some(Value::Text(s)) if s == "t" => match master.get_value("rootpage", r) {
3159                    Some(Value::Integer(p)) => Some(p as u32),
3160                    _ => None,
3161                },
3162                _ => None,
3163            })
3164            .expect("t in sqlrite_master");
3165        let root_buf = pager.read_page(t_root).unwrap();
3166        assert_eq!(root_buf[0], PageType::InteriorNode as u8);
3167        let root_payload: &[u8; PAYLOAD_PER_PAGE] =
3168            (&root_buf[PAGE_HEADER_SIZE..]).try_into().unwrap();
3169        let root_interior = InteriorPage::from_bytes(root_payload);
3170        let child = root_interior.leftmost_child().unwrap();
3171        let child_buf = pager.read_page(child).unwrap();
3172        assert_eq!(
3173            child_buf[0],
3174            PageType::InteriorNode as u8,
3175            "expected 3-level tree: root's leftmost child should also be InteriorNode",
3176        );
3177
3178        cleanup(&path);
3179    }
3180
3181    #[test]
3182    fn alter_rename_table_survives_save_and_reopen() {
3183        let path = tmp_path("alter_rename_table_roundtrip");
3184        let mut db = seed_db();
3185        save_database(&mut db, &path).expect("save");
3186
3187        process_command("ALTER TABLE users RENAME TO members;", &mut db).expect("rename");
3188        save_database(&mut db, &path).expect("save after rename");
3189
3190        let loaded = open_database(&path, "t".to_string()).expect("reopen");
3191        assert!(!loaded.contains_table("users".to_string()));
3192        assert!(loaded.contains_table("members".to_string()));
3193        let members = loaded.get_table("members".to_string()).unwrap();
3194        assert_eq!(members.rowids().len(), 2, "rows should survive");
3195        // Auto-indexes followed the rename.
3196        assert!(
3197            members
3198                .index_by_name("sqlrite_autoindex_members_id")
3199                .is_some()
3200        );
3201        assert!(
3202            members
3203                .index_by_name("sqlrite_autoindex_members_name")
3204                .is_some()
3205        );
3206
3207        cleanup(&path);
3208    }
3209
3210    #[test]
3211    fn alter_rename_column_survives_save_and_reopen() {
3212        let path = tmp_path("alter_rename_col_roundtrip");
3213        let mut db = seed_db();
3214        save_database(&mut db, &path).expect("save");
3215
3216        process_command(
3217            "ALTER TABLE users RENAME COLUMN name TO full_name;",
3218            &mut db,
3219        )
3220        .expect("rename column");
3221        save_database(&mut db, &path).expect("save after rename");
3222
3223        let loaded = open_database(&path, "t".to_string()).expect("reopen");
3224        let users = loaded.get_table("users".to_string()).unwrap();
3225        assert!(users.contains_column("full_name".to_string()));
3226        assert!(!users.contains_column("name".to_string()));
3227        // Verify a row's value survived the rename round-trip.
3228        let alice_rowid = users
3229            .rowids()
3230            .into_iter()
3231            .find(|r| users.get_value("full_name", *r) == Some(Value::Text("alice".to_string())))
3232            .expect("alice row should be findable under renamed column");
3233        assert_eq!(
3234            users.get_value("full_name", alice_rowid),
3235            Some(Value::Text("alice".to_string()))
3236        );
3237
3238        cleanup(&path);
3239    }
3240
3241    #[test]
3242    fn alter_add_column_with_default_survives_save_and_reopen() {
3243        let path = tmp_path("alter_add_default_roundtrip");
3244        let mut db = seed_db();
3245        save_database(&mut db, &path).expect("save");
3246
3247        process_command(
3248            "ALTER TABLE users ADD COLUMN status TEXT DEFAULT 'active';",
3249            &mut db,
3250        )
3251        .expect("add column");
3252        save_database(&mut db, &path).expect("save after add");
3253
3254        let loaded = open_database(&path, "t".to_string()).expect("reopen");
3255        let users = loaded.get_table("users".to_string()).unwrap();
3256        assert!(users.contains_column("status".to_string()));
3257        for rowid in users.rowids() {
3258            assert_eq!(
3259                users.get_value("status", rowid),
3260                Some(Value::Text("active".to_string())),
3261                "backfilled default should round-trip for rowid {rowid}"
3262            );
3263        }
3264        // The DEFAULT clause itself should still be on the column metadata
3265        // so a subsequent INSERT picks it up.
3266        let status_col = users
3267            .columns
3268            .iter()
3269            .find(|c| c.column_name == "status")
3270            .unwrap();
3271        assert_eq!(status_col.default, Some(Value::Text("active".to_string())));
3272
3273        cleanup(&path);
3274    }
3275
3276    #[test]
3277    fn alter_drop_column_survives_save_and_reopen() {
3278        let path = tmp_path("alter_drop_col_roundtrip");
3279        let mut db = seed_db();
3280        save_database(&mut db, &path).expect("save");
3281
3282        process_command("ALTER TABLE users DROP COLUMN age;", &mut db).expect("drop column");
3283        save_database(&mut db, &path).expect("save after drop");
3284
3285        let loaded = open_database(&path, "t".to_string()).expect("reopen");
3286        let users = loaded.get_table("users".to_string()).unwrap();
3287        assert!(!users.contains_column("age".to_string()));
3288        assert!(users.contains_column("name".to_string()));
3289
3290        cleanup(&path);
3291    }
3292
3293    #[test]
3294    fn drop_table_survives_save_and_reopen() {
3295        let path = tmp_path("drop_table_roundtrip");
3296        let mut db = seed_db();
3297        save_database(&mut db, &path).expect("save");
3298
3299        // Verify both tables landed.
3300        {
3301            let loaded = open_database(&path, "t".to_string()).expect("open");
3302            assert!(loaded.contains_table("users".to_string()));
3303            assert!(loaded.contains_table("notes".to_string()));
3304        }
3305
3306        process_command("DROP TABLE users;", &mut db).expect("drop users");
3307        save_database(&mut db, &path).expect("save after drop");
3308
3309        let loaded = open_database(&path, "t".to_string()).expect("reopen");
3310        assert!(
3311            !loaded.contains_table("users".to_string()),
3312            "dropped table should not resurface on reopen"
3313        );
3314        assert!(
3315            loaded.contains_table("notes".to_string()),
3316            "untouched table should survive"
3317        );
3318
3319        cleanup(&path);
3320    }
3321
3322    #[test]
3323    fn drop_index_survives_save_and_reopen() {
3324        let path = tmp_path("drop_index_roundtrip");
3325        let mut db = Database::new("t".to_string());
3326        process_command(
3327            "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
3328            &mut db,
3329        )
3330        .unwrap();
3331        process_command("CREATE INDEX notes_body_idx ON notes (body);", &mut db).unwrap();
3332        save_database(&mut db, &path).expect("save");
3333
3334        process_command("DROP INDEX notes_body_idx;", &mut db).unwrap();
3335        save_database(&mut db, &path).expect("save after drop");
3336
3337        let loaded = open_database(&path, "t".to_string()).expect("reopen");
3338        let notes = loaded.get_table("notes".to_string()).unwrap();
3339        assert!(
3340            notes.index_by_name("notes_body_idx").is_none(),
3341            "dropped index should not resurface on reopen"
3342        );
3343        // The auto-index for the PK should still be there.
3344        assert!(notes.index_by_name("sqlrite_autoindex_notes_id").is_some());
3345
3346        cleanup(&path);
3347    }
3348
3349    #[test]
3350    fn default_clause_survives_save_and_reopen() {
3351        let path = tmp_path("default_roundtrip");
3352        let mut db = Database::new("t".to_string());
3353
3354        process_command(
3355            "CREATE TABLE users (id INTEGER PRIMARY KEY, status TEXT DEFAULT 'active', score INTEGER DEFAULT 0);",
3356            &mut db,
3357        )
3358        .unwrap();
3359        save_database(&mut db, &path).expect("save");
3360
3361        let mut loaded = open_database(&path, "t".to_string()).expect("open");
3362
3363        // The reloaded column metadata should still carry the DEFAULT.
3364        let users = loaded.get_table("users".to_string()).expect("users table");
3365        let status_col = users
3366            .columns
3367            .iter()
3368            .find(|c| c.column_name == "status")
3369            .expect("status column");
3370        assert_eq!(
3371            status_col.default,
3372            Some(Value::Text("active".to_string())),
3373            "DEFAULT 'active' should round-trip"
3374        );
3375        let score_col = users
3376            .columns
3377            .iter()
3378            .find(|c| c.column_name == "score")
3379            .expect("score column");
3380        assert_eq!(
3381            score_col.default,
3382            Some(Value::Integer(0)),
3383            "DEFAULT 0 should round-trip"
3384        );
3385
3386        // Now exercise the runtime path: an INSERT that omits both DEFAULT
3387        // columns should pick them up from the reloaded schema.
3388        process_command("INSERT INTO users (id) VALUES (1);", &mut loaded).unwrap();
3389        let users = loaded.get_table("users".to_string()).unwrap();
3390        assert_eq!(
3391            users.get_value("status", 1),
3392            Some(Value::Text("active".to_string()))
3393        );
3394        assert_eq!(users.get_value("score", 1), Some(Value::Integer(0)));
3395
3396        cleanup(&path);
3397    }
3398
3399    // ---------------------------------------------------------------------
3400    // SQLR-6 — free-list + VACUUM tests
3401    // ---------------------------------------------------------------------
3402
3403    /// Drop a table; subsequent CREATE TABLE should reuse the freed pages
3404    /// rather than extending the file. The page_count after drop+create
3405    /// should be at most what it was after the original two tables —
3406    /// proving the new table landed on freelist pages.
3407    #[test]
3408    fn drop_table_freelist_persists_pages_for_reuse() {
3409        let path = tmp_path("freelist_reuse");
3410        let mut db = seed_db();
3411        db.source_path = Some(path.clone());
3412        save_database(&mut db, &path).expect("save");
3413        let pages_two_tables = db.pager.as_ref().unwrap().header().page_count;
3414
3415        // Drop one table; its pages go on the freelist.
3416        process_command("DROP TABLE users;", &mut db).expect("drop users");
3417        let pages_after_drop = db.pager.as_ref().unwrap().header().page_count;
3418        assert_eq!(
3419            pages_after_drop, pages_two_tables,
3420            "page_count should not shrink on drop — the freed pages persist on the freelist"
3421        );
3422        let head_after_drop = db.pager.as_ref().unwrap().header().freelist_head;
3423        assert!(
3424            head_after_drop != 0,
3425            "freelist_head must be non-zero after drop"
3426        );
3427
3428        // Re-create a similar-shaped table; should reuse freelist pages.
3429        process_command(
3430            "CREATE TABLE accounts (id INTEGER PRIMARY KEY, label TEXT NOT NULL UNIQUE);",
3431            &mut db,
3432        )
3433        .expect("create accounts");
3434        process_command("INSERT INTO accounts (label) VALUES ('a');", &mut db).unwrap();
3435        process_command("INSERT INTO accounts (label) VALUES ('b');", &mut db).unwrap();
3436        let pages_after_create = db.pager.as_ref().unwrap().header().page_count;
3437        assert!(
3438            pages_after_create <= pages_two_tables + 2,
3439            "creating a similar-sized table after a drop should mostly draw from the \
3440             freelist, not extend the file (got {pages_after_create} > {pages_two_tables} + 2)"
3441        );
3442
3443        cleanup(&path);
3444    }
3445
3446    /// `VACUUM;` after a drop must shrink the file and clear the freelist.
3447    #[test]
3448    fn drop_then_vacuum_shrinks_file() {
3449        let path = tmp_path("vacuum_shrinks");
3450        let mut db = seed_db();
3451        db.source_path = Some(path.clone());
3452        // Add a few more rows to make the dropped table bigger.
3453        for i in 0..20 {
3454            process_command(
3455                &format!("INSERT INTO users (name, age) VALUES ('user{i}', {i});"),
3456                &mut db,
3457            )
3458            .unwrap();
3459        }
3460        save_database(&mut db, &path).expect("save");
3461
3462        process_command("DROP TABLE users;", &mut db).expect("drop");
3463        let size_before_vacuum = std::fs::metadata(&path).unwrap().len();
3464        let pages_before_vacuum = db.pager.as_ref().unwrap().header().page_count;
3465        let head_before = db.pager.as_ref().unwrap().header().freelist_head;
3466        assert!(head_before != 0, "drop should populate the freelist");
3467
3468        // VACUUM (via process_command) checkpoints internally so the
3469        // file actually shrinks on disk before we observe its size.
3470        process_command("VACUUM;", &mut db).expect("vacuum");
3471
3472        let size_after = std::fs::metadata(&path).unwrap().len();
3473        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3474        let head_after = db.pager.as_ref().unwrap().header().freelist_head;
3475        assert!(
3476            pages_after < pages_before_vacuum,
3477            "VACUUM must reduce page_count: was {pages_before_vacuum}, now {pages_after}"
3478        );
3479        assert_eq!(head_after, 0, "VACUUM must clear the freelist");
3480        assert!(
3481            size_after < size_before_vacuum,
3482            "VACUUM must shrink the file on disk: was {size_before_vacuum} bytes, now {size_after}"
3483        );
3484
3485        cleanup(&path);
3486    }
3487
3488    /// VACUUM on a non-empty multi-table DB must not lose any rows.
3489    #[test]
3490    fn vacuum_round_trips_data() {
3491        let path = tmp_path("vacuum_round_trip");
3492        let mut db = seed_db();
3493        db.source_path = Some(path.clone());
3494        save_database(&mut db, &path).expect("save");
3495        process_command("VACUUM;", &mut db).expect("vacuum");
3496
3497        // Re-open from disk to make sure the on-disk catalog round-trips.
3498        drop(db);
3499        let loaded = open_database(&path, "t".to_string()).expect("reopen after vacuum");
3500        assert!(loaded.contains_table("users".to_string()));
3501        assert!(loaded.contains_table("notes".to_string()));
3502        let users = loaded.get_table("users".to_string()).unwrap();
3503        // seed_db inserts two users.
3504        assert_eq!(users.rowids().len(), 2);
3505
3506        cleanup(&path);
3507    }
3508
3509    /// Format version is bumped to v6 only after a save that creates a
3510    /// non-empty freelist. VACUUM clears the freelist but doesn't
3511    /// downgrade — v6 is a strict superset, so once at v6 we stay.
3512    #[test]
3513    fn freelist_format_version_promotion() {
3514        use crate::sql::pager::header::{FORMAT_VERSION_BASELINE, FORMAT_VERSION_V6};
3515        let path = tmp_path("v6_promotion");
3516        let mut db = seed_db();
3517        db.source_path = Some(path.clone());
3518        save_database(&mut db, &path).expect("save");
3519        let v_after_save = db.pager.as_ref().unwrap().header().format_version;
3520        assert_eq!(
3521            v_after_save, FORMAT_VERSION_BASELINE,
3522            "fresh DB without drops should stay at the baseline version"
3523        );
3524
3525        process_command("DROP TABLE users;", &mut db).expect("drop");
3526        let v_after_drop = db.pager.as_ref().unwrap().header().format_version;
3527        assert_eq!(
3528            v_after_drop, FORMAT_VERSION_V6,
3529            "first save with a non-empty freelist must promote to V6"
3530        );
3531
3532        process_command("VACUUM;", &mut db).expect("vacuum");
3533        let v_after_vacuum = db.pager.as_ref().unwrap().header().format_version;
3534        assert_eq!(
3535            v_after_vacuum, FORMAT_VERSION_V6,
3536            "VACUUM must not downgrade — V6 is a strict superset"
3537        );
3538
3539        cleanup(&path);
3540    }
3541
3542    /// Freelist persists across reopen: drop, save, close, reopen,
3543    /// confirm the next CREATE TABLE re-uses pages from the persisted
3544    /// freelist (rather than extending the file).
3545    #[test]
3546    fn freelist_round_trip_through_reopen() {
3547        let path = tmp_path("freelist_reopen");
3548        let pages_two_tables;
3549        {
3550            let mut db = seed_db();
3551            db.source_path = Some(path.clone());
3552            save_database(&mut db, &path).expect("save");
3553            pages_two_tables = db.pager.as_ref().unwrap().header().page_count;
3554            process_command("DROP TABLE users;", &mut db).expect("drop");
3555            let head = db.pager.as_ref().unwrap().header().freelist_head;
3556            assert!(head != 0, "drop must populate the freelist");
3557        }
3558
3559        // Reopen from disk — the freelist must come back.
3560        let mut db = open_database(&path, "t".to_string()).expect("reopen");
3561        assert!(
3562            db.pager.as_ref().unwrap().header().freelist_head != 0,
3563            "freelist_head must survive close/reopen"
3564        );
3565
3566        process_command(
3567            "CREATE TABLE accounts (id INTEGER PRIMARY KEY, label TEXT NOT NULL UNIQUE);",
3568            &mut db,
3569        )
3570        .expect("create accounts");
3571        process_command("INSERT INTO accounts (label) VALUES ('reopened');", &mut db).unwrap();
3572        let pages_after_create = db.pager.as_ref().unwrap().header().page_count;
3573        assert!(
3574            pages_after_create <= pages_two_tables + 2,
3575            "post-reopen create should reuse freelist (got {pages_after_create} > \
3576             {pages_two_tables} + 2 — file extended instead of reusing)"
3577        );
3578
3579        cleanup(&path);
3580    }
3581
3582    /// VACUUM inside an explicit transaction must error before touching the
3583    /// disk. `BEGIN; VACUUM;` is the documented rejection path.
3584    #[test]
3585    fn vacuum_inside_transaction_is_rejected() {
3586        let path = tmp_path("vacuum_txn");
3587        let mut db = seed_db();
3588        db.source_path = Some(path.clone());
3589        save_database(&mut db, &path).expect("save");
3590
3591        process_command("BEGIN;", &mut db).expect("begin");
3592        let err = process_command("VACUUM;", &mut db).unwrap_err();
3593        assert!(
3594            format!("{err}").contains("VACUUM cannot run inside a transaction"),
3595            "expected in-transaction rejection, got: {err}"
3596        );
3597        // Roll back to leave the DB in a clean state.
3598        process_command("ROLLBACK;", &mut db).unwrap();
3599        cleanup(&path);
3600    }
3601
3602    /// VACUUM on an in-memory database is a documented no-op.
3603    #[test]
3604    fn vacuum_on_in_memory_database_is_noop() {
3605        let mut db = Database::new("mem".to_string());
3606        process_command("CREATE TABLE t (id INTEGER PRIMARY KEY);", &mut db).unwrap();
3607        let out = process_command("VACUUM;", &mut db).expect("vacuum no-op");
3608        assert!(
3609            out.to_lowercase().contains("no-op") || out.to_lowercase().contains("in-memory"),
3610            "expected no-op message for in-memory VACUUM, got: {out}"
3611        );
3612    }
3613
3614    /// Untouched tables shouldn't write any pages on the save that
3615    /// follows a DROP of an unrelated table. Confirms the per-table
3616    /// preferred pool keeps page numbers stable so the diff pager skips
3617    /// every byte-identical leaf.
3618    #[test]
3619    fn unchanged_table_pages_skip_diff_after_unrelated_drop() {
3620        // Need three tables so dropping one in the middle still leaves
3621        // an "unrelated" alphabetical neighbour. Layout pre-drop (sorted):
3622        //   accounts, notes, users
3623        // Drop `notes`. `accounts` and `users` should keep their pages.
3624        let path = tmp_path("diff_after_drop");
3625        let mut db = Database::new("t".to_string());
3626        db.source_path = Some(path.clone());
3627        process_command(
3628            "CREATE TABLE accounts (id INTEGER PRIMARY KEY, label TEXT);",
3629            &mut db,
3630        )
3631        .unwrap();
3632        process_command(
3633            "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
3634            &mut db,
3635        )
3636        .unwrap();
3637        process_command(
3638            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);",
3639            &mut db,
3640        )
3641        .unwrap();
3642        for i in 0..5 {
3643            process_command(
3644                &format!("INSERT INTO accounts (label) VALUES ('a{i}');"),
3645                &mut db,
3646            )
3647            .unwrap();
3648            process_command(
3649                &format!("INSERT INTO notes (body) VALUES ('n{i}');"),
3650                &mut db,
3651            )
3652            .unwrap();
3653            process_command(
3654                &format!("INSERT INTO users (name) VALUES ('u{i}');"),
3655                &mut db,
3656            )
3657            .unwrap();
3658        }
3659        save_database(&mut db, &path).expect("baseline save");
3660
3661        // Capture page bytes for `accounts` and `users` so we can
3662        // verify they don't change.
3663        let pager = db.pager.as_ref().unwrap();
3664        let acc_root = read_old_rootpages(pager, pager.header().schema_root_page)
3665            .unwrap()
3666            .get(&("table".to_string(), "accounts".to_string()))
3667            .copied()
3668            .unwrap();
3669        let users_root = read_old_rootpages(pager, pager.header().schema_root_page)
3670            .unwrap()
3671            .get(&("table".to_string(), "users".to_string()))
3672            .copied()
3673            .unwrap();
3674        let acc_bytes_before: Vec<u8> = pager.read_page(acc_root).unwrap().to_vec();
3675        let users_bytes_before: Vec<u8> = pager.read_page(users_root).unwrap().to_vec();
3676
3677        // Drop the middle table.
3678        process_command("DROP TABLE notes;", &mut db).expect("drop notes");
3679
3680        let pager = db.pager.as_ref().unwrap();
3681        // `accounts` and `users` should still live at the same pages
3682        // with byte-identical content.
3683        let acc_after = pager.read_page(acc_root).unwrap();
3684        let users_after = pager.read_page(users_root).unwrap();
3685        assert_eq!(
3686            &acc_after[..],
3687            &acc_bytes_before[..],
3688            "accounts root page must not be rewritten when an unrelated table is dropped"
3689        );
3690        assert_eq!(
3691            &users_after[..],
3692            &users_bytes_before[..],
3693            "users root page must not be rewritten when an unrelated table is dropped"
3694        );
3695
3696        cleanup(&path);
3697    }
3698
3699    // ---- SQLR-10: auto-VACUUM trigger after page-releasing DDL ----
3700
3701    /// Builds a file-backed DB with one small "keep" table and one
3702    /// large "bloat" table, sized so the post-drop freelist will
3703    /// comfortably cross the default 25% threshold and the
3704    /// `MIN_PAGES_FOR_AUTO_VACUUM` floor (16 pages). Used by the
3705    /// auto-VACUUM happy-path tests.
3706    fn auto_vacuum_setup(path: &std::path::Path) -> Database {
3707        let mut db = Database::new("av".to_string());
3708        db.source_path = Some(path.to_path_buf());
3709        process_command(
3710            "CREATE TABLE keep (id INTEGER PRIMARY KEY, n INTEGER);",
3711            &mut db,
3712        )
3713        .unwrap();
3714        process_command("INSERT INTO keep (n) VALUES (1);", &mut db).unwrap();
3715        process_command(
3716            "CREATE TABLE bloat (id INTEGER PRIMARY KEY, payload TEXT);",
3717            &mut db,
3718        )
3719        .unwrap();
3720        // Wrap the bulk insert in a transaction so we pay one save at
3721        // COMMIT instead of 5000 round-trips through auto-save.
3722        process_command("BEGIN;", &mut db).unwrap();
3723        for i in 0..5000 {
3724            process_command(
3725                &format!("INSERT INTO bloat (payload) VALUES ('p-{i:08}');"),
3726                &mut db,
3727            )
3728            .unwrap();
3729        }
3730        process_command("COMMIT;", &mut db).unwrap();
3731        db
3732    }
3733
3734    /// Default threshold (0.25) is engaged for fresh `Database`s and
3735    /// fires when a `DROP TABLE` orphans enough pages — file shrinks
3736    /// without anyone calling `VACUUM;`.
3737    #[test]
3738    fn auto_vacuum_default_threshold_triggers_on_drop_table() {
3739        let path = tmp_path("av_default_drop_table");
3740        let mut db = auto_vacuum_setup(&path);
3741        // Sanity: setup respects the shipped default.
3742        assert_eq!(db.auto_vacuum_threshold(), Some(0.25));
3743
3744        // Checkpoint before measuring `size_before` so the bloat actually
3745        // lives in the main file and not just the WAL — otherwise
3746        // `size_before` is the bare 2-page header and any post-vacuum
3747        // checkpoint will look like the file *grew*.
3748        if let Some(p) = db.pager.as_mut() {
3749            let _ = p.checkpoint();
3750        }
3751        let pages_before = db.pager.as_ref().unwrap().header().page_count;
3752        let size_before = std::fs::metadata(&path).unwrap().len();
3753        assert!(
3754            pages_before >= MIN_PAGES_FOR_AUTO_VACUUM,
3755            "setup should produce >= MIN_PAGES_FOR_AUTO_VACUUM ({MIN_PAGES_FOR_AUTO_VACUUM}) \
3756             pages so the floor doesn't suppress the trigger; got {pages_before}"
3757        );
3758
3759        // Drop the bloat table — freelist should pass 25% of page_count
3760        // and the auto-VACUUM hook should compact in place. Note: no
3761        // explicit `VACUUM;` statement is issued.
3762        process_command("DROP TABLE bloat;", &mut db).expect("drop");
3763
3764        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3765        let head_after = db.pager.as_ref().unwrap().header().freelist_head;
3766        // Second checkpoint so the post-vacuum file shrinks on disk
3767        // (auto-VACUUM stages the compact through WAL just like manual
3768        // VACUUM does).
3769        if let Some(p) = db.pager.as_mut() {
3770            let _ = p.checkpoint();
3771        }
3772        let size_after = std::fs::metadata(&path).unwrap().len();
3773
3774        assert!(
3775            pages_after < pages_before,
3776            "auto-VACUUM must reduce page_count: was {pages_before}, now {pages_after}"
3777        );
3778        assert_eq!(head_after, 0, "auto-VACUUM must clear the freelist");
3779        assert!(
3780            size_after < size_before,
3781            "auto-VACUUM must shrink the file on disk: was {size_before}, now {size_after}"
3782        );
3783
3784        cleanup(&path);
3785    }
3786
3787    /// Setting the threshold to `None` disables the trigger entirely:
3788    /// the same workload that shrinks under the default leaves the file
3789    /// at its high-water mark.
3790    #[test]
3791    fn auto_vacuum_disabled_keeps_file_at_hwm() {
3792        let path = tmp_path("av_disabled");
3793        let mut db = auto_vacuum_setup(&path);
3794        db.set_auto_vacuum_threshold(None).expect("disable");
3795        assert_eq!(db.auto_vacuum_threshold(), None);
3796
3797        let pages_before = db.pager.as_ref().unwrap().header().page_count;
3798
3799        process_command("DROP TABLE bloat;", &mut db).expect("drop");
3800
3801        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3802        let head_after = db.pager.as_ref().unwrap().header().freelist_head;
3803        assert_eq!(
3804            pages_after, pages_before,
3805            "with auto-VACUUM disabled, drop must keep page_count at the HWM"
3806        );
3807        assert!(
3808            head_after != 0,
3809            "drop must still populate the freelist (manual VACUUM would be needed to reclaim)"
3810        );
3811
3812        cleanup(&path);
3813    }
3814
3815    /// `DROP INDEX` is the second of three page-releasing DDL paths
3816    /// covered by SQLR-10. We bloat the freelist via a separate
3817    /// `DROP TABLE` first (with auto-VACUUM disabled so it doesn't
3818    /// compact early), then re-arm the trigger and drop a small index
3819    /// — the cumulative freelist crosses 25% on the index drop and
3820    /// auto-VACUUM fires.
3821    ///
3822    /// The detour around bloat is necessary because building a
3823    /// secondary index on a 5000-row column would need multi-level
3824    /// interior nodes, and the cell-decoder's interior-page support
3825    /// is a separate work item from SQLR-10.
3826    #[test]
3827    fn auto_vacuum_triggers_on_drop_index() {
3828        let path = tmp_path("av_drop_index");
3829        let mut db = auto_vacuum_setup(&path);
3830
3831        // Phase 1: drop the bloat table with auto-VACUUM disabled so
3832        // its pages land on the freelist without being reclaimed.
3833        db.set_auto_vacuum_threshold(None).expect("disable");
3834        process_command("DROP TABLE bloat;", &mut db).expect("drop bloat");
3835        let pages_after_bloat_drop = db.pager.as_ref().unwrap().header().page_count;
3836        let head_after_bloat_drop = db.pager.as_ref().unwrap().header().freelist_head;
3837        assert!(
3838            head_after_bloat_drop != 0,
3839            "bloat drop must populate the freelist (else later index drop won't trip the threshold)"
3840        );
3841
3842        // Phase 2: a small index on the surviving `keep` table. The
3843        // index reuses one page from the freelist (which is fine —
3844        // freelist still holds plenty more).
3845        process_command("CREATE INDEX idx_keep_n ON keep (n);", &mut db).expect("create idx");
3846
3847        // Phase 3: re-arm the trigger and drop the index. The freelist
3848        // is already heavily populated from phase 1; this drop just
3849        // adds the index page on top, keeping the ratio well above
3850        // 25%, so auto-VACUUM should fire.
3851        db.set_auto_vacuum_threshold(Some(0.25)).expect("re-arm");
3852        process_command("DROP INDEX idx_keep_n;", &mut db).expect("drop index");
3853
3854        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3855        let head_after = db.pager.as_ref().unwrap().header().freelist_head;
3856        assert!(
3857            pages_after < pages_after_bloat_drop,
3858            "DROP INDEX should fire auto-VACUUM and reduce page_count: \
3859             was {pages_after_bloat_drop}, now {pages_after}"
3860        );
3861        assert_eq!(
3862            head_after, 0,
3863            "auto-VACUUM after DROP INDEX must clear the freelist"
3864        );
3865
3866        cleanup(&path);
3867    }
3868
3869    /// `ALTER TABLE … DROP COLUMN` releases pages too — the third path
3870    /// the SQLR-10 trigger covers.
3871    #[test]
3872    fn auto_vacuum_triggers_on_alter_drop_column() {
3873        let path = tmp_path("av_alter_drop_col");
3874        let mut db = auto_vacuum_setup(&path);
3875        let pages_before = db.pager.as_ref().unwrap().header().page_count;
3876
3877        // Drop the wide `payload` column — this rewrites every row in
3878        // `bloat` without the column, so the old leaf pages get freed.
3879        process_command("ALTER TABLE bloat DROP COLUMN payload;", &mut db).expect("alter drop");
3880
3881        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3882        assert!(
3883            pages_after < pages_before,
3884            "ALTER TABLE DROP COLUMN should fire auto-VACUUM and reduce page_count: \
3885             was {pages_before}, now {pages_after}"
3886        );
3887        assert_eq!(db.pager.as_ref().unwrap().header().freelist_head, 0);
3888
3889        cleanup(&path);
3890    }
3891
3892    /// A high threshold (0.99) suppresses the trigger when the freelist
3893    /// ratio is well below it — the file stays at HWM.
3894    #[test]
3895    fn auto_vacuum_skips_below_threshold() {
3896        let path = tmp_path("av_below_threshold");
3897        let mut db = auto_vacuum_setup(&path);
3898        db.set_auto_vacuum_threshold(Some(0.99)).expect("set");
3899
3900        let pages_before = db.pager.as_ref().unwrap().header().page_count;
3901
3902        process_command("DROP TABLE bloat;", &mut db).expect("drop");
3903
3904        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3905        assert_eq!(
3906            pages_after, pages_before,
3907            "freelist ratio after a single drop is far below 0.99 — \
3908             page_count must stay at the HWM"
3909        );
3910        assert!(
3911            db.pager.as_ref().unwrap().header().freelist_head != 0,
3912            "drop must still populate the freelist"
3913        );
3914
3915        cleanup(&path);
3916    }
3917
3918    /// Inside an explicit transaction, the page-releasing DDL doesn't
3919    /// flush to disk yet — the freelist isn't accurate, so the trigger
3920    /// must skip. The compact would also publish in-flight work out of
3921    /// band, which is exactly what the manual `VACUUM;` rejection
3922    /// inside a txn already prevents.
3923    #[test]
3924    fn auto_vacuum_skips_inside_transaction() {
3925        let path = tmp_path("av_in_txn");
3926        let mut db = auto_vacuum_setup(&path);
3927        let pages_before = db.pager.as_ref().unwrap().header().page_count;
3928
3929        process_command("BEGIN;", &mut db).expect("begin");
3930        process_command("DROP TABLE bloat;", &mut db).expect("drop in txn");
3931        // Mid-transaction: no save has occurred, so the on-disk
3932        // freelist_head must be unchanged and page_count must not have
3933        // shifted from a sneaky compact.
3934        let pages_mid = db.pager.as_ref().unwrap().header().page_count;
3935        assert_eq!(
3936            pages_mid, pages_before,
3937            "auto-VACUUM must not fire mid-transaction"
3938        );
3939
3940        process_command("ROLLBACK;", &mut db).expect("rollback");
3941        cleanup(&path);
3942    }
3943
3944    /// Tiny databases (under `MIN_PAGES_FOR_AUTO_VACUUM`) skip the
3945    /// trigger even if the ratio would otherwise qualify — the cost of
3946    /// rewriting a 64 KiB file isn't worth the few bytes reclaimed.
3947    #[test]
3948    fn auto_vacuum_skips_under_min_pages_floor() {
3949        let path = tmp_path("av_under_floor");
3950        let mut db = seed_db(); // small: just users + notes, ~5 pages
3951        db.source_path = Some(path.clone());
3952        save_database(&mut db, &path).expect("save");
3953        // Confirm we're below the floor so the test is meaningful.
3954        let pages_before = db.pager.as_ref().unwrap().header().page_count;
3955        assert!(
3956            pages_before < MIN_PAGES_FOR_AUTO_VACUUM,
3957            "test setup is too large: floor would not apply (got {pages_before} pages, \
3958             floor is {MIN_PAGES_FOR_AUTO_VACUUM})"
3959        );
3960
3961        process_command("DROP TABLE users;", &mut db).expect("drop");
3962
3963        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3964        assert_eq!(
3965            pages_after, pages_before,
3966            "below MIN_PAGES_FOR_AUTO_VACUUM, drop must not trigger compaction"
3967        );
3968        assert!(
3969            db.pager.as_ref().unwrap().header().freelist_head != 0,
3970            "drop must still populate the freelist normally"
3971        );
3972
3973        cleanup(&path);
3974    }
3975
3976    /// Setter rejects NaN, infinities, and values outside `0.0..=1.0`
3977    /// rather than silently saturating.
3978    #[test]
3979    fn set_auto_vacuum_threshold_rejects_out_of_range() {
3980        let mut db = Database::new("t".to_string());
3981        for bad in [-0.01_f32, 1.01, f32::NAN, f32::INFINITY, f32::NEG_INFINITY] {
3982            let err = db.set_auto_vacuum_threshold(Some(bad)).unwrap_err();
3983            assert!(
3984                format!("{err}").contains("auto_vacuum_threshold"),
3985                "expected a typed range error for {bad}, got: {err}"
3986            );
3987        }
3988        // The default survives the rejected sets unchanged.
3989        assert_eq!(db.auto_vacuum_threshold(), Some(0.25));
3990        // And valid values land.
3991        db.set_auto_vacuum_threshold(Some(0.0)).unwrap();
3992        assert_eq!(db.auto_vacuum_threshold(), Some(0.0));
3993        db.set_auto_vacuum_threshold(Some(1.0)).unwrap();
3994        assert_eq!(db.auto_vacuum_threshold(), Some(1.0));
3995        db.set_auto_vacuum_threshold(None).unwrap();
3996        assert_eq!(db.auto_vacuum_threshold(), None);
3997    }
3998
3999    // ---------------------------------------------------------------
4000    // SQLR-13 — `PRAGMA auto_vacuum` SQL-level coverage. Mirrors the
4001    // SQLR-10 setter tests above, but routed through SQL so SDK / FFI
4002    // / MCP consumers (which can't reach the Rust setter directly)
4003    // get the same guarantees.
4004    // ---------------------------------------------------------------
4005
4006    /// `PRAGMA auto_vacuum = N;` set + `PRAGMA auto_vacuum;` read
4007    /// round-trip the threshold, observable via `auto_vacuum_threshold`.
4008    #[test]
4009    fn pragma_auto_vacuum_set_and_read_via_sql() {
4010        let mut db = Database::new("t".to_string());
4011
4012        let resp = process_command("PRAGMA auto_vacuum = 0.5;", &mut db).expect("set");
4013        assert!(
4014            resp.contains("PRAGMA"),
4015            "set form should produce a PRAGMA status, got: {resp}"
4016        );
4017        assert_eq!(db.auto_vacuum_threshold(), Some(0.5));
4018
4019        // Read form — status mentions a returned row.
4020        let resp = process_command("PRAGMA auto_vacuum;", &mut db).expect("read");
4021        assert!(resp.contains("1 row"), "expected a 1-row read, got: {resp}");
4022    }
4023
4024    /// `PRAGMA auto_vacuum = OFF;` (bare identifier — sqlparser's own
4025    /// pragma-value parser would reject this, the SQLR-13 dispatcher
4026    /// must accept it) and `= NONE;` both disable the trigger. So does
4027    /// the quoted form `'OFF'`.
4028    #[test]
4029    fn pragma_auto_vacuum_off_disables_trigger() {
4030        for raw in ["OFF", "off", "NONE", "none", "'OFF'", "'NONE'"] {
4031            let mut db = Database::new("t".to_string());
4032            assert_eq!(db.auto_vacuum_threshold(), Some(0.25));
4033
4034            let stmt = format!("PRAGMA auto_vacuum = {raw};");
4035            process_command(&stmt, &mut db)
4036                .unwrap_or_else(|e| panic!("`{stmt}` should disable: {e}"));
4037            assert_eq!(
4038                db.auto_vacuum_threshold(),
4039                None,
4040                "`{stmt}` should clear the threshold"
4041            );
4042        }
4043    }
4044
4045    /// Out-of-range numeric values surface as a typed error via the
4046    /// shared `set_auto_vacuum_threshold` validator — no silent
4047    /// saturation. Mirrors the SQLR-10 setter coverage.
4048    #[test]
4049    fn pragma_auto_vacuum_rejects_out_of_range_via_sql() {
4050        let mut db = Database::new("t".to_string());
4051        for bad in ["-0.01", "1.01", "1.5"] {
4052            let stmt = format!("PRAGMA auto_vacuum = {bad};");
4053            let err = process_command(&stmt, &mut db).unwrap_err();
4054            assert!(
4055                format!("{err}").contains("auto_vacuum_threshold"),
4056                "expected range error for `{stmt}`, got: {err}"
4057            );
4058        }
4059        // Default survives all the rejected sets.
4060        assert_eq!(db.auto_vacuum_threshold(), Some(0.25));
4061    }
4062
4063    /// Junk strings (anything that isn't a number or `OFF`/`NONE`) are
4064    /// rejected at parse time with a typed error, not silently treated
4065    /// as "disable".
4066    #[test]
4067    fn pragma_auto_vacuum_rejects_unknown_strings_via_sql() {
4068        let mut db = Database::new("t".to_string());
4069        let err = process_command("PRAGMA auto_vacuum = WAL;", &mut db).unwrap_err();
4070        assert!(
4071            format!("{err}").contains("OFF/NONE"),
4072            "expected OFF/NONE-style error, got: {err}"
4073        );
4074        // Default unaffected.
4075        assert_eq!(db.auto_vacuum_threshold(), Some(0.25));
4076    }
4077
4078    /// Pragmas SQLRite doesn't know about return `NotImplemented` —
4079    /// not a generic parser error. Future pragmas plug in here.
4080    /// (Phase 11.3 made `journal_mode` a recognised pragma; this
4081    /// test uses a name that's still unsupported.)
4082    #[test]
4083    fn pragma_unknown_returns_not_implemented() {
4084        let mut db = Database::new("t".to_string());
4085        let err = process_command("PRAGMA synchronous = NORMAL;", &mut db).unwrap_err();
4086        assert!(
4087            matches!(err, SQLRiteError::NotImplemented(_)),
4088            "unknown pragma must surface NotImplemented, got: {err:?}"
4089        );
4090    }
4091
4092    /// Setting the threshold via SQL must produce identical behavior to
4093    /// the Rust setter on the actual auto-VACUUM trigger: `= 0.99`
4094    /// suppresses, `= OFF` disables, default fires. Sanity-checks that
4095    /// `process_command_with_render`'s pre-parse step doesn't desync
4096    /// the in-memory state from the file.
4097    #[test]
4098    fn pragma_auto_vacuum_drives_real_trigger() {
4099        // Sub-case A — `PRAGMA auto_vacuum = OFF;` keeps file at HWM.
4100        {
4101            let path = tmp_path("av_pragma_off");
4102            let mut db = auto_vacuum_setup(&path);
4103            process_command("PRAGMA auto_vacuum = OFF;", &mut db).expect("disable via PRAGMA");
4104            assert_eq!(db.auto_vacuum_threshold(), None);
4105
4106            let pages_before = db.pager.as_ref().unwrap().header().page_count;
4107            process_command("DROP TABLE bloat;", &mut db).expect("drop");
4108            let pages_after = db.pager.as_ref().unwrap().header().page_count;
4109            assert_eq!(
4110                pages_after, pages_before,
4111                "PRAGMA-driven OFF must keep page_count at the HWM"
4112            );
4113            cleanup(&path);
4114        }
4115
4116        // Sub-case B — high threshold via PRAGMA suppresses the
4117        // trigger on a single drop.
4118        {
4119            let path = tmp_path("av_pragma_high");
4120            let mut db = auto_vacuum_setup(&path);
4121            process_command("PRAGMA auto_vacuum = 0.99;", &mut db).expect("set high");
4122            assert_eq!(db.auto_vacuum_threshold(), Some(0.99));
4123
4124            let pages_before = db.pager.as_ref().unwrap().header().page_count;
4125            process_command("DROP TABLE bloat;", &mut db).expect("drop");
4126            let pages_after = db.pager.as_ref().unwrap().header().page_count;
4127            assert_eq!(
4128                pages_after, pages_before,
4129                "high PRAGMA threshold must suppress the trigger"
4130            );
4131            cleanup(&path);
4132        }
4133
4134        // Sub-case C — re-arm via PRAGMA after disable: the trigger
4135        // fires again on the next page-releasing DDL.
4136        {
4137            let path = tmp_path("av_pragma_rearm");
4138            let mut db = auto_vacuum_setup(&path);
4139            process_command("PRAGMA auto_vacuum = OFF;", &mut db).unwrap();
4140            // Drop with the trigger off — pages land on the freelist
4141            // but the file stays at HWM.
4142            process_command("DROP TABLE bloat;", &mut db).unwrap();
4143            let pages_after_off_drop = db.pager.as_ref().unwrap().header().page_count;
4144            assert!(db.pager.as_ref().unwrap().header().freelist_head != 0);
4145
4146            // Re-arm via PRAGMA, then drop one more thing — the
4147            // accumulated freelist still exceeds 25%, so auto-VACUUM
4148            // fires.
4149            process_command("PRAGMA auto_vacuum = 0.25;", &mut db).expect("re-arm");
4150            process_command("CREATE INDEX idx_keep_n ON keep (n);", &mut db).unwrap();
4151            process_command("DROP INDEX idx_keep_n;", &mut db).expect("drop index");
4152
4153            let pages_after_rearm = db.pager.as_ref().unwrap().header().page_count;
4154            assert!(
4155                pages_after_rearm < pages_after_off_drop,
4156                "re-armed PRAGMA must let auto-VACUUM fire: was {pages_after_off_drop}, \
4157                 now {pages_after_rearm}"
4158            );
4159            assert_eq!(db.pager.as_ref().unwrap().header().freelist_head, 0);
4160            cleanup(&path);
4161        }
4162    }
4163
4164    /// VACUUM modifiers (FULL, REINDEX, table targets, …) are rejected
4165    /// with NotImplemented — only bare `VACUUM;` is supported.
4166    #[test]
4167    fn vacuum_modifiers_are_rejected() {
4168        let path = tmp_path("vacuum_modifiers");
4169        let mut db = seed_db();
4170        db.source_path = Some(path.clone());
4171        save_database(&mut db, &path).expect("save");
4172        for stmt in ["VACUUM FULL;", "VACUUM users;"] {
4173            let err = process_command(stmt, &mut db).unwrap_err();
4174            assert!(
4175                format!("{err}").contains("VACUUM modifiers"),
4176                "expected modifier rejection for `{stmt}`, got: {err}"
4177            );
4178        }
4179        cleanup(&path);
4180    }
4181}