Skip to main content

sqlrite/sql/pager/
mod.rs

1//! On-disk persistence for a `Database`, using fixed-size paged files.
2//!
3//! The file is a sequence of 4 KiB pages. Page 0 holds the header
4//! (magic, version, page count, schema-root pointer). Every other page carries
5//! a small per-page header (type tag + next-page pointer + payload length)
6//! followed by a payload of up to 4089 bytes.
7//!
8//! **Storage strategy (format version 2, Phase 3c.5).**
9//!
10//! - Each `Table`'s rows live as **cells** in a chain of `TableLeaf` pages.
11//!   Cell layout and slot directory are in `cell.rs` / `table_page.rs`;
12//!   cells that exceed the inline threshold spill into an overflow chain
13//!   via `overflow.rs`.
14//! - The schema catalog is itself a regular table named `sqlrite_master`,
15//!   with one row per user table:
16//!       `(name TEXT PRIMARY KEY, sql TEXT NOT NULL,
17//!         rootpage INTEGER NOT NULL, last_rowid INTEGER NOT NULL)`
18//!   This is the SQLite-style approach: the schema of `sqlrite_master`
19//!   itself is hardcoded into the engine so the open path can bootstrap.
20//! - Page 0's `schema_root_page` field points at the first leaf of
21//!   `sqlrite_master`.
22//!
23//! **Format version.** Version 2 is not compatible with files produced by
24//! earlier commits. Opening a v1 file returns a clean error — users on
25//! old files have to regenerate them from CREATE/INSERT, as there's no
26//! production data to migrate yet.
27
28// Data-layer modules. Not every helper in these modules is used by save/open
29// yet — some exist for tests, some for future maintenance operations.
30// Module-level #[allow(dead_code)] keeps the build quiet without dotting
31// the modules with per-item attributes.
32#[allow(dead_code)]
33pub mod allocator;
34#[allow(dead_code)]
35pub mod cell;
36pub mod file;
37#[allow(dead_code)]
38pub mod freelist;
39#[allow(dead_code)]
40pub mod fts_cell;
41pub mod header;
42#[allow(dead_code)]
43pub mod hnsw_cell;
44#[allow(dead_code)]
45pub mod index_cell;
46#[allow(dead_code)]
47pub mod interior_page;
48pub mod overflow;
49pub mod page;
50pub mod pager;
51#[allow(dead_code)]
52pub mod table_page;
53#[allow(dead_code)]
54pub mod varint;
55#[allow(dead_code)]
56pub mod wal;
57
58use std::collections::{BTreeMap, HashMap};
59use std::path::Path;
60use std::sync::{Arc, Mutex};
61
62use crate::sql::dialect::SqlriteDialect;
63use sqlparser::parser::Parser;
64
65use crate::error::{Result, SQLRiteError};
66use crate::sql::db::database::Database;
67use crate::sql::db::secondary_index::{IndexOrigin, SecondaryIndex};
68use crate::sql::db::table::{Column, DataType, Row, Table, Value};
69use crate::sql::hnsw::DistanceMetric;
70use crate::sql::pager::cell::Cell;
71use crate::sql::pager::header::DbHeader;
72use crate::sql::pager::index_cell::IndexCell;
73use crate::sql::pager::interior_page::{InteriorCell, InteriorPage};
74use crate::sql::pager::overflow::{
75    OVERFLOW_THRESHOLD, OverflowRef, PagedEntry, read_overflow_chain, write_overflow_chain,
76};
77use crate::sql::pager::page::{PAGE_HEADER_SIZE, PAGE_SIZE, PAYLOAD_PER_PAGE, PageType};
78use crate::sql::pager::pager::Pager;
79use crate::sql::pager::table_page::TablePage;
80use crate::sql::parser::create::CreateQuery;
81
82// Re-export so callers can spell `sql::pager::AccessMode` without
83// reaching into the `pager::pager::pager` submodule path.
84pub use crate::sql::pager::pager::AccessMode;
85
86/// Name of the internal catalog table. Reserved — user CREATEs of this
87/// name must be rejected upstream.
88pub const MASTER_TABLE_NAME: &str = "sqlrite_master";
89
90/// Opens a database file in read-write mode. Shorthand for
91/// [`open_database_with_mode`] with [`AccessMode::ReadWrite`].
92pub fn open_database(path: &Path, db_name: String) -> Result<Database> {
93    open_database_with_mode(path, db_name, AccessMode::ReadWrite)
94}
95
96/// Opens a database file in read-only mode. Acquires a shared OS-level
97/// advisory lock, so other read-only openers coexist but any writer is
98/// excluded. Attempts to mutate the returned `Database` (e.g. an
99/// `INSERT`, or a `save_database` call against it) bottom out in a
100/// `cannot commit: database is opened read-only` error from the Pager.
101pub fn open_database_read_only(path: &Path, db_name: String) -> Result<Database> {
102    open_database_with_mode(path, db_name, AccessMode::ReadOnly)
103}
104
105/// Opens a database file and reconstructs the in-memory `Database`,
106/// leaving the long-lived `Pager` attached for subsequent auto-save
107/// (read-write) or consistent-snapshot reads (read-only).
108pub fn open_database_with_mode(path: &Path, db_name: String, mode: AccessMode) -> Result<Database> {
109    let pager = Pager::open_with_mode(path, mode)?;
110
111    // 1. Load sqlrite_master from the tree at header.schema_root_page.
112    let mut master = build_empty_master_table();
113    load_table_rows(&pager, &mut master, pager.header().schema_root_page)?;
114
115    // 2. Two passes over master rows: first build every user table, then
116    //    attach secondary indexes. Indexes need their base table to exist
117    //    before we can populate them. Auto-indexes are created at table
118    //    build time so we only have to load explicit indexes from disk
119    //    (but we also reload the auto-index CONTENT because Table::new
120    //    built it empty).
121    let mut db = Database::new(db_name);
122    let mut index_rows: Vec<IndexCatalogRow> = Vec::new();
123
124    for rowid in master.rowids() {
125        let ty = take_text(&master, "type", rowid)?;
126        let name = take_text(&master, "name", rowid)?;
127        let sql = take_text(&master, "sql", rowid)?;
128        let rootpage = take_integer(&master, "rootpage", rowid)? as u32;
129        let last_rowid = take_integer(&master, "last_rowid", rowid)?;
130
131        match ty.as_str() {
132            "table" => {
133                let (parsed_name, columns) = parse_create_sql(&sql)?;
134                if parsed_name != name {
135                    return Err(SQLRiteError::Internal(format!(
136                        "sqlrite_master row '{name}' carries SQL for '{parsed_name}' — corrupt catalog?"
137                    )));
138                }
139                let mut table = build_empty_table(&name, columns, last_rowid);
140                if rootpage != 0 {
141                    load_table_rows(&pager, &mut table, rootpage)?;
142                }
143                if last_rowid > table.last_rowid {
144                    table.last_rowid = last_rowid;
145                }
146                db.tables.insert(name, table);
147            }
148            "index" => {
149                index_rows.push(IndexCatalogRow {
150                    name,
151                    sql,
152                    rootpage,
153                });
154            }
155            other => {
156                return Err(SQLRiteError::Internal(format!(
157                    "sqlrite_master row '{name}' has unknown type '{other}'"
158                )));
159            }
160        }
161    }
162
163    // Second pass: attach each index to its table. HNSW indexes
164    // (Phase 7d.2) take a different code path because their persisted
165    // form is just the CREATE INDEX SQL — the graph itself isn't
166    // persisted yet (Phase 7d.3). Detect HNSW via the SQL's USING clause
167    // and route to a graph-rebuild instead of the B-Tree-cell load.
168    //
169    // Phase 8b — same shape for FTS indexes. The posting lists aren't
170    // persisted yet (Phase 8c), so we replay the CREATE INDEX SQL on
171    // open and let `execute_create_index` walk current rows.
172    for row in index_rows {
173        if create_index_sql_uses_hnsw(&row.sql) {
174            rebuild_hnsw_index(&mut db, &pager, &row)?;
175        } else if create_index_sql_uses_fts(&row.sql) {
176            rebuild_fts_index(&mut db, &pager, &row)?;
177        } else {
178            attach_index(&mut db, &pager, row)?;
179        }
180    }
181
182    db.source_path = Some(path.to_path_buf());
183    db.pager = Some(pager);
184    Ok(db)
185}
186
187/// Catalog row for a secondary index — deferred until after every table is
188/// loaded so the index's base table exists by the time we populate it.
189struct IndexCatalogRow {
190    name: String,
191    sql: String,
192    rootpage: u32,
193}
194
195/// Persists `db` to disk. Diff-pager skips writing pages whose bytes
196/// haven't changed; the [`PageAllocator`] preserves per-table page
197/// numbers across saves so unchanged tables produce zero dirty frames.
198///
199/// Pages that were live before this save but aren't restaged this round
200/// (e.g., the leaves of a dropped table) move onto a persisted free
201/// list rooted at `header.freelist_head`; subsequent saves draw from
202/// the freelist before extending the file. `VACUUM` (see
203/// [`vacuum_database`]) compacts the file by ignoring the freelist and
204/// allocating linearly from page 1.
205///
206/// [`PageAllocator`]: crate::sql::pager::allocator::PageAllocator
207pub fn save_database(db: &mut Database, path: &Path) -> Result<()> {
208    save_database_with_mode(db, path, /*compact=*/ false)
209}
210
211/// Reclaims space by rewriting every live B-Tree contiguously from
212/// page 1, with no freelist. Equivalent to `save_database` but ignores
213/// the existing freelist and per-table preferred pools — every page is
214/// allocated by extending the high-water mark — so the resulting file
215/// is tightly packed and the freelist is empty.
216///
217/// Used by the SQL-level `VACUUM;` statement.
218pub fn vacuum_database(db: &mut Database, path: &Path) -> Result<()> {
219    save_database_with_mode(db, path, /*compact=*/ true)
220}
221
222/// Shared save core. `compact = false` is the normal save path (uses
223/// the existing freelist + per-table preferred pools). `compact = true`
224/// is the VACUUM path (empty freelist, empty preferred pools, linear
225/// allocation from page 1).
226fn save_database_with_mode(db: &mut Database, path: &Path, compact: bool) -> Result<()> {
227    // Phase 7d.3 — rebuild any HNSW index that DELETE / UPDATE-on-vector
228    // marked dirty. Done up front under the &mut Database borrow we
229    // already hold, before the immutable iteration loops below need
230    // their own borrow.
231    rebuild_dirty_hnsw_indexes(db);
232    // Phase 8b — same drill for FTS indexes flagged by DELETE / UPDATE.
233    rebuild_dirty_fts_indexes(db);
234
235    let same_path = db.source_path.as_deref() == Some(path);
236    let mut pager = if same_path {
237        match db.pager.take() {
238            Some(p) => p,
239            None if path.exists() => Pager::open(path)?,
240            None => Pager::create(path)?,
241        }
242    } else if path.exists() {
243        Pager::open(path)?
244    } else {
245        Pager::create(path)?
246    };
247
248    // Snapshot what was live BEFORE we reset staged. Used to compute the
249    // newly-freed set after staging completes. Page 0 (the header) is
250    // never on the freelist — it's always live.
251    let old_header = pager.header();
252    let old_live: std::collections::HashSet<u32> = (1..old_header.page_count).collect();
253
254    // Read the previously-persisted freelist so its leaf pages can be
255    // reused as preferred allocations and its trunk pages don't leak.
256    let (old_free_leaves, old_free_trunks) = if compact || old_header.freelist_head == 0 {
257        (Vec::new(), Vec::new())
258    } else {
259        crate::sql::pager::freelist::read_freelist(&pager, old_header.freelist_head)?
260    };
261
262    // Snapshot the previous rootpages of each table/index so we can
263    // seed per-table preferred pools (the unchanged-table case stages
264    // byte-identical pages → diff pager skips every write for it).
265    let old_rootpages = if compact {
266        HashMap::new()
267    } else {
268        read_old_rootpages(&pager, old_header.schema_root_page)?
269    };
270
271    // SQLR-1 — snapshot every prior B-Tree's page set NOW, before any
272    // staging starts. `Pager::read_page` shadows on-disk bytes with the
273    // current `staged` buffer, so if we deferred these walks until each
274    // object's turn in the staging loop, a *new* index added in this
275    // save would extend past the old high-water and overwrite the
276    // pages of any later-staged object whose old root sits in that
277    // range — including `sqlrite_master`, which is always staged last.
278    // The follow-up walk would then read the wrong B-Tree's bytes and
279    // either hand the allocator a bogus preferred pool or panic
280    // dispatching cells (a table-cell decoder vs. an index leaf, the
281    // shape of the original SQLR-1 panic). Walking up front pins each
282    // map to the committed bytes that were on disk before this save
283    // touched anything.
284    let old_preferred_pages: HashMap<(String, String), Vec<u32>> = if compact {
285        HashMap::new()
286    } else {
287        let mut map: HashMap<(String, String), Vec<u32>> = HashMap::new();
288        for ((kind, name), &root) in &old_rootpages {
289            // Tables can carry overflow chains; index/HNSW/FTS leaves
290            // never overflow in the current encoding, so the cheaper
291            // walk suffices for them.
292            let follow = kind == "table";
293            let pages = collect_pages_for_btree(&pager, root, follow)?;
294            map.insert((kind.clone(), name.clone()), pages);
295        }
296        map
297    };
298    let old_master_pages: Vec<u32> = if compact || old_header.schema_root_page == 0 {
299        Vec::new()
300    } else {
301        collect_pages_for_btree(
302            &pager,
303            old_header.schema_root_page,
304            /*follow_overflow=*/ true,
305        )?
306    };
307
308    pager.clear_staged();
309
310    // Allocator: in normal mode, seed with the old freelist; in compact
311    // mode, start empty so allocation extends linearly from page 1.
312    use std::collections::VecDeque;
313    let initial_freelist: VecDeque<u32> = if compact {
314        VecDeque::new()
315    } else {
316        crate::sql::pager::freelist::freelist_to_deque(old_free_leaves.clone())
317    };
318    let mut alloc = crate::sql::pager::allocator::PageAllocator::new(initial_freelist, 1);
319
320    // 1. Stage each user table's B-Tree, collecting master-row info.
321    //    `kind` is "table" or "index" — master has one row per each.
322    let mut master_rows: Vec<CatalogEntry> = Vec::new();
323
324    let mut table_names: Vec<&String> = db.tables.keys().collect();
325    table_names.sort();
326    for name in table_names {
327        if name == MASTER_TABLE_NAME {
328            return Err(SQLRiteError::Internal(format!(
329                "user table cannot be named '{MASTER_TABLE_NAME}' (reserved)"
330            )));
331        }
332        if !compact {
333            if let Some(prev) = old_preferred_pages.get(&("table".to_string(), name.to_string())) {
334                alloc.set_preferred(prev.clone());
335            }
336        }
337        let table = &db.tables[name];
338        let rootpage = stage_table_btree(&mut pager, table, &mut alloc)?;
339        alloc.finish_preferred();
340        master_rows.push(CatalogEntry {
341            kind: "table".into(),
342            name: name.clone(),
343            sql: table_to_create_sql(table),
344            rootpage,
345            last_rowid: table.last_rowid,
346        });
347    }
348
349    // 2. Stage each secondary index's B-Tree. Indexes persist in a
350    //    deterministic order: sorted by (owning_table, index_name).
351    let mut index_entries: Vec<(&Table, &SecondaryIndex)> = Vec::new();
352    for table in db.tables.values() {
353        for idx in &table.secondary_indexes {
354            index_entries.push((table, idx));
355        }
356    }
357    index_entries
358        .sort_by(|(ta, ia), (tb, ib)| ta.tb_name.cmp(&tb.tb_name).then(ia.name.cmp(&ib.name)));
359    for (_table, idx) in index_entries {
360        if !compact {
361            if let Some(prev) =
362                old_preferred_pages.get(&("index".to_string(), idx.name.to_string()))
363            {
364                alloc.set_preferred(prev.clone());
365            }
366        }
367        let rootpage = stage_index_btree(&mut pager, idx, &mut alloc)?;
368        alloc.finish_preferred();
369        master_rows.push(CatalogEntry {
370            kind: "index".into(),
371            name: idx.name.clone(),
372            sql: idx.synthesized_sql(),
373            rootpage,
374            last_rowid: 0,
375        });
376    }
377
378    // 2b. Phase 7d.3: persist HNSW indexes as their own cell-encoded
379    //     page trees, with the rootpage recorded in sqlrite_master.
380    //     Reopen loads the graph back from cells (fast, exact match)
381    //     instead of rebuilding from rows.
382    //
383    //     Dirty indexes (set by DELETE / UPDATE-on-vector-col) are
384    //     rebuilt from current rows BEFORE staging, so the on-disk
385    //     graph reflects the current row set.
386    let mut hnsw_entries: Vec<(&Table, &crate::sql::db::table::HnswIndexEntry)> = Vec::new();
387    for table in db.tables.values() {
388        for entry in &table.hnsw_indexes {
389            hnsw_entries.push((table, entry));
390        }
391    }
392    hnsw_entries
393        .sort_by(|(ta, ea), (tb, eb)| ta.tb_name.cmp(&tb.tb_name).then(ea.name.cmp(&eb.name)));
394    for (table, entry) in hnsw_entries {
395        if !compact {
396            if let Some(prev) =
397                old_preferred_pages.get(&("index".to_string(), entry.name.to_string()))
398            {
399                alloc.set_preferred(prev.clone());
400            }
401        }
402        let rootpage = stage_hnsw_btree(&mut pager, &entry.index, &mut alloc)?;
403        alloc.finish_preferred();
404        master_rows.push(CatalogEntry {
405            kind: "index".into(),
406            name: entry.name.clone(),
407            sql: synthesize_hnsw_create_index_sql(
408                &entry.name,
409                &table.tb_name,
410                &entry.column_name,
411                entry.metric,
412            ),
413            rootpage,
414            last_rowid: 0,
415        });
416    }
417
418    // 2c. Phase 8c — persist FTS posting lists as their own
419    //     cell-encoded page trees, with the rootpage recorded in
420    //     sqlrite_master. Reopen loads the postings back from cells
421    //     (fast, exact match) instead of re-tokenizing rows.
422    //
423    //     Dirty indexes (set by DELETE / UPDATE-on-text-col) are
424    //     rebuilt from current rows BEFORE staging by
425    //     `rebuild_dirty_fts_indexes`, so the on-disk tree reflects
426    //     the current row set.
427    let mut fts_entries: Vec<(&Table, &crate::sql::db::table::FtsIndexEntry)> = Vec::new();
428    for table in db.tables.values() {
429        for entry in &table.fts_indexes {
430            fts_entries.push((table, entry));
431        }
432    }
433    fts_entries
434        .sort_by(|(ta, ea), (tb, eb)| ta.tb_name.cmp(&tb.tb_name).then(ea.name.cmp(&eb.name)));
435    let any_fts = !fts_entries.is_empty();
436    for (table, entry) in fts_entries {
437        if !compact {
438            if let Some(prev) =
439                old_preferred_pages.get(&("index".to_string(), entry.name.to_string()))
440            {
441                alloc.set_preferred(prev.clone());
442            }
443        }
444        let rootpage = stage_fts_btree(&mut pager, &entry.index, &mut alloc)?;
445        alloc.finish_preferred();
446        master_rows.push(CatalogEntry {
447            kind: "index".into(),
448            name: entry.name.clone(),
449            sql: format!(
450                "CREATE INDEX {} ON {} USING fts ({})",
451                entry.name, table.tb_name, entry.column_name
452            ),
453            rootpage,
454            last_rowid: 0,
455        });
456    }
457
458    // 3. Build an in-memory sqlrite_master with one row per table or index,
459    //    then stage it via the same tree-build path. Seed master's
460    //    preferred pool with the previous master tree's pages so the
461    //    catalog page numbers stay stable across saves whenever the
462    //    catalog content didn't change.
463    let mut master = build_empty_master_table();
464    for (i, entry) in master_rows.into_iter().enumerate() {
465        let rowid = (i as i64) + 1;
466        master.restore_row(
467            rowid,
468            vec![
469                Some(Value::Text(entry.kind)),
470                Some(Value::Text(entry.name)),
471                Some(Value::Text(entry.sql)),
472                Some(Value::Integer(entry.rootpage as i64)),
473                Some(Value::Integer(entry.last_rowid)),
474            ],
475        )?;
476    }
477    if !compact && !old_master_pages.is_empty() {
478        // Use the page list snapshotted before any staging touched
479        // disk; re-walking here would read whatever a new index
480        // already restaged on top of master's old root (SQLR-1).
481        alloc.set_preferred(old_master_pages.clone());
482    }
483    let master_root = stage_table_btree(&mut pager, &master, &mut alloc)?;
484    alloc.finish_preferred();
485
486    // 4. Compute newly-freed pages: the previously-live set minus what
487    //    we just restaged. The previous freelist's trunk pages get
488    //    re-encoded too — they're in `old_live`, weren't restaged, so
489    //    the filter naturally moves them to the new freelist.
490    //
491    // In `compact` mode (VACUUM), we *discard* newly_freed instead of
492    // routing it onto the new freelist. The whole point of VACUUM is
493    // to let the file truncate to the new high-water mark, so any page
494    // past it gets dropped at the next checkpoint.
495    if !compact {
496        let used = alloc.used().clone();
497        let mut newly_freed: Vec<u32> = old_live
498            .iter()
499            .copied()
500            .filter(|p| !used.contains(p))
501            .collect();
502        let _ = &old_free_trunks; // silenced — handled by the old_live filter
503        alloc.add_to_freelist(newly_freed.drain(..));
504    }
505
506    // 5. Encode the new freelist into trunk pages. `stage_freelist`
507    //    consumes some of the free pages AS the trunk pages themselves —
508    //    a trunk is just a free page borrowed for metadata. Pages that
509    //    were on the freelist but become trunks no longer need to be
510    //    "extension" pages; the high-water mark from the staging loop
511    //    above is already correct.
512    let new_free_pages = alloc.drain_freelist();
513    let new_freelist_head =
514        crate::sql::pager::freelist::stage_freelist(&mut pager, new_free_pages)?;
515
516    // 6. Pick the format version. v6 is on demand: only bumps when the
517    //    new freelist is non-empty. FTS-bearing files keep their v5
518    //    promotion; v6 is a strict superset (v6 readers handle v4/v5/v6).
519    use crate::sql::pager::header::{FORMAT_VERSION_V5, FORMAT_VERSION_V6};
520    let format_version = if new_freelist_head != 0 {
521        FORMAT_VERSION_V6
522    } else if any_fts {
523        // Preserve a v6 file at v6 (don't downgrade) but otherwise
524        // bump v4 → v5 for FTS like Phase 8c does.
525        std::cmp::max(FORMAT_VERSION_V5, old_header.format_version)
526    } else {
527        // Preserve whatever the file already was.
528        old_header.format_version
529    };
530
531    pager.commit(DbHeader {
532        page_count: alloc.high_water(),
533        schema_root_page: master_root,
534        format_version,
535        freelist_head: new_freelist_head,
536    })?;
537
538    if same_path {
539        db.pager = Some(pager);
540    }
541    Ok(())
542}
543
544/// Build material for a single row in sqlrite_master.
545struct CatalogEntry {
546    kind: String, // "table" or "index"
547    name: String,
548    sql: String,
549    rootpage: u32,
550    last_rowid: i64,
551}
552
553// -------------------------------------------------------------------------
554// sqlrite_master — hardcoded catalog table schema
555
556fn build_empty_master_table() -> Table {
557    // Phase 3e: `type` is the first column, matching SQLite's convention.
558    // It distinguishes `'table'` rows from `'index'` rows.
559    let columns = vec![
560        Column::new("type".into(), "text".into(), false, true, false),
561        Column::new("name".into(), "text".into(), true, true, true),
562        Column::new("sql".into(), "text".into(), false, true, false),
563        Column::new("rootpage".into(), "integer".into(), false, true, false),
564        Column::new("last_rowid".into(), "integer".into(), false, true, false),
565    ];
566    build_empty_table(MASTER_TABLE_NAME, columns, 0)
567}
568
569/// Reads a required Text column from a known-good catalog row.
570fn take_text(table: &Table, col: &str, rowid: i64) -> Result<String> {
571    match table.get_value(col, rowid) {
572        Some(Value::Text(s)) => Ok(s),
573        other => Err(SQLRiteError::Internal(format!(
574            "sqlrite_master column '{col}' at rowid {rowid}: expected Text, got {other:?}"
575        ))),
576    }
577}
578
579/// Reads a required Integer column from a known-good catalog row.
580fn take_integer(table: &Table, col: &str, rowid: i64) -> Result<i64> {
581    match table.get_value(col, rowid) {
582        Some(Value::Integer(v)) => Ok(v),
583        other => Err(SQLRiteError::Internal(format!(
584            "sqlrite_master column '{col}' at rowid {rowid}: expected Integer, got {other:?}"
585        ))),
586    }
587}
588
589// -------------------------------------------------------------------------
590// CREATE-TABLE SQL synthesis and re-parsing
591
592/// Synthesizes a CREATE TABLE SQL string that recreates the table's schema.
593/// Deterministic: same schema → same SQL, so diffing commits stay stable.
594fn table_to_create_sql(table: &Table) -> String {
595    let mut parts = Vec::with_capacity(table.columns.len());
596    for c in &table.columns {
597        // Render the SQL type literally so the round-trip through
598        // CREATE TABLE re-parsing recreates the same schema. Vector
599        // carries its dimension inline.
600        let ty: String = match &c.datatype {
601            DataType::Integer => "INTEGER".to_string(),
602            DataType::Text => "TEXT".to_string(),
603            DataType::Real => "REAL".to_string(),
604            DataType::Bool => "BOOLEAN".to_string(),
605            DataType::Vector(dim) => format!("VECTOR({dim})"),
606            DataType::Json => "JSON".to_string(),
607            DataType::None | DataType::Invalid => "TEXT".to_string(),
608        };
609        let mut piece = format!("{} {}", c.column_name, ty);
610        if c.is_pk {
611            piece.push_str(" PRIMARY KEY");
612        } else {
613            if c.is_unique {
614                piece.push_str(" UNIQUE");
615            }
616            if c.not_null {
617                piece.push_str(" NOT NULL");
618            }
619        }
620        if let Some(default) = &c.default {
621            piece.push_str(" DEFAULT ");
622            piece.push_str(&render_default_literal(default));
623        }
624        parts.push(piece);
625    }
626    format!("CREATE TABLE {} ({});", table.tb_name, parts.join(", "))
627}
628
629/// Renders a DEFAULT value back to SQL-literal form so the synthesized
630/// CREATE TABLE round-trips through `parse_create_sql`. Text values get
631/// single-quoted with single-quote doubling for escaping. Vector defaults
632/// are not currently expressible at CREATE TABLE time, so we render them
633/// as their bracket-array form (matches the INSERT literal grammar).
634fn render_default_literal(value: &Value) -> String {
635    match value {
636        Value::Integer(i) => i.to_string(),
637        Value::Real(f) => f.to_string(),
638        Value::Bool(b) => {
639            if *b {
640                "TRUE".to_string()
641            } else {
642                "FALSE".to_string()
643            }
644        }
645        Value::Text(s) => format!("'{}'", s.replace('\'', "''")),
646        Value::Null => "NULL".to_string(),
647        Value::Vector(_) => value.to_display_string(),
648    }
649}
650
651/// Reverses `table_to_create_sql`: feeds the SQL back through `sqlparser`
652/// and produces our internal column list. Returns `(table_name, columns)`.
653fn parse_create_sql(sql: &str) -> Result<(String, Vec<Column>)> {
654    let dialect = SqlriteDialect::new();
655    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
656    let stmt = ast.pop().ok_or_else(|| {
657        SQLRiteError::Internal("sqlrite_master row held an empty SQL string".to_string())
658    })?;
659    let create = CreateQuery::new(&stmt)?;
660    let columns = create
661        .columns
662        .into_iter()
663        .map(|pc| {
664            Column::with_default(
665                pc.name,
666                pc.datatype,
667                pc.is_pk,
668                pc.not_null,
669                pc.is_unique,
670                pc.default,
671            )
672        })
673        .collect();
674    Ok((create.table_name, columns))
675}
676
677// -------------------------------------------------------------------------
678// In-memory table (re)construction
679
680/// Builds an empty in-memory `Table` given the declared columns.
681fn build_empty_table(name: &str, columns: Vec<Column>, last_rowid: i64) -> Table {
682    let rows: Arc<Mutex<HashMap<String, Row>>> = Arc::new(Mutex::new(HashMap::new()));
683    let mut secondary_indexes: Vec<SecondaryIndex> = Vec::new();
684    {
685        let mut map = rows.lock().expect("rows mutex poisoned");
686        for col in &columns {
687            // Mirror the dispatch in `Table::new` so the reconstructed
688            // table has the same shape it'd have if it were built fresh
689            // from SQL. Phase 7a adds the Vector arm — without it,
690            // VECTOR columns silently restore as Row::None and every
691            // restore_row hits a "storage None vs value Some(Vector(...))"
692            // type mismatch.
693            let row = match &col.datatype {
694                DataType::Integer => Row::Integer(BTreeMap::new()),
695                DataType::Text => Row::Text(BTreeMap::new()),
696                DataType::Real => Row::Real(BTreeMap::new()),
697                DataType::Bool => Row::Bool(BTreeMap::new()),
698                DataType::Vector(_dim) => Row::Vector(BTreeMap::new()),
699                // JSON columns reuse Text storage — see Table::new and
700                // Phase 7e's scope-correction note.
701                DataType::Json => Row::Text(BTreeMap::new()),
702                DataType::None | DataType::Invalid => Row::None,
703            };
704            map.insert(col.column_name.clone(), row);
705
706            // Auto-create UNIQUE/PK indexes so the restored table has the
707            // same shape Table::new would have built from fresh SQL.
708            if (col.is_pk || col.is_unique)
709                && matches!(col.datatype, DataType::Integer | DataType::Text)
710            {
711                if let Ok(idx) = SecondaryIndex::new(
712                    SecondaryIndex::auto_name(name, &col.column_name),
713                    name.to_string(),
714                    col.column_name.clone(),
715                    &col.datatype,
716                    true,
717                    IndexOrigin::Auto,
718                ) {
719                    secondary_indexes.push(idx);
720                }
721            }
722        }
723    }
724
725    let primary_key = columns
726        .iter()
727        .find(|c| c.is_pk)
728        .map(|c| c.column_name.clone())
729        .unwrap_or_else(|| "-1".to_string());
730
731    Table {
732        tb_name: name.to_string(),
733        columns,
734        rows,
735        secondary_indexes,
736        // HNSW indexes (Phase 7d.2) are reconstructed on open by re-
737        // executing each `CREATE INDEX … USING hnsw` SQL stored in
738        // `sqlrite_master`. This builder produces the empty shell;
739        // `replay_create_index_for_hnsw` (in this same module) walks
740        // sqlrite_master after every table is loaded and rebuilds the
741        // graph from current row data. Persistence of the graph itself
742        // (avoiding the on-open rebuild cost) is Phase 7d.3.
743        hnsw_indexes: Vec::new(),
744        // FTS indexes (Phase 8b) follow the same pattern — the
745        // CREATE INDEX … USING fts SQL is the source of truth on open
746        // and the in-memory posting list gets rebuilt from current
747        // rows. Cell-encoded persistence of the postings is Phase 8c.
748        fts_indexes: Vec::new(),
749        last_rowid,
750        primary_key,
751    }
752}
753
754// -------------------------------------------------------------------------
755// Leaf-chain read / write
756
757/// Walks a table's B-Tree from `root_page`, following the leftmost-child
758/// chain down to the first leaf, then iterating leaves via their sibling
759/// `next_page` pointers. Every cell is decoded and replayed into `table`.
760///
761/// Open-path note: we eagerly materialize the entire table into `Table`'s
762/// in-memory maps. Phase 5 will introduce a `Cursor` that hits the pager
763/// on demand so queries can stream through the tree without a full upfront
764/// load.
765/// Re-parses `CREATE INDEX` SQL from sqlrite_master and restores the
766/// index on its base table by walking the tree of index cells at
767/// `rootpage`. The base table is expected to already be in `db.tables`.
768fn attach_index(db: &mut Database, pager: &Pager, row: IndexCatalogRow) -> Result<()> {
769    let (table_name, column_name, is_unique) = parse_create_index_sql(&row.sql)?;
770
771    let table = db.get_table_mut(table_name.clone()).map_err(|_| {
772        SQLRiteError::Internal(format!(
773            "index '{}' references unknown table '{table_name}' (sqlrite_master out of sync?)",
774            row.name
775        ))
776    })?;
777    let datatype = table
778        .columns
779        .iter()
780        .find(|c| c.column_name == column_name)
781        .map(|c| clone_datatype(&c.datatype))
782        .ok_or_else(|| {
783            SQLRiteError::Internal(format!(
784                "index '{}' references unknown column '{column_name}' on '{table_name}'",
785                row.name
786            ))
787        })?;
788
789    // An auto-index on this column may already exist (built by
790    // build_empty_table for UNIQUE/PK columns). If the names match, reuse
791    // the slot instead of adding a duplicate entry.
792    let existing_slot = table
793        .secondary_indexes
794        .iter()
795        .position(|i| i.name == row.name);
796    let idx = match existing_slot {
797        Some(i) => {
798            // Drain any entries that may have been populated during table
799            // restore_row calls — we're about to repopulate from the
800            // persisted tree.
801            table.secondary_indexes.remove(i)
802        }
803        None => SecondaryIndex::new(
804            row.name.clone(),
805            table_name.clone(),
806            column_name.clone(),
807            &datatype,
808            is_unique,
809            IndexOrigin::Explicit,
810        )?,
811    };
812    let mut idx = idx;
813    // Wipe any stale entries from the auto path so the load is idempotent.
814    let is_unique_flag = idx.is_unique;
815    let origin = idx.origin;
816    idx = SecondaryIndex::new(
817        idx.name,
818        idx.table_name,
819        idx.column_name,
820        &datatype,
821        is_unique_flag,
822        origin,
823    )?;
824
825    // Populate from the index tree's cells.
826    load_index_rows(pager, &mut idx, row.rootpage)?;
827
828    table.secondary_indexes.push(idx);
829    Ok(())
830}
831
832/// Walks the leaves of an index B-Tree rooted at `root_page` and inserts
833/// every `(value, rowid)` pair into `idx`.
834fn load_index_rows(pager: &Pager, idx: &mut SecondaryIndex, root_page: u32) -> Result<()> {
835    if root_page == 0 {
836        return Ok(());
837    }
838    let first_leaf = find_leftmost_leaf(pager, root_page)?;
839    let mut current = first_leaf;
840    while current != 0 {
841        let page_buf = pager
842            .read_page(current)
843            .ok_or_else(|| SQLRiteError::Internal(format!("missing index leaf page {current}")))?;
844        if page_buf[0] != PageType::TableLeaf as u8 {
845            return Err(SQLRiteError::Internal(format!(
846                "page {current} tagged {} but expected TableLeaf (index)",
847                page_buf[0]
848            )));
849        }
850        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
851        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
852            .try_into()
853            .map_err(|_| SQLRiteError::Internal("index leaf payload size".to_string()))?;
854        let leaf = TablePage::from_bytes(payload);
855
856        for slot in 0..leaf.slot_count() {
857            // Slots on an index page hold KIND_INDEX cells; decode directly.
858            let offset = leaf.slot_offset_raw(slot)?;
859            let (ic, _) = IndexCell::decode(leaf.as_bytes(), offset)?;
860            idx.insert(&ic.value, ic.rowid)?;
861        }
862        current = next_leaf;
863    }
864    Ok(())
865}
866
867/// Minimal recognizer for the synthesized-or-user `CREATE INDEX` SQL we
868/// store in sqlrite_master. Returns `(table_name, column_name, is_unique)`.
869///
870/// Uses sqlparser so user-supplied SQL with extra whitespace, case, etc.
871/// still works; the only shape we accept is single-column indexes.
872fn parse_create_index_sql(sql: &str) -> Result<(String, String, bool)> {
873    use sqlparser::ast::{CreateIndex, Expr, Statement};
874
875    let dialect = SqlriteDialect::new();
876    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
877    let Some(Statement::CreateIndex(CreateIndex {
878        table_name,
879        columns,
880        unique,
881        ..
882    })) = ast.pop()
883    else {
884        return Err(SQLRiteError::Internal(format!(
885            "sqlrite_master index row's SQL isn't a CREATE INDEX: {sql}"
886        )));
887    };
888    if columns.len() != 1 {
889        return Err(SQLRiteError::NotImplemented(
890            "multi-column indexes aren't supported yet".to_string(),
891        ));
892    }
893    let col = match &columns[0].column.expr {
894        Expr::Identifier(ident) => ident.value.clone(),
895        Expr::CompoundIdentifier(parts) => {
896            parts.last().map(|p| p.value.clone()).unwrap_or_default()
897        }
898        other => {
899            return Err(SQLRiteError::Internal(format!(
900                "unsupported indexed column expression: {other:?}"
901            )));
902        }
903    };
904    Ok((table_name.to_string(), col, unique))
905}
906
907/// True iff a CREATE INDEX SQL string uses `USING hnsw` (case-insensitive).
908/// Used by the open path to route HNSW indexes to the graph-rebuild path
909/// instead of the standard B-Tree cell-load. Pre-Phase-7d.2 indexes
910/// don't have a USING clause, so they all return false and continue
911/// taking the existing path.
912fn create_index_sql_uses_hnsw(sql: &str) -> bool {
913    use sqlparser::ast::{CreateIndex, IndexType, Statement};
914
915    let dialect = SqlriteDialect::new();
916    let Ok(mut ast) = Parser::parse_sql(&dialect, sql) else {
917        return false;
918    };
919    let Some(Statement::CreateIndex(CreateIndex { using, .. })) = ast.pop() else {
920        return false;
921    };
922    matches!(using, Some(IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("hnsw"))
923}
924
925/// Phase 8b — peeks at a CREATE INDEX SQL to detect `USING fts(...)`.
926/// Mirrors [`create_index_sql_uses_hnsw`].
927fn create_index_sql_uses_fts(sql: &str) -> bool {
928    use sqlparser::ast::{CreateIndex, IndexType, Statement};
929
930    let dialect = SqlriteDialect::new();
931    let Ok(mut ast) = Parser::parse_sql(&dialect, sql) else {
932        return false;
933    };
934    let Some(Statement::CreateIndex(CreateIndex { using, .. })) = ast.pop() else {
935        return false;
936    };
937    matches!(using, Some(IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("fts"))
938}
939
940/// Phase 8c — loads (or rebuilds) an FTS index on database open. Two
941/// paths mirror [`rebuild_hnsw_index`]:
942///
943///   - **rootpage != 0** (Phase 8c default): the posting list is
944///     persisted as cell-encoded pages. Read every cell directly via
945///     [`load_fts_postings`] and reconstruct the index — no
946///     re-tokenization, exact bit-for-bit reproduction.
947///
948///   - **rootpage == 0** (compatibility): no on-disk postings, e.g.
949///     for files saved by Phase 8b before persistence landed. Replay
950///     the CREATE INDEX SQL through `execute_create_index`, which
951///     walks the table's current rows and tokenizes them fresh.
952fn rebuild_fts_index(db: &mut Database, pager: &Pager, row: &IndexCatalogRow) -> Result<()> {
953    use crate::sql::db::table::FtsIndexEntry;
954    use crate::sql::executor::execute_create_index;
955    use crate::sql::fts::PostingList;
956    use sqlparser::ast::Statement;
957
958    let dialect = SqlriteDialect::new();
959    let mut ast = Parser::parse_sql(&dialect, &row.sql).map_err(SQLRiteError::from)?;
960    let Some(stmt @ Statement::CreateIndex(_)) = ast.pop() else {
961        return Err(SQLRiteError::Internal(format!(
962            "sqlrite_master FTS row's SQL isn't a CREATE INDEX: {}",
963            row.sql
964        )));
965    };
966
967    if row.rootpage == 0 {
968        // Compatibility path — no persisted postings; replay rows.
969        execute_create_index(&stmt, db)?;
970        return Ok(());
971    }
972
973    let (doc_lengths, postings) = load_fts_postings(pager, row.rootpage)?;
974    let index = PostingList::from_persisted_postings(doc_lengths, postings);
975    let (tbl_name, col_name) = parse_fts_create_index_sql(&row.sql)?;
976    let table_mut = db.get_table_mut(tbl_name.clone()).map_err(|_| {
977        SQLRiteError::Internal(format!(
978            "FTS index '{}' references unknown table '{tbl_name}'",
979            row.name
980        ))
981    })?;
982    table_mut.fts_indexes.push(FtsIndexEntry {
983        name: row.name.clone(),
984        column_name: col_name,
985        index,
986        needs_rebuild: false,
987    });
988    Ok(())
989}
990
991/// Pulls (table_name, column_name) out of a `CREATE INDEX … USING fts(col)`
992/// SQL string. Same shape as `parse_hnsw_create_index_sql`.
993fn parse_fts_create_index_sql(sql: &str) -> Result<(String, String)> {
994    use sqlparser::ast::{CreateIndex, Expr, Statement};
995
996    let dialect = SqlriteDialect::new();
997    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
998    let Some(Statement::CreateIndex(CreateIndex {
999        table_name,
1000        columns,
1001        ..
1002    })) = ast.pop()
1003    else {
1004        return Err(SQLRiteError::Internal(format!(
1005            "sqlrite_master FTS row's SQL isn't a CREATE INDEX: {sql}"
1006        )));
1007    };
1008    if columns.len() != 1 {
1009        return Err(SQLRiteError::NotImplemented(
1010            "multi-column FTS indexes aren't supported yet".to_string(),
1011        ));
1012    }
1013    let col = match &columns[0].column.expr {
1014        Expr::Identifier(ident) => ident.value.clone(),
1015        Expr::CompoundIdentifier(parts) => {
1016            parts.last().map(|p| p.value.clone()).unwrap_or_default()
1017        }
1018        other => {
1019            return Err(SQLRiteError::Internal(format!(
1020                "FTS CREATE INDEX has unexpected column expr: {other:?}"
1021            )));
1022        }
1023    };
1024    Ok((table_name.to_string(), col))
1025}
1026
1027/// Loads (or rebuilds) an HNSW index on database open. Two paths:
1028///
1029///   - **rootpage != 0** (Phase 7d.3 default): the graph is persisted
1030///     as cell-encoded pages. Read every node directly via
1031///     `load_hnsw_nodes` and reconstruct the index — fast, zero
1032///     algorithm runs, exact bit-for-bit reproduction of what was saved.
1033///
1034///   - **rootpage == 0** (compatibility): no on-disk graph, e.g. for
1035///     files saved by Phase 7d.2 before persistence landed. Replay the
1036///     CREATE INDEX SQL through `execute_create_index`, which walks the
1037///     table's current rows and populates a fresh graph. Slower but
1038///     correctness-equivalent on the first save with the new code.
1039fn rebuild_hnsw_index(db: &mut Database, pager: &Pager, row: &IndexCatalogRow) -> Result<()> {
1040    use crate::sql::db::table::HnswIndexEntry;
1041    use crate::sql::executor::execute_create_index;
1042    use crate::sql::hnsw::HnswIndex;
1043    use sqlparser::ast::Statement;
1044
1045    let dialect = SqlriteDialect::new();
1046    let mut ast = Parser::parse_sql(&dialect, &row.sql).map_err(SQLRiteError::from)?;
1047    let Some(stmt @ Statement::CreateIndex(_)) = ast.pop() else {
1048        return Err(SQLRiteError::Internal(format!(
1049            "sqlrite_master HNSW row's SQL isn't a CREATE INDEX: {}",
1050            row.sql
1051        )));
1052    };
1053
1054    if row.rootpage == 0 {
1055        // Compatibility path — no persisted graph; walk current rows.
1056        execute_create_index(&stmt, db)?;
1057        return Ok(());
1058    }
1059
1060    // Persistence path — read the cell tree, deserialize. The metric
1061    // travels through the synthesized CREATE INDEX SQL stored in
1062    // `sqlrite_master`; pre-SQLR-28 rows omit the WITH clause and
1063    // decode as L2, which matches what those graphs were built with.
1064    let (tbl_name, col_name, metric) = parse_hnsw_create_index_sql(&row.sql)?;
1065    let nodes = load_hnsw_nodes(pager, row.rootpage)?;
1066    let index = HnswIndex::from_persisted_nodes(metric, 0xC0FFEE, nodes);
1067
1068    // Parse the CREATE INDEX to know which table + column to attach to
1069    // — same shape as the row-walk path; we just don't execute it.
1070    let table_mut = db.get_table_mut(tbl_name.clone()).map_err(|_| {
1071        SQLRiteError::Internal(format!(
1072            "HNSW index '{}' references unknown table '{tbl_name}'",
1073            row.name
1074        ))
1075    })?;
1076    table_mut.hnsw_indexes.push(HnswIndexEntry {
1077        name: row.name.clone(),
1078        column_name: col_name,
1079        metric,
1080        index,
1081        needs_rebuild: false,
1082    });
1083    Ok(())
1084}
1085
1086/// Phase 7d.3 — Phase-7d.3-side helper: walk every leaf in the HNSW
1087/// page tree at `root_page` and decode each cell as a node. Returns
1088/// the (node_id, layers) tuples in slot-order (already ascending by
1089/// node_id since they were staged that way). The caller hands them to
1090/// `HnswIndex::from_persisted_nodes`.
1091fn load_hnsw_nodes(pager: &Pager, root_page: u32) -> Result<Vec<(i64, Vec<Vec<i64>>)>> {
1092    use crate::sql::pager::hnsw_cell::HnswNodeCell;
1093
1094    let mut nodes: Vec<(i64, Vec<Vec<i64>>)> = Vec::new();
1095    let first_leaf = find_leftmost_leaf(pager, root_page)?;
1096    let mut current = first_leaf;
1097    while current != 0 {
1098        let page_buf = pager
1099            .read_page(current)
1100            .ok_or_else(|| SQLRiteError::Internal(format!("missing HNSW leaf page {current}")))?;
1101        if page_buf[0] != PageType::TableLeaf as u8 {
1102            return Err(SQLRiteError::Internal(format!(
1103                "page {current} tagged {} but expected TableLeaf (HNSW)",
1104                page_buf[0]
1105            )));
1106        }
1107        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
1108        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
1109            .try_into()
1110            .map_err(|_| SQLRiteError::Internal("HNSW leaf payload size".to_string()))?;
1111        let leaf = TablePage::from_bytes(payload);
1112        for slot in 0..leaf.slot_count() {
1113            let offset = leaf.slot_offset_raw(slot)?;
1114            let (cell, _) = HnswNodeCell::decode(leaf.as_bytes(), offset)?;
1115            nodes.push((cell.node_id, cell.layers));
1116        }
1117        current = next_leaf;
1118    }
1119    Ok(nodes)
1120}
1121
1122/// Pulls `(table_name, column_name, metric)` out of a CREATE INDEX
1123/// SQL string of the form `CREATE INDEX … USING hnsw (col) [WITH
1124/// (metric = '<m>')]`. Used by the persistence path on open to know
1125/// where to attach the loaded graph and which distance metric to
1126/// rebuild it under. Pre-SQLR-28 rows omit the WITH clause and
1127/// default to L2.
1128fn parse_hnsw_create_index_sql(sql: &str) -> Result<(String, String, DistanceMetric)> {
1129    use crate::sql::hnsw::DistanceMetric;
1130    use sqlparser::ast::{BinaryOperator, CreateIndex, Expr, Statement, Value as AstValue};
1131
1132    let dialect = SqlriteDialect::new();
1133    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
1134    let Some(Statement::CreateIndex(CreateIndex {
1135        table_name,
1136        columns,
1137        with,
1138        ..
1139    })) = ast.pop()
1140    else {
1141        return Err(SQLRiteError::Internal(format!(
1142            "sqlrite_master HNSW row's SQL isn't a CREATE INDEX: {sql}"
1143        )));
1144    };
1145    if columns.len() != 1 {
1146        return Err(SQLRiteError::NotImplemented(
1147            "multi-column HNSW indexes aren't supported yet".to_string(),
1148        ));
1149    }
1150    let col = match &columns[0].column.expr {
1151        Expr::Identifier(ident) => ident.value.clone(),
1152        Expr::CompoundIdentifier(parts) => {
1153            parts.last().map(|p| p.value.clone()).unwrap_or_default()
1154        }
1155        other => {
1156            return Err(SQLRiteError::Internal(format!(
1157                "unsupported HNSW indexed column expression: {other:?}"
1158            )));
1159        }
1160    };
1161
1162    // Pull the metric off the parsed WITH (...) bag. The user-facing
1163    // CREATE INDEX path validates this in detail (typo'd metric names,
1164    // unknown keys, etc.); here on the persistence read-path we trust
1165    // what we previously wrote and surface a clean Internal error if
1166    // it ever doesn't match.
1167    let mut metric = DistanceMetric::L2;
1168    for opt in &with {
1169        if let Expr::BinaryOp { left, op, right } = opt {
1170            if matches!(op, BinaryOperator::Eq) {
1171                if let (Expr::Identifier(key), Expr::Value(v)) = (left.as_ref(), right.as_ref())
1172                    && key.value.eq_ignore_ascii_case("metric")
1173                {
1174                    if let AstValue::SingleQuotedString(s) | AstValue::DoubleQuotedString(s) =
1175                        &v.value
1176                    {
1177                        metric = DistanceMetric::from_sql_name(s).ok_or_else(|| {
1178                            SQLRiteError::Internal(format!(
1179                                "sqlrite_master HNSW row carries unknown metric '{s}'"
1180                            ))
1181                        })?;
1182                    }
1183                }
1184            }
1185        }
1186    }
1187
1188    Ok((table_name.to_string(), col, metric))
1189}
1190
1191/// Phase 7d.3 — rebuilds in-place any HnswIndexEntry whose
1192/// `needs_rebuild` flag is set (DELETE / UPDATE-on-vector marked it).
1193/// Walks the table's current Vec<f32> column storage and runs the
1194/// HNSW algorithm fresh. Called at the top of `save_database` before
1195/// any immutable borrows of `db` start.
1196///
1197/// Cost: O(N · ef_construction · log N) per dirty index. Fine for
1198/// small tables, expensive for ≥100k-row tables — matches the
1199/// trade-off SQLite makes for FTS5: dirtying-and-rebuilding is the
1200/// MVP, more sophisticated incremental delete strategies (soft-delete
1201/// + tombstones, neighbor reconnection) are future polish.
1202fn rebuild_dirty_hnsw_indexes(db: &mut Database) {
1203    use crate::sql::hnsw::HnswIndex;
1204
1205    for table in db.tables.values_mut() {
1206        // Snapshot which (index_name, column, metric) triples need
1207        // rebuilding, before we go grabbing column data — keeps the
1208        // borrow structure simple. The per-entry metric matters here:
1209        // rebuilding a cosine-built graph as L2 (or vice versa) would
1210        // silently corrupt the topology and break the SQLR-28 probe.
1211        let dirty: Vec<(String, String, DistanceMetric)> = table
1212            .hnsw_indexes
1213            .iter()
1214            .filter(|e| e.needs_rebuild)
1215            .map(|e| (e.name.clone(), e.column_name.clone(), e.metric))
1216            .collect();
1217        if dirty.is_empty() {
1218            continue;
1219        }
1220
1221        for (idx_name, col_name, metric) in dirty {
1222            // Snapshot every (rowid, vec) for this column.
1223            let mut vectors: Vec<(i64, Vec<f32>)> = Vec::new();
1224            {
1225                let row_data = table.rows.lock().expect("rows mutex poisoned");
1226                if let Some(Row::Vector(map)) = row_data.get(&col_name) {
1227                    for (id, v) in map.iter() {
1228                        vectors.push((*id, v.clone()));
1229                    }
1230                }
1231            }
1232            // Pre-build a HashMap for the get_vec closure so we don't
1233            // pay O(N) lookup per insert call.
1234            let snapshot: std::collections::HashMap<i64, Vec<f32>> =
1235                vectors.iter().cloned().collect();
1236
1237            let mut new_idx = HnswIndex::new(metric, 0xC0FFEE);
1238            // Sort by id so the rebuild is deterministic across runs.
1239            vectors.sort_by_key(|(id, _)| *id);
1240            for (id, v) in &vectors {
1241                new_idx.insert(*id, v, |q| snapshot.get(&q).cloned().unwrap_or_default());
1242            }
1243
1244            // Replace the entry's index + clear the dirty flag.
1245            if let Some(entry) = table.hnsw_indexes.iter_mut().find(|e| e.name == idx_name) {
1246                entry.index = new_idx;
1247                entry.needs_rebuild = false;
1248            }
1249        }
1250    }
1251}
1252
1253/// Synthesises the CREATE INDEX SQL stored back into `sqlrite_master`
1254/// for an HNSW index. The metric travels through the SQL via an
1255/// optional `WITH (metric = '<m>')` clause; L2 indexes omit the clause
1256/// for byte-identical round-trip with pre-SQLR-28 catalogs.
1257fn synthesize_hnsw_create_index_sql(
1258    index_name: &str,
1259    table_name: &str,
1260    column_name: &str,
1261    metric: DistanceMetric,
1262) -> String {
1263    if matches!(metric, DistanceMetric::L2) {
1264        format!("CREATE INDEX {index_name} ON {table_name} USING hnsw ({column_name})")
1265    } else {
1266        format!(
1267            "CREATE INDEX {index_name} ON {table_name} USING hnsw ({column_name}) WITH (metric = '{}')",
1268            metric.sql_name()
1269        )
1270    }
1271}
1272
1273/// Phase 8b — rebuild every FTS index a DELETE / UPDATE-on-text-col
1274/// marked dirty. Mirrors [`rebuild_dirty_hnsw_indexes`]; runs at save
1275/// time under `&mut Database`. Cheap on a clean DB (the `dirty` snapshot
1276/// is empty so the per-table loop short-circuits).
1277fn rebuild_dirty_fts_indexes(db: &mut Database) {
1278    use crate::sql::fts::PostingList;
1279
1280    for table in db.tables.values_mut() {
1281        let dirty: Vec<(String, String)> = table
1282            .fts_indexes
1283            .iter()
1284            .filter(|e| e.needs_rebuild)
1285            .map(|e| (e.name.clone(), e.column_name.clone()))
1286            .collect();
1287        if dirty.is_empty() {
1288            continue;
1289        }
1290
1291        for (idx_name, col_name) in dirty {
1292            // Snapshot every (rowid, text) pair for this column under
1293            // the row mutex, then drop the lock before re-tokenizing.
1294            let mut docs: Vec<(i64, String)> = Vec::new();
1295            {
1296                let row_data = table.rows.lock().expect("rows mutex poisoned");
1297                if let Some(Row::Text(map)) = row_data.get(&col_name) {
1298                    for (id, v) in map.iter() {
1299                        // "Null" sentinel is the parser's
1300                        // null-marker for TEXT cells; skip those —
1301                        // they'd round-trip as the literal string
1302                        // "Null" otherwise. Aligns with insert_row's
1303                        // typed_value gate.
1304                        if v != "Null" {
1305                            docs.push((*id, v.clone()));
1306                        }
1307                    }
1308                }
1309            }
1310
1311            let mut new_idx = PostingList::new();
1312            // Sort by id so the rebuild is deterministic across runs
1313            // (the BTreeMap inside PostingList is order-stable, but
1314            // doc-length aggregation order doesn't matter — sorting
1315            // here is purely for reproducibility on inspection).
1316            docs.sort_by_key(|(id, _)| *id);
1317            for (id, text) in &docs {
1318                new_idx.insert(*id, text);
1319            }
1320
1321            if let Some(entry) = table.fts_indexes.iter_mut().find(|e| e.name == idx_name) {
1322                entry.index = new_idx;
1323                entry.needs_rebuild = false;
1324            }
1325        }
1326    }
1327}
1328
1329/// Cheap clone helper — `DataType` doesn't derive `Clone` elsewhere.
1330fn clone_datatype(dt: &DataType) -> DataType {
1331    match dt {
1332        DataType::Integer => DataType::Integer,
1333        DataType::Text => DataType::Text,
1334        DataType::Real => DataType::Real,
1335        DataType::Bool => DataType::Bool,
1336        DataType::Vector(dim) => DataType::Vector(*dim),
1337        DataType::Json => DataType::Json,
1338        DataType::None => DataType::None,
1339        DataType::Invalid => DataType::Invalid,
1340    }
1341}
1342
1343/// Stages an index's B-Tree at `start_page`. Each leaf cell is a
1344/// `KIND_INDEX` entry carrying `(original_rowid, value)`. Returns
1345/// `(root_page, next_free_page)`.
1346///
1347/// The tree's shape matches a regular table's — leaves chained via
1348/// `next_page`, optional interior layer above. `Cell::peek_rowid` works
1349/// uniformly for index cells (same prefix as local cells), so the
1350/// existing slot directory and binary search carry over.
1351fn stage_index_btree(
1352    pager: &mut Pager,
1353    idx: &SecondaryIndex,
1354    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1355) -> Result<u32> {
1356    // Build the leaves.
1357    let leaves = stage_index_leaves(pager, idx, alloc)?;
1358    if leaves.len() == 1 {
1359        return Ok(leaves[0].0);
1360    }
1361    let mut level: Vec<(u32, i64)> = leaves;
1362    while level.len() > 1 {
1363        level = stage_interior_level(pager, &level, alloc)?;
1364    }
1365    Ok(level[0].0)
1366}
1367
1368/// Packs the index's (value, rowid) entries into a sibling-chained run
1369/// of `TableLeaf` pages. Iteration order matches `SecondaryIndex::iter_entries`
1370/// (ascending value; rowids in insertion order within a value), which is
1371/// also ascending by the "cell rowid" carried in each IndexCell (the
1372/// original row's rowid) — so Cell::peek_rowid + the slot directory's
1373/// rowid ordering stays consistent.
1374fn stage_index_leaves(
1375    pager: &mut Pager,
1376    idx: &SecondaryIndex,
1377    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1378) -> Result<Vec<(u32, i64)>> {
1379    let mut leaves: Vec<(u32, i64)> = Vec::new();
1380    let mut current_leaf = TablePage::empty();
1381    let mut current_leaf_page = alloc.allocate();
1382    let mut current_max_rowid: Option<i64> = None;
1383
1384    // Sort the entries by original rowid so the in-page slot directory,
1385    // which binary-searches by rowid, stays valid. (iter_entries orders by
1386    // value; we reorder here for B-Tree correctness.)
1387    let mut entries: Vec<(Value, i64)> = idx.iter_entries().collect();
1388    entries.sort_by_key(|(_, r)| *r);
1389
1390    for (value, rowid) in entries {
1391        let cell = IndexCell::new(rowid, value);
1392        let entry_bytes = cell.encode()?;
1393
1394        if !current_leaf.would_fit(entry_bytes.len()) {
1395            let next_leaf_page_num = alloc.allocate();
1396            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1397            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1398            current_leaf = TablePage::empty();
1399            current_leaf_page = next_leaf_page_num;
1400
1401            if !current_leaf.would_fit(entry_bytes.len()) {
1402                return Err(SQLRiteError::Internal(format!(
1403                    "index entry of {} bytes exceeds empty-page capacity {}",
1404                    entry_bytes.len(),
1405                    current_leaf.free_space()
1406                )));
1407            }
1408        }
1409        current_leaf.insert_entry(rowid, &entry_bytes)?;
1410        current_max_rowid = Some(rowid);
1411    }
1412
1413    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1414    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1415    Ok(leaves)
1416}
1417
1418/// Phase 7d.3 — stages an HNSW index's page tree at `start_page`.
1419/// Each leaf cell is a `KIND_HNSW` entry carrying one node's
1420/// (node_id, layers). Returns `(root_page, next_free_page)`.
1421///
1422/// Tree shape is identical to `stage_index_btree` — chained leaves +
1423/// optional interior layers. The slot directory binary-searches by
1424/// node_id (which is the cell's "rowid" in `Cell::peek_rowid` terms),
1425/// so reads can locate any node in O(log N) once 7d.4-or-later
1426/// optimizes the load path to lazy-fetch instead of read-all.
1427/// Today, `load_hnsw_nodes` reads the entire tree on open.
1428fn stage_hnsw_btree(
1429    pager: &mut Pager,
1430    idx: &crate::sql::hnsw::HnswIndex,
1431    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1432) -> Result<u32> {
1433    let leaves = stage_hnsw_leaves(pager, idx, alloc)?;
1434    if leaves.len() == 1 {
1435        return Ok(leaves[0].0);
1436    }
1437    let mut level: Vec<(u32, i64)> = leaves;
1438    while level.len() > 1 {
1439        level = stage_interior_level(pager, &level, alloc)?;
1440    }
1441    Ok(level[0].0)
1442}
1443
1444/// Phase 8c — stage one FTS index as a `TableLeaf`-shaped B-Tree.
1445/// Mirrors `stage_hnsw_btree` (sibling-chained leaves, optional interior
1446/// levels). Returns `(root_page, next_free_page)`. Each leaf is filled
1447/// with `KIND_FTS_POSTING` cells: one sidecar cell holding the
1448/// doc-lengths map, then one cell per term in lexicographic order.
1449fn stage_fts_btree(
1450    pager: &mut Pager,
1451    idx: &crate::sql::fts::PostingList,
1452    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1453) -> Result<u32> {
1454    let leaves = stage_fts_leaves(pager, idx, alloc)?;
1455    if leaves.len() == 1 {
1456        return Ok(leaves[0].0);
1457    }
1458    let mut level: Vec<(u32, i64)> = leaves;
1459    while level.len() > 1 {
1460        level = stage_interior_level(pager, &level, alloc)?;
1461    }
1462    Ok(level[0].0)
1463}
1464
1465/// Packs FTS posting cells into a sibling-chained run of `TableLeaf`
1466/// pages. Cell layout: a single doc-lengths sidecar at `cell_id = 1`,
1467/// followed by one cell per term in lexicographic order with
1468/// `cell_id = 2..=N + 1`. Sequential ids keep the slot directory's
1469/// rowid ordering valid (the `cell_id` field is what `peek_rowid`
1470/// returns).
1471fn stage_fts_leaves(
1472    pager: &mut Pager,
1473    idx: &crate::sql::fts::PostingList,
1474    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1475) -> Result<Vec<(u32, i64)>> {
1476    use crate::sql::pager::fts_cell::FtsPostingCell;
1477
1478    let mut leaves: Vec<(u32, i64)> = Vec::new();
1479    let mut current_leaf = TablePage::empty();
1480    let mut current_leaf_page = alloc.allocate();
1481    let mut current_max_rowid: Option<i64> = None;
1482
1483    // Build the cell sequence: sidecar first, then per-term cells. The
1484    // sidecar always exists (even on an empty index) so reload sees a
1485    // canonical "this index was persisted" marker in slot 0.
1486    let mut cell_id: i64 = 1;
1487    let mut cells: Vec<FtsPostingCell> = Vec::new();
1488    cells.push(FtsPostingCell::doc_lengths(
1489        cell_id,
1490        idx.serialize_doc_lengths(),
1491    ));
1492    for (term, entries) in idx.serialize_postings() {
1493        cell_id += 1;
1494        cells.push(FtsPostingCell::posting(cell_id, term, entries));
1495    }
1496
1497    for cell in cells {
1498        let entry_bytes = cell.encode()?;
1499
1500        if !current_leaf.would_fit(entry_bytes.len()) {
1501            let next_leaf_page_num = alloc.allocate();
1502            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1503            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1504            current_leaf = TablePage::empty();
1505            current_leaf_page = next_leaf_page_num;
1506
1507            if !current_leaf.would_fit(entry_bytes.len()) {
1508                // A single posting cell exceeds page capacity. Phase
1509                // 8c MVP doesn't chain via overflow cells (the plan
1510                // notes this as a stretch goal); surface a clear
1511                // error so users know which term tripped it.
1512                return Err(SQLRiteError::Internal(format!(
1513                    "FTS posting cell {} of {} bytes exceeds empty-page capacity {} \
1514                     (term too long or too many postings; overflow chaining is Phase 8.1)",
1515                    cell.cell_id,
1516                    entry_bytes.len(),
1517                    current_leaf.free_space()
1518                )));
1519            }
1520        }
1521        current_leaf.insert_entry(cell.cell_id, &entry_bytes)?;
1522        current_max_rowid = Some(cell.cell_id);
1523    }
1524
1525    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1526    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1527    Ok(leaves)
1528}
1529
1530/// (rowid, value) pairs as decoded from a single FTS cell — value is
1531/// either term frequency (posting cell) or doc length (sidecar cell).
1532type FtsEntries = Vec<(i64, u32)>;
1533/// (term, posting list) pairs as decoded from non-sidecar FTS cells.
1534type FtsPostings = Vec<(String, FtsEntries)>;
1535
1536/// Phase 8c — read every cell of an FTS index from `root_page` back
1537/// into the `(doc_lengths, postings)` shape `PostingList::from_persisted_postings`
1538/// expects. Mirrors `load_hnsw_nodes`: leftmost-leaf descent, walk the
1539/// sibling chain, decode each slot.
1540fn load_fts_postings(pager: &Pager, root_page: u32) -> Result<(FtsEntries, FtsPostings)> {
1541    use crate::sql::pager::fts_cell::FtsPostingCell;
1542
1543    let mut doc_lengths: Vec<(i64, u32)> = Vec::new();
1544    let mut postings: Vec<(String, Vec<(i64, u32)>)> = Vec::new();
1545    let mut saw_sidecar = false;
1546
1547    let first_leaf = find_leftmost_leaf(pager, root_page)?;
1548    let mut current = first_leaf;
1549    while current != 0 {
1550        let page_buf = pager
1551            .read_page(current)
1552            .ok_or_else(|| SQLRiteError::Internal(format!("missing FTS leaf page {current}")))?;
1553        if page_buf[0] != PageType::TableLeaf as u8 {
1554            return Err(SQLRiteError::Internal(format!(
1555                "page {current} tagged {} but expected TableLeaf (FTS)",
1556                page_buf[0]
1557            )));
1558        }
1559        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
1560        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
1561            .try_into()
1562            .map_err(|_| SQLRiteError::Internal("FTS leaf payload size".to_string()))?;
1563        let leaf = TablePage::from_bytes(payload);
1564        for slot in 0..leaf.slot_count() {
1565            let offset = leaf.slot_offset_raw(slot)?;
1566            let (cell, _) = FtsPostingCell::decode(leaf.as_bytes(), offset)?;
1567            if cell.is_doc_lengths() {
1568                if saw_sidecar {
1569                    return Err(SQLRiteError::Internal(
1570                        "FTS index has more than one doc-lengths sidecar cell".to_string(),
1571                    ));
1572                }
1573                saw_sidecar = true;
1574                doc_lengths = cell.entries;
1575            } else {
1576                postings.push((cell.term, cell.entries));
1577            }
1578        }
1579        current = next_leaf;
1580    }
1581
1582    if !saw_sidecar {
1583        return Err(SQLRiteError::Internal(
1584            "FTS index missing doc-lengths sidecar cell — corrupt or truncated tree".to_string(),
1585        ));
1586    }
1587    Ok((doc_lengths, postings))
1588}
1589
1590/// Packs HNSW nodes into a sibling-chained run of `TableLeaf` pages.
1591/// `serialize_nodes` already returns nodes in ascending node_id order,
1592/// so the slot directory's rowid ordering stays valid.
1593fn stage_hnsw_leaves(
1594    pager: &mut Pager,
1595    idx: &crate::sql::hnsw::HnswIndex,
1596    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1597) -> Result<Vec<(u32, i64)>> {
1598    use crate::sql::pager::hnsw_cell::HnswNodeCell;
1599
1600    let mut leaves: Vec<(u32, i64)> = Vec::new();
1601    let mut current_leaf = TablePage::empty();
1602    let mut current_leaf_page = alloc.allocate();
1603    let mut current_max_rowid: Option<i64> = None;
1604
1605    let serialized = idx.serialize_nodes();
1606
1607    // Empty index → emit a single empty leaf page so the rootpage
1608    // pointer in sqlrite_master stays nonzero (== "graph is persisted,
1609    // it just happens to be empty"). load_hnsw_nodes is fine with an
1610    // empty leaf — slot_count() returns 0.
1611    for (node_id, layers) in serialized {
1612        let cell = HnswNodeCell::new(node_id, layers);
1613        let entry_bytes = cell.encode()?;
1614
1615        if !current_leaf.would_fit(entry_bytes.len()) {
1616            let next_leaf_page_num = alloc.allocate();
1617            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1618            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1619            current_leaf = TablePage::empty();
1620            current_leaf_page = next_leaf_page_num;
1621
1622            if !current_leaf.would_fit(entry_bytes.len()) {
1623                return Err(SQLRiteError::Internal(format!(
1624                    "HNSW node {node_id} cell of {} bytes exceeds empty-page capacity {}",
1625                    entry_bytes.len(),
1626                    current_leaf.free_space()
1627                )));
1628            }
1629        }
1630        current_leaf.insert_entry(node_id, &entry_bytes)?;
1631        current_max_rowid = Some(node_id);
1632    }
1633
1634    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1635    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1636    Ok(leaves)
1637}
1638
1639fn load_table_rows(pager: &Pager, table: &mut Table, root_page: u32) -> Result<()> {
1640    let first_leaf = find_leftmost_leaf(pager, root_page)?;
1641    let mut current = first_leaf;
1642    while current != 0 {
1643        let page_buf = pager
1644            .read_page(current)
1645            .ok_or_else(|| SQLRiteError::Internal(format!("missing leaf page {current}")))?;
1646        if page_buf[0] != PageType::TableLeaf as u8 {
1647            return Err(SQLRiteError::Internal(format!(
1648                "page {current} tagged {} but expected TableLeaf",
1649                page_buf[0]
1650            )));
1651        }
1652        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
1653        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
1654            .try_into()
1655            .map_err(|_| SQLRiteError::Internal("leaf payload slice size".to_string()))?;
1656        let leaf = TablePage::from_bytes(payload);
1657
1658        for slot in 0..leaf.slot_count() {
1659            let entry = leaf.entry_at(slot)?;
1660            let cell = match entry {
1661                PagedEntry::Local(c) => c,
1662                PagedEntry::Overflow(r) => {
1663                    let body_bytes =
1664                        read_overflow_chain(pager, r.first_overflow_page, r.total_body_len)?;
1665                    let (c, _) = Cell::decode(&body_bytes, 0)?;
1666                    c
1667                }
1668            };
1669            table.restore_row(cell.rowid, cell.values)?;
1670        }
1671        current = next_leaf;
1672    }
1673    Ok(())
1674}
1675
1676/// Walks every page reachable from `root_page` and returns their page
1677/// numbers. Includes `root_page`, every interior page, every leaf, and
1678/// — when `follow_overflow` is true — every overflow page chained off
1679/// table-leaf cells. Used by `save_database` to seed each table's
1680/// per-table preferred pool and to compute the newly-freed set.
1681///
1682/// `follow_overflow = true` for table B-Trees (cells may carry
1683/// `OverflowRef`s pointing at chained overflow pages); `false` for
1684/// secondary-index, HNSW, and FTS B-Trees, which never overflow in the
1685/// current encoding.
1686fn collect_pages_for_btree(
1687    pager: &Pager,
1688    root_page: u32,
1689    follow_overflow: bool,
1690) -> Result<Vec<u32>> {
1691    if root_page == 0 {
1692        return Ok(Vec::new());
1693    }
1694    let mut pages: Vec<u32> = Vec::new();
1695    let mut stack: Vec<u32> = vec![root_page];
1696
1697    while let Some(p) = stack.pop() {
1698        let buf = pager.read_page(p).ok_or_else(|| {
1699            SQLRiteError::Internal(format!(
1700                "collect_pages: missing page {p} (rooted at {root_page})"
1701            ))
1702        })?;
1703        pages.push(p);
1704        match buf[0] {
1705            t if t == PageType::InteriorNode as u8 => {
1706                let payload: &[u8; PAYLOAD_PER_PAGE] =
1707                    (&buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
1708                        SQLRiteError::Internal("interior payload slice size".to_string())
1709                    })?;
1710                let interior = InteriorPage::from_bytes(payload);
1711                // Push every divider's child + the rightmost child.
1712                for slot in 0..interior.slot_count() {
1713                    let cell = interior.cell_at(slot)?;
1714                    stack.push(cell.child_page);
1715                }
1716                stack.push(interior.rightmost_child());
1717            }
1718            t if t == PageType::TableLeaf as u8 => {
1719                if follow_overflow {
1720                    let payload: &[u8; PAYLOAD_PER_PAGE] =
1721                        (&buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
1722                            SQLRiteError::Internal("leaf payload slice size".to_string())
1723                        })?;
1724                    let leaf = TablePage::from_bytes(payload);
1725                    for slot in 0..leaf.slot_count() {
1726                        match leaf.entry_at(slot)? {
1727                            PagedEntry::Local(_) => {}
1728                            PagedEntry::Overflow(r) => {
1729                                let mut cur = r.first_overflow_page;
1730                                while cur != 0 {
1731                                    pages.push(cur);
1732                                    let ob = pager.read_page(cur).ok_or_else(|| {
1733                                        SQLRiteError::Internal(format!(
1734                                            "collect_pages: missing overflow page {cur}"
1735                                        ))
1736                                    })?;
1737                                    if ob[0] != PageType::Overflow as u8 {
1738                                        return Err(SQLRiteError::Internal(format!(
1739                                            "collect_pages: page {cur} expected Overflow, got tag {}",
1740                                            ob[0]
1741                                        )));
1742                                    }
1743                                    cur = u32::from_le_bytes(ob[1..5].try_into().unwrap());
1744                                }
1745                            }
1746                        }
1747                    }
1748                }
1749            }
1750            other => {
1751                return Err(SQLRiteError::Internal(format!(
1752                    "collect_pages: unexpected page type {other} at page {p}"
1753                )));
1754            }
1755        }
1756    }
1757    Ok(pages)
1758}
1759
1760/// Reads the previously-persisted `sqlrite_master` and returns a map from
1761/// `(kind, name)` to that object's rootpage. Used by `save_database` to
1762/// seed each table/index's per-table preferred pool with the pages it
1763/// occupied last time round.
1764///
1765/// `kind` is `"table"` or `"index"` (the catalog already disambiguates
1766/// the three index families via the SQL string, but for page-collection
1767/// purposes a "table" tree must follow overflow refs while an "index"
1768/// tree never does — that's the only distinction we need here).
1769fn read_old_rootpages(pager: &Pager, schema_root: u32) -> Result<HashMap<(String, String), u32>> {
1770    let mut out: HashMap<(String, String), u32> = HashMap::new();
1771    if schema_root == 0 {
1772        return Ok(out);
1773    }
1774    let mut master = build_empty_master_table();
1775    load_table_rows(pager, &mut master, schema_root)?;
1776    for rowid in master.rowids() {
1777        let kind = take_text(&master, "type", rowid)?;
1778        let name = take_text(&master, "name", rowid)?;
1779        let rootpage = take_integer(&master, "rootpage", rowid)? as u32;
1780        out.insert((kind, name), rootpage);
1781    }
1782    Ok(out)
1783}
1784
1785/// Descends from `root_page` through `InteriorNode` pages, always taking
1786/// the leftmost child, until a `TableLeaf` is reached. Returns that leaf's
1787/// page number. A root that's already a leaf is returned as-is.
1788fn find_leftmost_leaf(pager: &Pager, root_page: u32) -> Result<u32> {
1789    let mut current = root_page;
1790    loop {
1791        let page_buf = pager.read_page(current).ok_or_else(|| {
1792            SQLRiteError::Internal(format!("missing page {current} during tree descent"))
1793        })?;
1794        match page_buf[0] {
1795            t if t == PageType::TableLeaf as u8 => return Ok(current),
1796            t if t == PageType::InteriorNode as u8 => {
1797                let payload: &[u8; PAYLOAD_PER_PAGE] =
1798                    (&page_buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
1799                        SQLRiteError::Internal("interior payload slice size".to_string())
1800                    })?;
1801                let interior = InteriorPage::from_bytes(payload);
1802                current = interior.leftmost_child()?;
1803            }
1804            other => {
1805                return Err(SQLRiteError::Internal(format!(
1806                    "unexpected page type {other} during tree descent at page {current}"
1807                )));
1808            }
1809        }
1810    }
1811}
1812
1813/// Stages a table's B-Tree, drawing every page number from `alloc`.
1814/// Returns the root page (the topmost interior page, or the single leaf
1815/// when the table fits in one page).
1816///
1817/// Builds bottom-up: pack rows into `TableLeaf` pages chained via
1818/// `next_page`, then if more than one leaf, recursively wrap them in
1819/// `InteriorNode` levels until one root remains.
1820///
1821/// Deterministic: same rows + same allocator handouts → byte-identical
1822/// pages at the same numbers, so the diff pager skips unchanged tables.
1823fn stage_table_btree(
1824    pager: &mut Pager,
1825    table: &Table,
1826    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1827) -> Result<u32> {
1828    let leaves = stage_leaves(pager, table, alloc)?;
1829    if leaves.len() == 1 {
1830        return Ok(leaves[0].0);
1831    }
1832    let mut level: Vec<(u32, i64)> = leaves;
1833    while level.len() > 1 {
1834        level = stage_interior_level(pager, &level, alloc)?;
1835    }
1836    Ok(level[0].0)
1837}
1838
1839/// Packs the table's rows into a sibling-linked chain of `TableLeaf` pages.
1840/// Returns each leaf's `(page_number, max_rowid)` for use by the next
1841/// interior level. Allocates leaf and overflow pages from `alloc`.
1842fn stage_leaves(
1843    pager: &mut Pager,
1844    table: &Table,
1845    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1846) -> Result<Vec<(u32, i64)>> {
1847    let mut leaves: Vec<(u32, i64)> = Vec::new();
1848    let mut current_leaf = TablePage::empty();
1849    let mut current_leaf_page = alloc.allocate();
1850    let mut current_max_rowid: Option<i64> = None;
1851
1852    for rowid in table.rowids() {
1853        let entry_bytes = build_row_entry(pager, table, rowid, alloc)?;
1854
1855        if !current_leaf.would_fit(entry_bytes.len()) {
1856            // The new leaf goes at whatever the allocator hands out
1857            // next. Commit the current leaf with that as its sibling
1858            // pointer.
1859            let next_leaf_page_num = alloc.allocate();
1860            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1861            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1862            current_leaf = TablePage::empty();
1863            current_leaf_page = next_leaf_page_num;
1864            // current_max_rowid is reassigned by the insert below; no need
1865            // to zero it out here.
1866
1867            if !current_leaf.would_fit(entry_bytes.len()) {
1868                return Err(SQLRiteError::Internal(format!(
1869                    "entry of {} bytes exceeds empty-page capacity {}",
1870                    entry_bytes.len(),
1871                    current_leaf.free_space()
1872                )));
1873            }
1874        }
1875        current_leaf.insert_entry(rowid, &entry_bytes)?;
1876        current_max_rowid = Some(rowid);
1877    }
1878
1879    // Final leaf: sibling next_page = 0 (end of chain).
1880    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1881    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1882    Ok(leaves)
1883}
1884
1885/// Encodes a single row's on-leaf entry — either the local cell bytes, or
1886/// an `OverflowRef` pointing at a freshly-allocated overflow chain if the
1887/// encoded cell exceeded the inline threshold. Allocates any overflow
1888/// pages from `alloc`.
1889fn build_row_entry(
1890    pager: &mut Pager,
1891    table: &Table,
1892    rowid: i64,
1893    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1894) -> Result<Vec<u8>> {
1895    let values = table.extract_row(rowid);
1896    let local_cell = Cell::new(rowid, values);
1897    let local_bytes = local_cell.encode()?;
1898    if local_bytes.len() > OVERFLOW_THRESHOLD {
1899        let overflow_start = write_overflow_chain(pager, &local_bytes, alloc)?;
1900        Ok(OverflowRef {
1901            rowid,
1902            total_body_len: local_bytes.len() as u64,
1903            first_overflow_page: overflow_start,
1904        }
1905        .encode())
1906    } else {
1907        Ok(local_bytes)
1908    }
1909}
1910
1911/// Builds one level of `InteriorNode` pages above the given children.
1912/// Each interior packs as many dividers as will fit; the last child
1913/// assigned to an interior becomes its `rightmost_child`. Returns the
1914/// emitted interior pages as `(page_number, max_rowid_in_subtree)`.
1915fn stage_interior_level(
1916    pager: &mut Pager,
1917    children: &[(u32, i64)],
1918    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1919) -> Result<Vec<(u32, i64)>> {
1920    let mut next_level: Vec<(u32, i64)> = Vec::new();
1921    let mut idx = 0usize;
1922
1923    while idx < children.len() {
1924        let interior_page_num = alloc.allocate();
1925
1926        // Seed the interior with the first unassigned child as its
1927        // rightmost. As we add more children, the previous rightmost
1928        // graduates to being a divider and the new arrival takes over
1929        // as rightmost.
1930        let (mut rightmost_child_page, mut rightmost_child_max) = children[idx];
1931        idx += 1;
1932        let mut interior = InteriorPage::empty(rightmost_child_page);
1933
1934        while idx < children.len() {
1935            let new_divider_cell = InteriorCell {
1936                divider_rowid: rightmost_child_max,
1937                child_page: rightmost_child_page,
1938            };
1939            let new_divider_bytes = new_divider_cell.encode();
1940            if !interior.would_fit(new_divider_bytes.len()) {
1941                break;
1942            }
1943            interior.insert_divider(rightmost_child_max, rightmost_child_page)?;
1944            let (next_child_page, next_child_max) = children[idx];
1945            interior.set_rightmost_child(next_child_page);
1946            rightmost_child_page = next_child_page;
1947            rightmost_child_max = next_child_max;
1948            idx += 1;
1949        }
1950
1951        emit_interior(pager, interior_page_num, &interior);
1952        next_level.push((interior_page_num, rightmost_child_max));
1953    }
1954
1955    Ok(next_level)
1956}
1957
1958/// Wraps a `TablePage` in the 7-byte page header and hands it to the pager.
1959fn emit_leaf(pager: &mut Pager, page_num: u32, leaf: &TablePage, next_leaf: u32) {
1960    let mut buf = [0u8; PAGE_SIZE];
1961    buf[0] = PageType::TableLeaf as u8;
1962    buf[1..5].copy_from_slice(&next_leaf.to_le_bytes());
1963    // For leaf pages the legacy `payload_len` field isn't used — the slot
1964    // directory self-describes. Zero it by convention.
1965    buf[5..7].copy_from_slice(&0u16.to_le_bytes());
1966    buf[PAGE_HEADER_SIZE..].copy_from_slice(leaf.as_bytes());
1967    pager.stage_page(page_num, buf);
1968}
1969
1970/// Wraps an `InteriorPage` in the 7-byte page header. Interior pages
1971/// don't use `next_page` (there's no sibling chain between interiors);
1972/// `payload_len` is also unused (the slot directory self-describes).
1973fn emit_interior(pager: &mut Pager, page_num: u32, interior: &InteriorPage) {
1974    let mut buf = [0u8; PAGE_SIZE];
1975    buf[0] = PageType::InteriorNode as u8;
1976    buf[1..5].copy_from_slice(&0u32.to_le_bytes());
1977    buf[5..7].copy_from_slice(&0u16.to_le_bytes());
1978    buf[PAGE_HEADER_SIZE..].copy_from_slice(interior.as_bytes());
1979    pager.stage_page(page_num, buf);
1980}
1981
1982#[cfg(test)]
1983mod tests {
1984    use super::*;
1985    use crate::sql::pager::freelist::MIN_PAGES_FOR_AUTO_VACUUM;
1986    use crate::sql::process_command;
1987
1988    fn seed_db() -> Database {
1989        let mut db = Database::new("test".to_string());
1990        process_command(
1991            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, age INTEGER);",
1992            &mut db,
1993        )
1994        .unwrap();
1995        process_command(
1996            "INSERT INTO users (name, age) VALUES ('alice', 30);",
1997            &mut db,
1998        )
1999        .unwrap();
2000        process_command("INSERT INTO users (name, age) VALUES ('bob', 25);", &mut db).unwrap();
2001        process_command(
2002            "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
2003            &mut db,
2004        )
2005        .unwrap();
2006        process_command("INSERT INTO notes (body) VALUES ('hello');", &mut db).unwrap();
2007        db
2008    }
2009
2010    fn tmp_path(name: &str) -> std::path::PathBuf {
2011        let mut p = std::env::temp_dir();
2012        let pid = std::process::id();
2013        let nanos = std::time::SystemTime::now()
2014            .duration_since(std::time::UNIX_EPOCH)
2015            .map(|d| d.as_nanos())
2016            .unwrap_or(0);
2017        p.push(format!("sqlrite-{pid}-{nanos}-{name}.sqlrite"));
2018        p
2019    }
2020
2021    /// Phase 4c: every .sqlrite has a `-wal` sidecar now. Delete both so
2022    /// `/tmp` doesn't accumulate orphan WALs across test runs.
2023    fn cleanup(path: &std::path::Path) {
2024        let _ = std::fs::remove_file(path);
2025        let mut wal = path.as_os_str().to_owned();
2026        wal.push("-wal");
2027        let _ = std::fs::remove_file(std::path::PathBuf::from(wal));
2028    }
2029
2030    #[test]
2031    fn round_trip_preserves_schema_and_data() {
2032        let path = tmp_path("roundtrip");
2033        let mut db = seed_db();
2034        save_database(&mut db, &path).expect("save");
2035
2036        let loaded = open_database(&path, "test".to_string()).expect("open");
2037        assert_eq!(loaded.tables.len(), 2);
2038
2039        let users = loaded.get_table("users".to_string()).expect("users table");
2040        assert_eq!(users.columns.len(), 3);
2041        let rowids = users.rowids();
2042        assert_eq!(rowids.len(), 2);
2043        let names: Vec<String> = rowids
2044            .iter()
2045            .filter_map(|r| match users.get_value("name", *r) {
2046                Some(Value::Text(s)) => Some(s),
2047                _ => None,
2048            })
2049            .collect();
2050        assert!(names.contains(&"alice".to_string()));
2051        assert!(names.contains(&"bob".to_string()));
2052
2053        let notes = loaded.get_table("notes".to_string()).expect("notes table");
2054        assert_eq!(notes.rowids().len(), 1);
2055
2056        cleanup(&path);
2057    }
2058
2059    // -----------------------------------------------------------------
2060    // Phase 7a — VECTOR(N) save / reopen round-trip
2061    // -----------------------------------------------------------------
2062
2063    #[test]
2064    fn round_trip_preserves_vector_column() {
2065        let path = tmp_path("vec_roundtrip");
2066
2067        // Build, populate, save.
2068        {
2069            let mut db = Database::new("test".to_string());
2070            process_command(
2071                "CREATE TABLE docs (id INTEGER PRIMARY KEY, embedding VECTOR(3));",
2072                &mut db,
2073            )
2074            .unwrap();
2075            process_command(
2076                "INSERT INTO docs (embedding) VALUES ([0.1, 0.2, 0.3]);",
2077                &mut db,
2078            )
2079            .unwrap();
2080            process_command(
2081                "INSERT INTO docs (embedding) VALUES ([1.5, -2.0, 3.5]);",
2082                &mut db,
2083            )
2084            .unwrap();
2085            save_database(&mut db, &path).expect("save");
2086        } // db drops → its exclusive lock releases before reopen.
2087
2088        // Reopen and verify schema + data both round-tripped.
2089        let loaded = open_database(&path, "test".to_string()).expect("open");
2090        let docs = loaded.get_table("docs".to_string()).expect("docs table");
2091
2092        // Schema preserved: column is still VECTOR(3).
2093        let embedding_col = docs
2094            .columns
2095            .iter()
2096            .find(|c| c.column_name == "embedding")
2097            .expect("embedding column");
2098        assert!(
2099            matches!(embedding_col.datatype, DataType::Vector(3)),
2100            "expected DataType::Vector(3) after round-trip, got {:?}",
2101            embedding_col.datatype
2102        );
2103
2104        // Data preserved: both vectors still readable bit-for-bit.
2105        let mut rows: Vec<Vec<f32>> = docs
2106            .rowids()
2107            .iter()
2108            .filter_map(|r| match docs.get_value("embedding", *r) {
2109                Some(Value::Vector(v)) => Some(v),
2110                _ => None,
2111            })
2112            .collect();
2113        rows.sort_by(|a, b| a[0].partial_cmp(&b[0]).unwrap());
2114        assert_eq!(rows.len(), 2);
2115        assert_eq!(rows[0], vec![0.1f32, 0.2, 0.3]);
2116        assert_eq!(rows[1], vec![1.5f32, -2.0, 3.5]);
2117
2118        cleanup(&path);
2119    }
2120
2121    #[test]
2122    fn round_trip_preserves_json_column() {
2123        // Phase 7e — JSON columns are stored as Text under the hood with
2124        // INSERT-time validation. Save + reopen should preserve the
2125        // schema (DataType::Json) and the underlying text bytes; a
2126        // post-reopen json_extract should still resolve paths correctly.
2127        let path = tmp_path("json_roundtrip");
2128
2129        {
2130            let mut db = Database::new("test".to_string());
2131            process_command(
2132                "CREATE TABLE docs (id INTEGER PRIMARY KEY, payload JSON);",
2133                &mut db,
2134            )
2135            .unwrap();
2136            process_command(
2137                r#"INSERT INTO docs (payload) VALUES ('{"name": "alice", "tags": ["rust","sql"]}');"#,
2138                &mut db,
2139            )
2140            .unwrap();
2141            save_database(&mut db, &path).expect("save");
2142        }
2143
2144        let mut loaded = open_database(&path, "test".to_string()).expect("open");
2145        let docs = loaded.get_table("docs".to_string()).expect("docs");
2146
2147        // Schema: column declared as JSON, restored with the same type.
2148        let payload_col = docs
2149            .columns
2150            .iter()
2151            .find(|c| c.column_name == "payload")
2152            .unwrap();
2153        assert!(
2154            matches!(payload_col.datatype, DataType::Json),
2155            "expected DataType::Json, got {:?}",
2156            payload_col.datatype
2157        );
2158
2159        // json_extract works against the reopened data — exercises the
2160        // full Text-storage + serde_json::from_str path post-reopen.
2161        let resp = process_command(
2162            r#"SELECT id FROM docs WHERE json_extract(payload, '$.name') = 'alice';"#,
2163            &mut loaded,
2164        )
2165        .expect("select via json_extract after reopen");
2166        assert!(resp.contains("1 row returned"), "got: {resp}");
2167
2168        cleanup(&path);
2169    }
2170
2171    #[test]
2172    fn round_trip_rebuilds_hnsw_index_from_create_sql() {
2173        // Phase 7d.3: HNSW indexes now persist their graph as cell-encoded
2174        // pages. After save+reopen the index entry reattaches with the
2175        // same column + same node count, loaded directly from disk
2176        // instead of re-walking rows.
2177        let path = tmp_path("hnsw_roundtrip");
2178
2179        // Build, populate, index, save.
2180        {
2181            let mut db = Database::new("test".to_string());
2182            process_command(
2183                "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
2184                &mut db,
2185            )
2186            .unwrap();
2187            for v in &[
2188                "[1.0, 0.0]",
2189                "[2.0, 0.0]",
2190                "[0.0, 3.0]",
2191                "[1.0, 4.0]",
2192                "[10.0, 10.0]",
2193            ] {
2194                process_command(&format!("INSERT INTO docs (e) VALUES ({v});"), &mut db).unwrap();
2195            }
2196            process_command("CREATE INDEX ix_e ON docs USING hnsw (e);", &mut db).unwrap();
2197            save_database(&mut db, &path).expect("save");
2198        } // db drops → exclusive lock releases.
2199
2200        // Reopen and verify the index reattached, with the same name +
2201        // column + populated graph.
2202        let mut loaded = open_database(&path, "test".to_string()).expect("open");
2203        {
2204            let table = loaded.get_table("docs".to_string()).expect("docs");
2205            assert_eq!(table.hnsw_indexes.len(), 1, "HNSW index should reattach");
2206            let entry = &table.hnsw_indexes[0];
2207            assert_eq!(entry.name, "ix_e");
2208            assert_eq!(entry.column_name, "e");
2209            assert_eq!(entry.index.len(), 5, "loaded graph should hold all 5 rows");
2210            assert!(
2211                !entry.needs_rebuild,
2212                "fresh load should not be marked dirty"
2213            );
2214        }
2215
2216        // Quick functional check: KNN query through the loaded index
2217        // returns results.
2218        let resp = process_command(
2219            "SELECT id FROM docs ORDER BY vec_distance_l2(e, [1.0, 0.0]) ASC LIMIT 3;",
2220            &mut loaded,
2221        )
2222        .unwrap();
2223        assert!(resp.contains("3 rows returned"), "got: {resp}");
2224
2225        cleanup(&path);
2226    }
2227
2228    /// SQLR-28 — the HNSW metric must round-trip across save+reopen.
2229    /// Without this, the SQL re-synthesised into `sqlrite_master`
2230    /// would drop the metric and a cosine-built graph would reload
2231    /// as L2, silently breaking subsequent cosine probes.
2232    #[test]
2233    fn round_trip_preserves_hnsw_cosine_metric() {
2234        use crate::sql::hnsw::DistanceMetric;
2235        let path = tmp_path("hnsw_metric_roundtrip");
2236
2237        {
2238            let mut db = Database::new("test".to_string());
2239            process_command(
2240                "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
2241                &mut db,
2242            )
2243            .unwrap();
2244            for v in &["[1.0, 0.0]", "[0.0, 1.0]", "[0.7071, 0.7071]"] {
2245                process_command(&format!("INSERT INTO docs (e) VALUES ({v});"), &mut db).unwrap();
2246            }
2247            process_command(
2248                "CREATE INDEX ix_cos ON docs USING hnsw (e) WITH (metric = 'cosine');",
2249                &mut db,
2250            )
2251            .unwrap();
2252            save_database(&mut db, &path).expect("save");
2253        }
2254
2255        let mut loaded = open_database(&path, "test".to_string()).expect("open");
2256        {
2257            let table = loaded.get_table("docs".to_string()).expect("docs");
2258            assert_eq!(table.hnsw_indexes.len(), 1);
2259            assert_eq!(
2260                table.hnsw_indexes[0].metric,
2261                DistanceMetric::Cosine,
2262                "metric should round-trip through CREATE INDEX SQL"
2263            );
2264            assert_eq!(table.hnsw_indexes[0].index.distance, DistanceMetric::Cosine);
2265        }
2266
2267        // Cosine probe still finds the self-vector after reopen — the
2268        // optimizer's metric gate should match the loaded entry's
2269        // metric, so this should hit the graph shortcut.
2270        let resp = process_command(
2271            "SELECT id FROM docs ORDER BY vec_distance_cosine(e, [1.0, 0.0]) ASC LIMIT 1;",
2272            &mut loaded,
2273        )
2274        .unwrap();
2275        assert!(resp.contains("1 row returned"), "got: {resp}");
2276
2277        cleanup(&path);
2278    }
2279
2280    #[test]
2281    fn round_trip_rebuilds_fts_index_from_create_sql() {
2282        // Phase 8c: FTS indexes now persist their posting lists as
2283        // cell-encoded pages. After save+reopen the index entry
2284        // reattaches with the same column + same posting count, loaded
2285        // directly from disk (no re-tokenization).
2286        let path = tmp_path("fts_roundtrip");
2287
2288        {
2289            let mut db = Database::new("test".to_string());
2290            process_command(
2291                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2292                &mut db,
2293            )
2294            .unwrap();
2295            for body in &[
2296                "rust embedded database",
2297                "rust web framework",
2298                "go embedded systems",
2299                "python web framework",
2300                "rust rust embedded power",
2301            ] {
2302                process_command(
2303                    &format!("INSERT INTO docs (body) VALUES ('{body}');"),
2304                    &mut db,
2305                )
2306                .unwrap();
2307            }
2308            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2309            save_database(&mut db, &path).expect("save");
2310        } // db drops → exclusive lock releases.
2311
2312        let mut loaded = open_database(&path, "test".to_string()).expect("open");
2313        {
2314            let table = loaded.get_table("docs".to_string()).expect("docs");
2315            assert_eq!(table.fts_indexes.len(), 1, "FTS index should reattach");
2316            let entry = &table.fts_indexes[0];
2317            assert_eq!(entry.name, "ix_body");
2318            assert_eq!(entry.column_name, "body");
2319            assert_eq!(
2320                entry.index.len(),
2321                5,
2322                "rebuilt posting list should hold all 5 rows"
2323            );
2324            assert!(!entry.needs_rebuild);
2325        }
2326
2327        // Functional smoke: an FTS query through the reloaded index
2328        // returns the expected hit count.
2329        let resp = process_command(
2330            "SELECT id FROM docs WHERE fts_match(body, 'rust');",
2331            &mut loaded,
2332        )
2333        .unwrap();
2334        assert!(resp.contains("3 rows returned"), "got: {resp}");
2335
2336        cleanup(&path);
2337    }
2338
2339    #[test]
2340    fn delete_then_save_then_reopen_excludes_deleted_node_from_fts() {
2341        // Phase 8b — DELETE marks the FTS index dirty; save rebuilds it
2342        // from current rows; reopen replays the CREATE INDEX SQL against
2343        // the post-delete row set. The deleted rowid must not surface
2344        // in `fts_match` results post-reopen.
2345        let path = tmp_path("fts_delete_rebuild");
2346        let mut db = Database::new("test".to_string());
2347        process_command(
2348            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2349            &mut db,
2350        )
2351        .unwrap();
2352        for body in &[
2353            "rust embedded",
2354            "rust framework",
2355            "go embedded",
2356            "python web",
2357        ] {
2358            process_command(
2359                &format!("INSERT INTO docs (body) VALUES ('{body}');"),
2360                &mut db,
2361            )
2362            .unwrap();
2363        }
2364        process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2365
2366        // Delete row 1 ('rust embedded'); save (rebuild fires); reopen.
2367        process_command("DELETE FROM docs WHERE id = 1;", &mut db).unwrap();
2368        save_database(&mut db, &path).expect("save");
2369        drop(db);
2370
2371        let mut loaded = open_database(&path, "test".to_string()).expect("open");
2372        let resp = process_command(
2373            "SELECT id FROM docs WHERE fts_match(body, 'rust');",
2374            &mut loaded,
2375        )
2376        .unwrap();
2377        // Pre-delete: 2 rows ('rust embedded', 'rust framework') had
2378        // 'rust'. Post-delete: only id=2 remains.
2379        assert!(resp.contains("1 row returned"), "got: {resp}");
2380
2381        cleanup(&path);
2382    }
2383
2384    #[test]
2385    fn fts_roundtrip_uses_persistence_path_not_replay() {
2386        // Phase 8c — assert the reload didn't go through the
2387        // rootpage=0 replay shortcut. We do this by reading the
2388        // sqlrite_master row for the FTS index and confirming its
2389        // rootpage field is non-zero.
2390        let path = tmp_path("fts_persistence_path");
2391
2392        {
2393            let mut db = Database::new("test".to_string());
2394            process_command(
2395                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2396                &mut db,
2397            )
2398            .unwrap();
2399            process_command(
2400                "INSERT INTO docs (body) VALUES ('rust embedded database');",
2401                &mut db,
2402            )
2403            .unwrap();
2404            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2405            save_database(&mut db, &path).expect("save");
2406        }
2407
2408        // Read raw sqlrite_master to find the FTS index row.
2409        let pager = Pager::open(&path).expect("open pager");
2410        let mut master = build_empty_master_table();
2411        load_table_rows(&pager, &mut master, pager.header().schema_root_page).unwrap();
2412        let mut found_rootpage: Option<u32> = None;
2413        for rowid in master.rowids() {
2414            let name = take_text(&master, "name", rowid).unwrap();
2415            if name == "ix_body" {
2416                let rp = take_integer(&master, "rootpage", rowid).unwrap();
2417                found_rootpage = Some(rp as u32);
2418            }
2419        }
2420        let rootpage = found_rootpage.expect("ix_body row in sqlrite_master");
2421        assert!(
2422            rootpage != 0,
2423            "Phase 8c FTS save should set rootpage != 0; got {rootpage}"
2424        );
2425
2426        cleanup(&path);
2427    }
2428
2429    #[test]
2430    fn save_without_fts_keeps_format_v4() {
2431        // Phase 8c on-demand bump — a database with zero FTS indexes
2432        // continues writing the v4 header. Existing v4 users must not
2433        // see their files silently promoted to v5 by an upgrade.
2434        use crate::sql::pager::header::FORMAT_VERSION_V4;
2435
2436        let path = tmp_path("fts_no_bump");
2437        let mut db = Database::new("test".to_string());
2438        process_command(
2439            "CREATE TABLE t (id INTEGER PRIMARY KEY, n INTEGER);",
2440            &mut db,
2441        )
2442        .unwrap();
2443        process_command("INSERT INTO t (n) VALUES (1);", &mut db).unwrap();
2444        save_database(&mut db, &path).unwrap();
2445        drop(db);
2446
2447        let pager = Pager::open(&path).expect("open");
2448        assert_eq!(
2449            pager.header().format_version,
2450            FORMAT_VERSION_V4,
2451            "no-FTS save should keep v4"
2452        );
2453        cleanup(&path);
2454    }
2455
2456    #[test]
2457    fn save_with_fts_bumps_to_v5() {
2458        // Phase 8c on-demand bump — first FTS-bearing save promotes
2459        // the file to v5. v5 readers handle both v4 and v5; v4
2460        // readers correctly refuse a v5 file.
2461        use crate::sql::pager::header::FORMAT_VERSION_V5;
2462
2463        let path = tmp_path("fts_bump_v5");
2464        let mut db = Database::new("test".to_string());
2465        process_command(
2466            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2467            &mut db,
2468        )
2469        .unwrap();
2470        process_command("INSERT INTO docs (body) VALUES ('hello');", &mut db).unwrap();
2471        process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2472        save_database(&mut db, &path).unwrap();
2473        drop(db);
2474
2475        let pager = Pager::open(&path).expect("open");
2476        assert_eq!(
2477            pager.header().format_version,
2478            FORMAT_VERSION_V5,
2479            "FTS save should promote to v5"
2480        );
2481        cleanup(&path);
2482    }
2483
2484    #[test]
2485    fn fts_persistence_handles_empty_and_zero_token_docs() {
2486        // Phase 8c — sidecar cell carries doc-lengths for every doc
2487        // including any with zero tokens (so total_docs is honest
2488        // post-reopen). Empty index also round-trips: a CREATE INDEX
2489        // on an empty table emits a single empty leaf with just the
2490        // (empty) sidecar.
2491        let path = tmp_path("fts_edges");
2492
2493        {
2494            let mut db = Database::new("test".to_string());
2495            process_command(
2496                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2497                &mut db,
2498            )
2499            .unwrap();
2500            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2501            // Mix: real text, then a row that tokenizes to zero tokens
2502            // (only punctuation), then real again.
2503            process_command("INSERT INTO docs (body) VALUES ('rust embedded');", &mut db).unwrap();
2504            process_command("INSERT INTO docs (body) VALUES ('!!!---???');", &mut db).unwrap();
2505            process_command("INSERT INTO docs (body) VALUES ('go embedded');", &mut db).unwrap();
2506            save_database(&mut db, &path).unwrap();
2507        }
2508
2509        let loaded = open_database(&path, "test".to_string()).expect("open");
2510        let table = loaded.get_table("docs".to_string()).unwrap();
2511        let entry = &table.fts_indexes[0];
2512        // All three rows present — including the zero-token row,
2513        // which is critical for total_docs honesty in BM25.
2514        assert_eq!(entry.index.len(), 3);
2515        // 'embedded' appears in 2 rows after reload.
2516        let res = entry
2517            .index
2518            .query("embedded", &crate::sql::fts::Bm25Params::default());
2519        assert_eq!(res.len(), 2);
2520
2521        cleanup(&path);
2522    }
2523
2524    #[test]
2525    fn fts_persistence_round_trips_large_corpus() {
2526        // Phase 8c — exercise multi-leaf staging. ~500 docs with
2527        // single-token bodies generates enough cells to overflow a
2528        // single 4 KiB leaf (each posting cell averages ~8 bytes).
2529        let path = tmp_path("fts_large_corpus");
2530
2531        let mut expected_terms: std::collections::BTreeSet<String> =
2532            std::collections::BTreeSet::new();
2533        {
2534            let mut db = Database::new("test".to_string());
2535            process_command(
2536                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2537                &mut db,
2538            )
2539            .unwrap();
2540            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2541            // 500 docs, each one a unique term — drives unique-term
2542            // count up so multiple leaves are required.
2543            for i in 0..500 {
2544                let term = format!("term{i:04}");
2545                process_command(
2546                    &format!("INSERT INTO docs (body) VALUES ('{term}');"),
2547                    &mut db,
2548                )
2549                .unwrap();
2550                expected_terms.insert(term);
2551            }
2552            save_database(&mut db, &path).unwrap();
2553        }
2554
2555        let loaded = open_database(&path, "test".to_string()).expect("open");
2556        let table = loaded.get_table("docs".to_string()).unwrap();
2557        let entry = &table.fts_indexes[0];
2558        assert_eq!(entry.index.len(), 500);
2559
2560        // Spot-check a handful of terms come back with their original
2561        // single-row posting list.
2562        for &i in &[0_i64, 137, 248, 391, 499] {
2563            let term = format!("term{i:04}");
2564            let res = entry
2565                .index
2566                .query(&term, &crate::sql::fts::Bm25Params::default());
2567            assert_eq!(res.len(), 1, "term {term} should match exactly 1 row");
2568            // PrimaryKey rowids start at 1; doc i was inserted at
2569            // rowid i+1.
2570            assert_eq!(res[0].0, i + 1);
2571        }
2572
2573        cleanup(&path);
2574    }
2575
2576    #[test]
2577    fn delete_then_save_then_reopen_excludes_deleted_node_from_hnsw() {
2578        // Phase 7d.3 — DELETE marks HNSW dirty; save rebuilds it from
2579        // current rows + serializes; reopen loads the post-delete graph.
2580        // After all that, the deleted rowid must NOT come back from a
2581        // KNN query.
2582        let path = tmp_path("hnsw_delete_rebuild");
2583        let mut db = Database::new("test".to_string());
2584        process_command(
2585            "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
2586            &mut db,
2587        )
2588        .unwrap();
2589        for v in &["[1.0, 0.0]", "[2.0, 0.0]", "[3.0, 0.0]", "[4.0, 0.0]"] {
2590            process_command(&format!("INSERT INTO docs (e) VALUES ({v});"), &mut db).unwrap();
2591        }
2592        process_command("CREATE INDEX ix_e ON docs USING hnsw (e);", &mut db).unwrap();
2593
2594        // Delete row 1 (the closest match to [0.5, 0.0]).
2595        process_command("DELETE FROM docs WHERE id = 1;", &mut db).unwrap();
2596        // Confirm it marked dirty.
2597        let dirty_before_save = db.tables["docs"].hnsw_indexes[0].needs_rebuild;
2598        assert!(dirty_before_save, "DELETE should mark dirty");
2599
2600        save_database(&mut db, &path).expect("save");
2601        // Confirm save cleared the dirty flag.
2602        let dirty_after_save = db.tables["docs"].hnsw_indexes[0].needs_rebuild;
2603        assert!(!dirty_after_save, "save should clear dirty");
2604        drop(db);
2605
2606        // Reopen, query for the closest match. Row 1 is gone; row 2
2607        // (id=2, vector [2.0, 0.0]) should now be the nearest.
2608        let loaded = open_database(&path, "test".to_string()).expect("open");
2609        let docs = loaded.get_table("docs".to_string()).expect("docs");
2610
2611        // Row 1 must not appear in any storage anymore.
2612        assert!(
2613            !docs.rowids().contains(&1),
2614            "deleted row 1 should not be in row storage"
2615        );
2616        assert_eq!(docs.rowids().len(), 3, "should have 3 surviving rows");
2617
2618        // The HNSW index must also have shed the deleted node.
2619        assert_eq!(
2620            docs.hnsw_indexes[0].index.len(),
2621            3,
2622            "HNSW graph should have shed the deleted node"
2623        );
2624
2625        cleanup(&path);
2626    }
2627
2628    #[test]
2629    fn round_trip_survives_writes_after_load() {
2630        let path = tmp_path("after_load");
2631        save_database(&mut seed_db(), &path).unwrap();
2632
2633        {
2634            let mut db = open_database(&path, "test".to_string()).unwrap();
2635            process_command(
2636                "INSERT INTO users (name, age) VALUES ('carol', 40);",
2637                &mut db,
2638            )
2639            .unwrap();
2640            save_database(&mut db, &path).unwrap();
2641        } // db drops → its exclusive lock releases before we reopen below.
2642
2643        let db2 = open_database(&path, "test".to_string()).unwrap();
2644        let users = db2.get_table("users".to_string()).unwrap();
2645        assert_eq!(users.rowids().len(), 3);
2646
2647        cleanup(&path);
2648    }
2649
2650    #[test]
2651    fn open_rejects_garbage_file() {
2652        let path = tmp_path("bad");
2653        std::fs::write(&path, b"not a sqlrite database, just bytes").unwrap();
2654        let result = open_database(&path, "x".to_string());
2655        assert!(result.is_err());
2656        cleanup(&path);
2657    }
2658
2659    #[test]
2660    fn many_small_rows_spread_across_leaves() {
2661        let path = tmp_path("many_rows");
2662        let mut db = Database::new("big".to_string());
2663        process_command(
2664            "CREATE TABLE things (id INTEGER PRIMARY KEY, data TEXT);",
2665            &mut db,
2666        )
2667        .unwrap();
2668        for i in 0..200 {
2669            let body = "x".repeat(200);
2670            let q = format!("INSERT INTO things (data) VALUES ('row-{i}-{body}');");
2671            process_command(&q, &mut db).unwrap();
2672        }
2673        save_database(&mut db, &path).unwrap();
2674        let loaded = open_database(&path, "big".to_string()).unwrap();
2675        let things = loaded.get_table("things".to_string()).unwrap();
2676        assert_eq!(things.rowids().len(), 200);
2677        cleanup(&path);
2678    }
2679
2680    #[test]
2681    fn huge_row_goes_through_overflow() {
2682        let path = tmp_path("overflow_row");
2683        let mut db = Database::new("big".to_string());
2684        process_command(
2685            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2686            &mut db,
2687        )
2688        .unwrap();
2689        let body = "A".repeat(10_000);
2690        process_command(
2691            &format!("INSERT INTO docs (body) VALUES ('{body}');"),
2692            &mut db,
2693        )
2694        .unwrap();
2695        save_database(&mut db, &path).unwrap();
2696
2697        let loaded = open_database(&path, "big".to_string()).unwrap();
2698        let docs = loaded.get_table("docs".to_string()).unwrap();
2699        let rowids = docs.rowids();
2700        assert_eq!(rowids.len(), 1);
2701        let stored = docs.get_value("body", rowids[0]);
2702        match stored {
2703            Some(Value::Text(s)) => assert_eq!(s.len(), 10_000),
2704            other => panic!("expected Text, got {other:?}"),
2705        }
2706        cleanup(&path);
2707    }
2708
2709    #[test]
2710    fn create_sql_synthesis_round_trips() {
2711        // Build a table via CREATE, then verify table_to_create_sql +
2712        // parse_create_sql reproduce an equivalent column list.
2713        let mut db = Database::new("x".to_string());
2714        process_command(
2715            "CREATE TABLE t (id INTEGER PRIMARY KEY, tag TEXT UNIQUE, note TEXT NOT NULL);",
2716            &mut db,
2717        )
2718        .unwrap();
2719        let t = db.get_table("t".to_string()).unwrap();
2720        let sql = table_to_create_sql(t);
2721        let (name, cols) = parse_create_sql(&sql).unwrap();
2722        assert_eq!(name, "t");
2723        assert_eq!(cols.len(), 3);
2724        assert!(cols[0].is_pk);
2725        assert!(cols[1].is_unique);
2726        assert!(cols[2].not_null);
2727    }
2728
2729    #[test]
2730    fn sqlrite_master_is_not_exposed_as_a_user_table() {
2731        // After open, the public db.tables map should not list the master.
2732        let path = tmp_path("no_master");
2733        save_database(&mut seed_db(), &path).unwrap();
2734        let loaded = open_database(&path, "x".to_string()).unwrap();
2735        assert!(!loaded.tables.contains_key(MASTER_TABLE_NAME));
2736        cleanup(&path);
2737    }
2738
2739    #[test]
2740    fn multi_leaf_table_produces_an_interior_root() {
2741        // 200 fat rows force the table into multiple leaves, which means
2742        // save_database must build at least one InteriorNode above them.
2743        // The test verifies the round-trip works and confirms the root is
2744        // indeed an interior page (not a leaf) by reading the page type
2745        // directly out of the open pager.
2746        let path = tmp_path("multi_leaf_interior");
2747        let mut db = Database::new("big".to_string());
2748        process_command(
2749            "CREATE TABLE things (id INTEGER PRIMARY KEY, data TEXT);",
2750            &mut db,
2751        )
2752        .unwrap();
2753        for i in 0..200 {
2754            let body = "x".repeat(200);
2755            let q = format!("INSERT INTO things (data) VALUES ('row-{i}-{body}');");
2756            process_command(&q, &mut db).unwrap();
2757        }
2758        save_database(&mut db, &path).unwrap();
2759
2760        // Confirm the round-trip preserved all 200 rows.
2761        let loaded = open_database(&path, "big".to_string()).unwrap();
2762        let things = loaded.get_table("things".to_string()).unwrap();
2763        assert_eq!(things.rowids().len(), 200);
2764
2765        // Peek at `things`'s root page via the pager attached to the
2766        // loaded DB and check it's an InteriorNode, not a leaf.
2767        let pager = loaded
2768            .pager
2769            .as_ref()
2770            .expect("loaded DB should have a pager");
2771        // sqlrite_master's row for `things` holds its root page. Easiest
2772        // way to find it: walk the leaf chain by using find_leftmost_leaf
2773        // and then hop one level up. Simpler: read the master, scan for
2774        // the "things" row, look up rootpage.
2775        let mut master = build_empty_master_table();
2776        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
2777        let things_root = master
2778            .rowids()
2779            .into_iter()
2780            .find_map(|r| match master.get_value("name", r) {
2781                Some(Value::Text(s)) if s == "things" => match master.get_value("rootpage", r) {
2782                    Some(Value::Integer(p)) => Some(p as u32),
2783                    _ => None,
2784                },
2785                _ => None,
2786            })
2787            .expect("things should appear in sqlrite_master");
2788        let root_buf = pager.read_page(things_root).unwrap();
2789        assert_eq!(
2790            root_buf[0],
2791            PageType::InteriorNode as u8,
2792            "expected a multi-leaf table to have an interior root, got tag {}",
2793            root_buf[0]
2794        );
2795
2796        cleanup(&path);
2797    }
2798
2799    #[test]
2800    fn explicit_index_persists_across_save_and_open() {
2801        let path = tmp_path("idx_persist");
2802        let mut db = Database::new("idx".to_string());
2803        process_command(
2804            "CREATE TABLE users (id INTEGER PRIMARY KEY, tag TEXT);",
2805            &mut db,
2806        )
2807        .unwrap();
2808        for i in 1..=5 {
2809            let tag = if i % 2 == 0 { "odd" } else { "even" };
2810            process_command(
2811                &format!("INSERT INTO users (tag) VALUES ('{tag}');"),
2812                &mut db,
2813            )
2814            .unwrap();
2815        }
2816        process_command("CREATE INDEX users_tag_idx ON users (tag);", &mut db).unwrap();
2817        save_database(&mut db, &path).unwrap();
2818
2819        let loaded = open_database(&path, "idx".to_string()).unwrap();
2820        let users = loaded.get_table("users".to_string()).unwrap();
2821        let idx = users
2822            .index_by_name("users_tag_idx")
2823            .expect("explicit index should survive save/open");
2824        assert_eq!(idx.column_name, "tag");
2825        assert!(!idx.is_unique);
2826        // 5 rows: rowids 2, 4 are "odd" (i % 2 == 0 when i is 2 or 4) — 2 entries;
2827        // rowids 1, 3, 5 are "even" (i % 2 != 0) — 3 entries.
2828        let even_rowids = idx.lookup(&Value::Text("even".into()));
2829        let odd_rowids = idx.lookup(&Value::Text("odd".into()));
2830        assert_eq!(even_rowids.len(), 3);
2831        assert_eq!(odd_rowids.len(), 2);
2832
2833        cleanup(&path);
2834    }
2835
2836    #[test]
2837    fn auto_indexes_for_unique_columns_survive_save_open() {
2838        let path = tmp_path("auto_idx_persist");
2839        let mut db = Database::new("a".to_string());
2840        process_command(
2841            "CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL UNIQUE);",
2842            &mut db,
2843        )
2844        .unwrap();
2845        process_command("INSERT INTO users (email) VALUES ('a@x');", &mut db).unwrap();
2846        process_command("INSERT INTO users (email) VALUES ('b@x');", &mut db).unwrap();
2847        save_database(&mut db, &path).unwrap();
2848
2849        let loaded = open_database(&path, "a".to_string()).unwrap();
2850        let users = loaded.get_table("users".to_string()).unwrap();
2851        // Every UNIQUE column auto-creates an index; the load path populated
2852        // it from the persisted entries.
2853        let auto_name = SecondaryIndex::auto_name("users", "email");
2854        let idx = users
2855            .index_by_name(&auto_name)
2856            .expect("auto index should be restored");
2857        assert!(idx.is_unique);
2858        assert_eq!(idx.lookup(&Value::Text("a@x".into())).len(), 1);
2859        assert_eq!(idx.lookup(&Value::Text("b@x".into())).len(), 1);
2860
2861        cleanup(&path);
2862    }
2863
2864    /// SQLR-1 — `CREATE INDEX` on a wide table must round-trip when the
2865    /// index B-tree grows past one leaf and needs an interior level.
2866    /// Before the fix, the post-DDL auto-save panicked with
2867    /// `Internal("unknown paged-entry kind tag 0x4 …")` because a
2868    /// table-cell decoder was being run against an index leaf
2869    /// (`KIND_INDEX = 0x04`).
2870    ///
2871    /// 5 000 rows mirror the original repro from the issue and exceed
2872    /// every leaf-fanout cliff for the small `(rowid, value)` cells in
2873    /// a TEXT-keyed secondary index.
2874    #[test]
2875    fn secondary_index_with_interior_level_round_trips() {
2876        let path = tmp_path("sqlr1_wide_index");
2877        let mut db = Database::new("idx".to_string());
2878        db.source_path = Some(path.clone());
2879
2880        process_command(
2881            "CREATE TABLE bloat (id INTEGER PRIMARY KEY, payload TEXT);",
2882            &mut db,
2883        )
2884        .unwrap();
2885        // BEGIN/COMMIT collapses 5 000 inserts into one save (matches
2886        // `auto_vacuum_setup` and the issue's repro shape).
2887        process_command("BEGIN;", &mut db).unwrap();
2888        for i in 0..5000 {
2889            process_command(
2890                &format!("INSERT INTO bloat (payload) VALUES ('p-{i:08}');"),
2891                &mut db,
2892            )
2893            .unwrap();
2894        }
2895        process_command("COMMIT;", &mut db).unwrap();
2896
2897        // The DDL that used to panic.
2898        process_command("CREATE INDEX idx_p ON bloat (payload);", &mut db).unwrap();
2899
2900        // Reopen and verify lookups, plus that the index tree actually
2901        // grew an interior layer (otherwise this test wouldn't cover the
2902        // regression).
2903        drop(db);
2904        let loaded = open_database(&path, "idx".to_string()).unwrap();
2905        let bloat = loaded.get_table("bloat".to_string()).unwrap();
2906        let idx = bloat
2907            .index_by_name("idx_p")
2908            .expect("idx_p should survive close/reopen");
2909        assert!(!idx.is_unique);
2910
2911        // Spot-check the keyspace: first, middle, last value each map
2912        // back to exactly the row that carried them.
2913        for &(probe_i, expected_rowid) in &[(0i64, 1i64), (2500, 2501), (4999, 5000)] {
2914            let value = Value::Text(format!("p-{probe_i:08}"));
2915            let hits = idx.lookup(&value);
2916            assert_eq!(
2917                hits,
2918                vec![expected_rowid],
2919                "lookup({value:?}) should yield rowid {expected_rowid}",
2920            );
2921        }
2922
2923        // Confirm the index tree is multi-level (the regression's
2924        // necessary condition) — root must be an `InteriorNode` and
2925        // `find_leftmost_leaf` must reach a `TableLeaf` through it.
2926        let pager = loaded.pager.as_ref().unwrap();
2927        let mut master = build_empty_master_table();
2928        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
2929        let idx_root = master
2930            .rowids()
2931            .into_iter()
2932            .find_map(
2933                |r| match (master.get_value("name", r), master.get_value("type", r)) {
2934                    (Some(Value::Text(name)), Some(Value::Text(kind)))
2935                        if name == "idx_p" && kind == "index" =>
2936                    {
2937                        match master.get_value("rootpage", r) {
2938                            Some(Value::Integer(p)) => Some(p as u32),
2939                            _ => None,
2940                        }
2941                    }
2942                    _ => None,
2943                },
2944            )
2945            .expect("idx_p should appear in sqlrite_master");
2946        let root_buf = pager.read_page(idx_root).unwrap();
2947        assert_eq!(
2948            root_buf[0],
2949            PageType::InteriorNode as u8,
2950            "5 000-entry index must have an interior root — without one this test wouldn't cover SQLR-1",
2951        );
2952        let leaf = find_leftmost_leaf(pager, idx_root).unwrap();
2953        let leaf_buf = pager.read_page(leaf).unwrap();
2954        assert_eq!(leaf_buf[0], PageType::TableLeaf as u8);
2955
2956        cleanup(&path);
2957    }
2958
2959    /// SQLR-1 follow-on — the page-recycling path between two large
2960    /// versions of the same index name must not corrupt cell decoding.
2961    /// `DROP INDEX` returns its pages to the freelist; the next
2962    /// `CREATE INDEX` is free to reuse them. If the allocator hands an
2963    /// old index leaf to a *table* without zeroing it, an upstream
2964    /// table walk would see KIND_INDEX cells and panic.
2965    #[test]
2966    fn drop_then_recreate_wide_index_does_not_panic() {
2967        let path = tmp_path("sqlr1_drop_recreate");
2968        let mut db = Database::new("idx".to_string());
2969        db.source_path = Some(path.clone());
2970
2971        process_command(
2972            "CREATE TABLE bloat (id INTEGER PRIMARY KEY, payload TEXT);",
2973            &mut db,
2974        )
2975        .unwrap();
2976        process_command("BEGIN;", &mut db).unwrap();
2977        for i in 0..5000 {
2978            process_command(
2979                &format!("INSERT INTO bloat (payload) VALUES ('p-{i:08}');"),
2980                &mut db,
2981            )
2982            .unwrap();
2983        }
2984        process_command("COMMIT;", &mut db).unwrap();
2985
2986        process_command("CREATE INDEX idx_p ON bloat (payload);", &mut db).unwrap();
2987        process_command("DROP INDEX idx_p;", &mut db).unwrap();
2988        // Recreate from scratch — exercises the recycle path.
2989        process_command("CREATE INDEX idx_p ON bloat (payload);", &mut db).unwrap();
2990
2991        drop(db);
2992        let loaded = open_database(&path, "idx".to_string()).unwrap();
2993        let bloat = loaded.get_table("bloat".to_string()).unwrap();
2994        let idx = bloat
2995            .index_by_name("idx_p")
2996            .expect("idx_p should survive drop+recreate+reopen");
2997        assert_eq!(
2998            idx.lookup(&Value::Text("p-00002500".into())),
2999            vec![2501],
3000            "post-recycle lookup must still resolve correctly",
3001        );
3002
3003        cleanup(&path);
3004    }
3005
3006    #[test]
3007    fn deep_tree_round_trips() {
3008        // Force a 3-level tree by bypassing process_command (which prints
3009        // the full table on every INSERT, making large bulk loads O(N^2)
3010        // in I/O). We build the Table directly via restore_row.
3011        use crate::sql::db::table::Column as TableColumn;
3012
3013        let path = tmp_path("deep_tree");
3014        let mut db = Database::new("deep".to_string());
3015        let columns = vec![
3016            TableColumn::new("id".into(), "integer".into(), true, true, true),
3017            TableColumn::new("s".into(), "text".into(), false, true, false),
3018        ];
3019        let mut table = build_empty_table("t", columns, 0);
3020        // ~900-byte rows → ~4 rows per leaf. 6000 rows → ~1500 leaves,
3021        // which with interior fanout ~400 needs 2 interior levels (3-level
3022        // tree total, counting leaves).
3023        for i in 1..=6_000i64 {
3024            let body = "q".repeat(900);
3025            table
3026                .restore_row(
3027                    i,
3028                    vec![
3029                        Some(Value::Integer(i)),
3030                        Some(Value::Text(format!("r-{i}-{body}"))),
3031                    ],
3032                )
3033                .unwrap();
3034        }
3035        db.tables.insert("t".to_string(), table);
3036        save_database(&mut db, &path).unwrap();
3037
3038        let loaded = open_database(&path, "deep".to_string()).unwrap();
3039        let t = loaded.get_table("t".to_string()).unwrap();
3040        assert_eq!(t.rowids().len(), 6_000);
3041
3042        // Confirm the tree actually grew past 2 levels — i.e., the root's
3043        // leftmost child is itself an interior page, not a leaf.
3044        let pager = loaded.pager.as_ref().unwrap();
3045        let mut master = build_empty_master_table();
3046        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
3047        let t_root = master
3048            .rowids()
3049            .into_iter()
3050            .find_map(|r| match master.get_value("name", r) {
3051                Some(Value::Text(s)) if s == "t" => match master.get_value("rootpage", r) {
3052                    Some(Value::Integer(p)) => Some(p as u32),
3053                    _ => None,
3054                },
3055                _ => None,
3056            })
3057            .expect("t in sqlrite_master");
3058        let root_buf = pager.read_page(t_root).unwrap();
3059        assert_eq!(root_buf[0], PageType::InteriorNode as u8);
3060        let root_payload: &[u8; PAYLOAD_PER_PAGE] =
3061            (&root_buf[PAGE_HEADER_SIZE..]).try_into().unwrap();
3062        let root_interior = InteriorPage::from_bytes(root_payload);
3063        let child = root_interior.leftmost_child().unwrap();
3064        let child_buf = pager.read_page(child).unwrap();
3065        assert_eq!(
3066            child_buf[0],
3067            PageType::InteriorNode as u8,
3068            "expected 3-level tree: root's leftmost child should also be InteriorNode",
3069        );
3070
3071        cleanup(&path);
3072    }
3073
3074    #[test]
3075    fn alter_rename_table_survives_save_and_reopen() {
3076        let path = tmp_path("alter_rename_table_roundtrip");
3077        let mut db = seed_db();
3078        save_database(&mut db, &path).expect("save");
3079
3080        process_command("ALTER TABLE users RENAME TO members;", &mut db).expect("rename");
3081        save_database(&mut db, &path).expect("save after rename");
3082
3083        let loaded = open_database(&path, "t".to_string()).expect("reopen");
3084        assert!(!loaded.contains_table("users".to_string()));
3085        assert!(loaded.contains_table("members".to_string()));
3086        let members = loaded.get_table("members".to_string()).unwrap();
3087        assert_eq!(members.rowids().len(), 2, "rows should survive");
3088        // Auto-indexes followed the rename.
3089        assert!(
3090            members
3091                .index_by_name("sqlrite_autoindex_members_id")
3092                .is_some()
3093        );
3094        assert!(
3095            members
3096                .index_by_name("sqlrite_autoindex_members_name")
3097                .is_some()
3098        );
3099
3100        cleanup(&path);
3101    }
3102
3103    #[test]
3104    fn alter_rename_column_survives_save_and_reopen() {
3105        let path = tmp_path("alter_rename_col_roundtrip");
3106        let mut db = seed_db();
3107        save_database(&mut db, &path).expect("save");
3108
3109        process_command(
3110            "ALTER TABLE users RENAME COLUMN name TO full_name;",
3111            &mut db,
3112        )
3113        .expect("rename column");
3114        save_database(&mut db, &path).expect("save after rename");
3115
3116        let loaded = open_database(&path, "t".to_string()).expect("reopen");
3117        let users = loaded.get_table("users".to_string()).unwrap();
3118        assert!(users.contains_column("full_name".to_string()));
3119        assert!(!users.contains_column("name".to_string()));
3120        // Verify a row's value survived the rename round-trip.
3121        let alice_rowid = users
3122            .rowids()
3123            .into_iter()
3124            .find(|r| users.get_value("full_name", *r) == Some(Value::Text("alice".to_string())))
3125            .expect("alice row should be findable under renamed column");
3126        assert_eq!(
3127            users.get_value("full_name", alice_rowid),
3128            Some(Value::Text("alice".to_string()))
3129        );
3130
3131        cleanup(&path);
3132    }
3133
3134    #[test]
3135    fn alter_add_column_with_default_survives_save_and_reopen() {
3136        let path = tmp_path("alter_add_default_roundtrip");
3137        let mut db = seed_db();
3138        save_database(&mut db, &path).expect("save");
3139
3140        process_command(
3141            "ALTER TABLE users ADD COLUMN status TEXT DEFAULT 'active';",
3142            &mut db,
3143        )
3144        .expect("add column");
3145        save_database(&mut db, &path).expect("save after add");
3146
3147        let loaded = open_database(&path, "t".to_string()).expect("reopen");
3148        let users = loaded.get_table("users".to_string()).unwrap();
3149        assert!(users.contains_column("status".to_string()));
3150        for rowid in users.rowids() {
3151            assert_eq!(
3152                users.get_value("status", rowid),
3153                Some(Value::Text("active".to_string())),
3154                "backfilled default should round-trip for rowid {rowid}"
3155            );
3156        }
3157        // The DEFAULT clause itself should still be on the column metadata
3158        // so a subsequent INSERT picks it up.
3159        let status_col = users
3160            .columns
3161            .iter()
3162            .find(|c| c.column_name == "status")
3163            .unwrap();
3164        assert_eq!(status_col.default, Some(Value::Text("active".to_string())));
3165
3166        cleanup(&path);
3167    }
3168
3169    #[test]
3170    fn alter_drop_column_survives_save_and_reopen() {
3171        let path = tmp_path("alter_drop_col_roundtrip");
3172        let mut db = seed_db();
3173        save_database(&mut db, &path).expect("save");
3174
3175        process_command("ALTER TABLE users DROP COLUMN age;", &mut db).expect("drop column");
3176        save_database(&mut db, &path).expect("save after drop");
3177
3178        let loaded = open_database(&path, "t".to_string()).expect("reopen");
3179        let users = loaded.get_table("users".to_string()).unwrap();
3180        assert!(!users.contains_column("age".to_string()));
3181        assert!(users.contains_column("name".to_string()));
3182
3183        cleanup(&path);
3184    }
3185
3186    #[test]
3187    fn drop_table_survives_save_and_reopen() {
3188        let path = tmp_path("drop_table_roundtrip");
3189        let mut db = seed_db();
3190        save_database(&mut db, &path).expect("save");
3191
3192        // Verify both tables landed.
3193        {
3194            let loaded = open_database(&path, "t".to_string()).expect("open");
3195            assert!(loaded.contains_table("users".to_string()));
3196            assert!(loaded.contains_table("notes".to_string()));
3197        }
3198
3199        process_command("DROP TABLE users;", &mut db).expect("drop users");
3200        save_database(&mut db, &path).expect("save after drop");
3201
3202        let loaded = open_database(&path, "t".to_string()).expect("reopen");
3203        assert!(
3204            !loaded.contains_table("users".to_string()),
3205            "dropped table should not resurface on reopen"
3206        );
3207        assert!(
3208            loaded.contains_table("notes".to_string()),
3209            "untouched table should survive"
3210        );
3211
3212        cleanup(&path);
3213    }
3214
3215    #[test]
3216    fn drop_index_survives_save_and_reopen() {
3217        let path = tmp_path("drop_index_roundtrip");
3218        let mut db = Database::new("t".to_string());
3219        process_command(
3220            "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
3221            &mut db,
3222        )
3223        .unwrap();
3224        process_command("CREATE INDEX notes_body_idx ON notes (body);", &mut db).unwrap();
3225        save_database(&mut db, &path).expect("save");
3226
3227        process_command("DROP INDEX notes_body_idx;", &mut db).unwrap();
3228        save_database(&mut db, &path).expect("save after drop");
3229
3230        let loaded = open_database(&path, "t".to_string()).expect("reopen");
3231        let notes = loaded.get_table("notes".to_string()).unwrap();
3232        assert!(
3233            notes.index_by_name("notes_body_idx").is_none(),
3234            "dropped index should not resurface on reopen"
3235        );
3236        // The auto-index for the PK should still be there.
3237        assert!(notes.index_by_name("sqlrite_autoindex_notes_id").is_some());
3238
3239        cleanup(&path);
3240    }
3241
3242    #[test]
3243    fn default_clause_survives_save_and_reopen() {
3244        let path = tmp_path("default_roundtrip");
3245        let mut db = Database::new("t".to_string());
3246
3247        process_command(
3248            "CREATE TABLE users (id INTEGER PRIMARY KEY, status TEXT DEFAULT 'active', score INTEGER DEFAULT 0);",
3249            &mut db,
3250        )
3251        .unwrap();
3252        save_database(&mut db, &path).expect("save");
3253
3254        let mut loaded = open_database(&path, "t".to_string()).expect("open");
3255
3256        // The reloaded column metadata should still carry the DEFAULT.
3257        let users = loaded.get_table("users".to_string()).expect("users table");
3258        let status_col = users
3259            .columns
3260            .iter()
3261            .find(|c| c.column_name == "status")
3262            .expect("status column");
3263        assert_eq!(
3264            status_col.default,
3265            Some(Value::Text("active".to_string())),
3266            "DEFAULT 'active' should round-trip"
3267        );
3268        let score_col = users
3269            .columns
3270            .iter()
3271            .find(|c| c.column_name == "score")
3272            .expect("score column");
3273        assert_eq!(
3274            score_col.default,
3275            Some(Value::Integer(0)),
3276            "DEFAULT 0 should round-trip"
3277        );
3278
3279        // Now exercise the runtime path: an INSERT that omits both DEFAULT
3280        // columns should pick them up from the reloaded schema.
3281        process_command("INSERT INTO users (id) VALUES (1);", &mut loaded).unwrap();
3282        let users = loaded.get_table("users".to_string()).unwrap();
3283        assert_eq!(
3284            users.get_value("status", 1),
3285            Some(Value::Text("active".to_string()))
3286        );
3287        assert_eq!(users.get_value("score", 1), Some(Value::Integer(0)));
3288
3289        cleanup(&path);
3290    }
3291
3292    // ---------------------------------------------------------------------
3293    // SQLR-6 — free-list + VACUUM tests
3294    // ---------------------------------------------------------------------
3295
3296    /// Drop a table; subsequent CREATE TABLE should reuse the freed pages
3297    /// rather than extending the file. The page_count after drop+create
3298    /// should be at most what it was after the original two tables —
3299    /// proving the new table landed on freelist pages.
3300    #[test]
3301    fn drop_table_freelist_persists_pages_for_reuse() {
3302        let path = tmp_path("freelist_reuse");
3303        let mut db = seed_db();
3304        db.source_path = Some(path.clone());
3305        save_database(&mut db, &path).expect("save");
3306        let pages_two_tables = db.pager.as_ref().unwrap().header().page_count;
3307
3308        // Drop one table; its pages go on the freelist.
3309        process_command("DROP TABLE users;", &mut db).expect("drop users");
3310        let pages_after_drop = db.pager.as_ref().unwrap().header().page_count;
3311        assert_eq!(
3312            pages_after_drop, pages_two_tables,
3313            "page_count should not shrink on drop — the freed pages persist on the freelist"
3314        );
3315        let head_after_drop = db.pager.as_ref().unwrap().header().freelist_head;
3316        assert!(
3317            head_after_drop != 0,
3318            "freelist_head must be non-zero after drop"
3319        );
3320
3321        // Re-create a similar-shaped table; should reuse freelist pages.
3322        process_command(
3323            "CREATE TABLE accounts (id INTEGER PRIMARY KEY, label TEXT NOT NULL UNIQUE);",
3324            &mut db,
3325        )
3326        .expect("create accounts");
3327        process_command("INSERT INTO accounts (label) VALUES ('a');", &mut db).unwrap();
3328        process_command("INSERT INTO accounts (label) VALUES ('b');", &mut db).unwrap();
3329        let pages_after_create = db.pager.as_ref().unwrap().header().page_count;
3330        assert!(
3331            pages_after_create <= pages_two_tables + 2,
3332            "creating a similar-sized table after a drop should mostly draw from the \
3333             freelist, not extend the file (got {pages_after_create} > {pages_two_tables} + 2)"
3334        );
3335
3336        cleanup(&path);
3337    }
3338
3339    /// `VACUUM;` after a drop must shrink the file and clear the freelist.
3340    #[test]
3341    fn drop_then_vacuum_shrinks_file() {
3342        let path = tmp_path("vacuum_shrinks");
3343        let mut db = seed_db();
3344        db.source_path = Some(path.clone());
3345        // Add a few more rows to make the dropped table bigger.
3346        for i in 0..20 {
3347            process_command(
3348                &format!("INSERT INTO users (name, age) VALUES ('user{i}', {i});"),
3349                &mut db,
3350            )
3351            .unwrap();
3352        }
3353        save_database(&mut db, &path).expect("save");
3354
3355        process_command("DROP TABLE users;", &mut db).expect("drop");
3356        let size_before_vacuum = std::fs::metadata(&path).unwrap().len();
3357        let pages_before_vacuum = db.pager.as_ref().unwrap().header().page_count;
3358        let head_before = db.pager.as_ref().unwrap().header().freelist_head;
3359        assert!(head_before != 0, "drop should populate the freelist");
3360
3361        // VACUUM (via process_command) checkpoints internally so the
3362        // file actually shrinks on disk before we observe its size.
3363        process_command("VACUUM;", &mut db).expect("vacuum");
3364
3365        let size_after = std::fs::metadata(&path).unwrap().len();
3366        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3367        let head_after = db.pager.as_ref().unwrap().header().freelist_head;
3368        assert!(
3369            pages_after < pages_before_vacuum,
3370            "VACUUM must reduce page_count: was {pages_before_vacuum}, now {pages_after}"
3371        );
3372        assert_eq!(head_after, 0, "VACUUM must clear the freelist");
3373        assert!(
3374            size_after < size_before_vacuum,
3375            "VACUUM must shrink the file on disk: was {size_before_vacuum} bytes, now {size_after}"
3376        );
3377
3378        cleanup(&path);
3379    }
3380
3381    /// VACUUM on a non-empty multi-table DB must not lose any rows.
3382    #[test]
3383    fn vacuum_round_trips_data() {
3384        let path = tmp_path("vacuum_round_trip");
3385        let mut db = seed_db();
3386        db.source_path = Some(path.clone());
3387        save_database(&mut db, &path).expect("save");
3388        process_command("VACUUM;", &mut db).expect("vacuum");
3389
3390        // Re-open from disk to make sure the on-disk catalog round-trips.
3391        drop(db);
3392        let loaded = open_database(&path, "t".to_string()).expect("reopen after vacuum");
3393        assert!(loaded.contains_table("users".to_string()));
3394        assert!(loaded.contains_table("notes".to_string()));
3395        let users = loaded.get_table("users".to_string()).unwrap();
3396        // seed_db inserts two users.
3397        assert_eq!(users.rowids().len(), 2);
3398
3399        cleanup(&path);
3400    }
3401
3402    /// Format version is bumped to v6 only after a save that creates a
3403    /// non-empty freelist. VACUUM clears the freelist but doesn't
3404    /// downgrade — v6 is a strict superset, so once at v6 we stay.
3405    #[test]
3406    fn freelist_format_version_promotion() {
3407        use crate::sql::pager::header::{FORMAT_VERSION_BASELINE, FORMAT_VERSION_V6};
3408        let path = tmp_path("v6_promotion");
3409        let mut db = seed_db();
3410        db.source_path = Some(path.clone());
3411        save_database(&mut db, &path).expect("save");
3412        let v_after_save = db.pager.as_ref().unwrap().header().format_version;
3413        assert_eq!(
3414            v_after_save, FORMAT_VERSION_BASELINE,
3415            "fresh DB without drops should stay at the baseline version"
3416        );
3417
3418        process_command("DROP TABLE users;", &mut db).expect("drop");
3419        let v_after_drop = db.pager.as_ref().unwrap().header().format_version;
3420        assert_eq!(
3421            v_after_drop, FORMAT_VERSION_V6,
3422            "first save with a non-empty freelist must promote to V6"
3423        );
3424
3425        process_command("VACUUM;", &mut db).expect("vacuum");
3426        let v_after_vacuum = db.pager.as_ref().unwrap().header().format_version;
3427        assert_eq!(
3428            v_after_vacuum, FORMAT_VERSION_V6,
3429            "VACUUM must not downgrade — V6 is a strict superset"
3430        );
3431
3432        cleanup(&path);
3433    }
3434
3435    /// Freelist persists across reopen: drop, save, close, reopen,
3436    /// confirm the next CREATE TABLE re-uses pages from the persisted
3437    /// freelist (rather than extending the file).
3438    #[test]
3439    fn freelist_round_trip_through_reopen() {
3440        let path = tmp_path("freelist_reopen");
3441        let pages_two_tables;
3442        {
3443            let mut db = seed_db();
3444            db.source_path = Some(path.clone());
3445            save_database(&mut db, &path).expect("save");
3446            pages_two_tables = db.pager.as_ref().unwrap().header().page_count;
3447            process_command("DROP TABLE users;", &mut db).expect("drop");
3448            let head = db.pager.as_ref().unwrap().header().freelist_head;
3449            assert!(head != 0, "drop must populate the freelist");
3450        }
3451
3452        // Reopen from disk — the freelist must come back.
3453        let mut db = open_database(&path, "t".to_string()).expect("reopen");
3454        assert!(
3455            db.pager.as_ref().unwrap().header().freelist_head != 0,
3456            "freelist_head must survive close/reopen"
3457        );
3458
3459        process_command(
3460            "CREATE TABLE accounts (id INTEGER PRIMARY KEY, label TEXT NOT NULL UNIQUE);",
3461            &mut db,
3462        )
3463        .expect("create accounts");
3464        process_command("INSERT INTO accounts (label) VALUES ('reopened');", &mut db).unwrap();
3465        let pages_after_create = db.pager.as_ref().unwrap().header().page_count;
3466        assert!(
3467            pages_after_create <= pages_two_tables + 2,
3468            "post-reopen create should reuse freelist (got {pages_after_create} > \
3469             {pages_two_tables} + 2 — file extended instead of reusing)"
3470        );
3471
3472        cleanup(&path);
3473    }
3474
3475    /// VACUUM inside an explicit transaction must error before touching the
3476    /// disk. `BEGIN; VACUUM;` is the documented rejection path.
3477    #[test]
3478    fn vacuum_inside_transaction_is_rejected() {
3479        let path = tmp_path("vacuum_txn");
3480        let mut db = seed_db();
3481        db.source_path = Some(path.clone());
3482        save_database(&mut db, &path).expect("save");
3483
3484        process_command("BEGIN;", &mut db).expect("begin");
3485        let err = process_command("VACUUM;", &mut db).unwrap_err();
3486        assert!(
3487            format!("{err}").contains("VACUUM cannot run inside a transaction"),
3488            "expected in-transaction rejection, got: {err}"
3489        );
3490        // Roll back to leave the DB in a clean state.
3491        process_command("ROLLBACK;", &mut db).unwrap();
3492        cleanup(&path);
3493    }
3494
3495    /// VACUUM on an in-memory database is a documented no-op.
3496    #[test]
3497    fn vacuum_on_in_memory_database_is_noop() {
3498        let mut db = Database::new("mem".to_string());
3499        process_command("CREATE TABLE t (id INTEGER PRIMARY KEY);", &mut db).unwrap();
3500        let out = process_command("VACUUM;", &mut db).expect("vacuum no-op");
3501        assert!(
3502            out.to_lowercase().contains("no-op") || out.to_lowercase().contains("in-memory"),
3503            "expected no-op message for in-memory VACUUM, got: {out}"
3504        );
3505    }
3506
3507    /// Untouched tables shouldn't write any pages on the save that
3508    /// follows a DROP of an unrelated table. Confirms the per-table
3509    /// preferred pool keeps page numbers stable so the diff pager skips
3510    /// every byte-identical leaf.
3511    #[test]
3512    fn unchanged_table_pages_skip_diff_after_unrelated_drop() {
3513        // Need three tables so dropping one in the middle still leaves
3514        // an "unrelated" alphabetical neighbour. Layout pre-drop (sorted):
3515        //   accounts, notes, users
3516        // Drop `notes`. `accounts` and `users` should keep their pages.
3517        let path = tmp_path("diff_after_drop");
3518        let mut db = Database::new("t".to_string());
3519        db.source_path = Some(path.clone());
3520        process_command(
3521            "CREATE TABLE accounts (id INTEGER PRIMARY KEY, label TEXT);",
3522            &mut db,
3523        )
3524        .unwrap();
3525        process_command(
3526            "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
3527            &mut db,
3528        )
3529        .unwrap();
3530        process_command(
3531            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);",
3532            &mut db,
3533        )
3534        .unwrap();
3535        for i in 0..5 {
3536            process_command(
3537                &format!("INSERT INTO accounts (label) VALUES ('a{i}');"),
3538                &mut db,
3539            )
3540            .unwrap();
3541            process_command(
3542                &format!("INSERT INTO notes (body) VALUES ('n{i}');"),
3543                &mut db,
3544            )
3545            .unwrap();
3546            process_command(
3547                &format!("INSERT INTO users (name) VALUES ('u{i}');"),
3548                &mut db,
3549            )
3550            .unwrap();
3551        }
3552        save_database(&mut db, &path).expect("baseline save");
3553
3554        // Capture page bytes for `accounts` and `users` so we can
3555        // verify they don't change.
3556        let pager = db.pager.as_ref().unwrap();
3557        let acc_root = read_old_rootpages(pager, pager.header().schema_root_page)
3558            .unwrap()
3559            .get(&("table".to_string(), "accounts".to_string()))
3560            .copied()
3561            .unwrap();
3562        let users_root = read_old_rootpages(pager, pager.header().schema_root_page)
3563            .unwrap()
3564            .get(&("table".to_string(), "users".to_string()))
3565            .copied()
3566            .unwrap();
3567        let acc_bytes_before: Vec<u8> = pager.read_page(acc_root).unwrap().to_vec();
3568        let users_bytes_before: Vec<u8> = pager.read_page(users_root).unwrap().to_vec();
3569
3570        // Drop the middle table.
3571        process_command("DROP TABLE notes;", &mut db).expect("drop notes");
3572
3573        let pager = db.pager.as_ref().unwrap();
3574        // `accounts` and `users` should still live at the same pages
3575        // with byte-identical content.
3576        let acc_after = pager.read_page(acc_root).unwrap();
3577        let users_after = pager.read_page(users_root).unwrap();
3578        assert_eq!(
3579            &acc_after[..],
3580            &acc_bytes_before[..],
3581            "accounts root page must not be rewritten when an unrelated table is dropped"
3582        );
3583        assert_eq!(
3584            &users_after[..],
3585            &users_bytes_before[..],
3586            "users root page must not be rewritten when an unrelated table is dropped"
3587        );
3588
3589        cleanup(&path);
3590    }
3591
3592    // ---- SQLR-10: auto-VACUUM trigger after page-releasing DDL ----
3593
3594    /// Builds a file-backed DB with one small "keep" table and one
3595    /// large "bloat" table, sized so the post-drop freelist will
3596    /// comfortably cross the default 25% threshold and the
3597    /// `MIN_PAGES_FOR_AUTO_VACUUM` floor (16 pages). Used by the
3598    /// auto-VACUUM happy-path tests.
3599    fn auto_vacuum_setup(path: &std::path::Path) -> Database {
3600        let mut db = Database::new("av".to_string());
3601        db.source_path = Some(path.to_path_buf());
3602        process_command(
3603            "CREATE TABLE keep (id INTEGER PRIMARY KEY, n INTEGER);",
3604            &mut db,
3605        )
3606        .unwrap();
3607        process_command("INSERT INTO keep (n) VALUES (1);", &mut db).unwrap();
3608        process_command(
3609            "CREATE TABLE bloat (id INTEGER PRIMARY KEY, payload TEXT);",
3610            &mut db,
3611        )
3612        .unwrap();
3613        // Wrap the bulk insert in a transaction so we pay one save at
3614        // COMMIT instead of 5000 round-trips through auto-save.
3615        process_command("BEGIN;", &mut db).unwrap();
3616        for i in 0..5000 {
3617            process_command(
3618                &format!("INSERT INTO bloat (payload) VALUES ('p-{i:08}');"),
3619                &mut db,
3620            )
3621            .unwrap();
3622        }
3623        process_command("COMMIT;", &mut db).unwrap();
3624        db
3625    }
3626
3627    /// Default threshold (0.25) is engaged for fresh `Database`s and
3628    /// fires when a `DROP TABLE` orphans enough pages — file shrinks
3629    /// without anyone calling `VACUUM;`.
3630    #[test]
3631    fn auto_vacuum_default_threshold_triggers_on_drop_table() {
3632        let path = tmp_path("av_default_drop_table");
3633        let mut db = auto_vacuum_setup(&path);
3634        // Sanity: setup respects the shipped default.
3635        assert_eq!(db.auto_vacuum_threshold(), Some(0.25));
3636
3637        // Checkpoint before measuring `size_before` so the bloat actually
3638        // lives in the main file and not just the WAL — otherwise
3639        // `size_before` is the bare 2-page header and any post-vacuum
3640        // checkpoint will look like the file *grew*.
3641        if let Some(p) = db.pager.as_mut() {
3642            let _ = p.checkpoint();
3643        }
3644        let pages_before = db.pager.as_ref().unwrap().header().page_count;
3645        let size_before = std::fs::metadata(&path).unwrap().len();
3646        assert!(
3647            pages_before >= MIN_PAGES_FOR_AUTO_VACUUM,
3648            "setup should produce >= MIN_PAGES_FOR_AUTO_VACUUM ({MIN_PAGES_FOR_AUTO_VACUUM}) \
3649             pages so the floor doesn't suppress the trigger; got {pages_before}"
3650        );
3651
3652        // Drop the bloat table — freelist should pass 25% of page_count
3653        // and the auto-VACUUM hook should compact in place. Note: no
3654        // explicit `VACUUM;` statement is issued.
3655        process_command("DROP TABLE bloat;", &mut db).expect("drop");
3656
3657        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3658        let head_after = db.pager.as_ref().unwrap().header().freelist_head;
3659        // Second checkpoint so the post-vacuum file shrinks on disk
3660        // (auto-VACUUM stages the compact through WAL just like manual
3661        // VACUUM does).
3662        if let Some(p) = db.pager.as_mut() {
3663            let _ = p.checkpoint();
3664        }
3665        let size_after = std::fs::metadata(&path).unwrap().len();
3666
3667        assert!(
3668            pages_after < pages_before,
3669            "auto-VACUUM must reduce page_count: was {pages_before}, now {pages_after}"
3670        );
3671        assert_eq!(head_after, 0, "auto-VACUUM must clear the freelist");
3672        assert!(
3673            size_after < size_before,
3674            "auto-VACUUM must shrink the file on disk: was {size_before}, now {size_after}"
3675        );
3676
3677        cleanup(&path);
3678    }
3679
3680    /// Setting the threshold to `None` disables the trigger entirely:
3681    /// the same workload that shrinks under the default leaves the file
3682    /// at its high-water mark.
3683    #[test]
3684    fn auto_vacuum_disabled_keeps_file_at_hwm() {
3685        let path = tmp_path("av_disabled");
3686        let mut db = auto_vacuum_setup(&path);
3687        db.set_auto_vacuum_threshold(None).expect("disable");
3688        assert_eq!(db.auto_vacuum_threshold(), None);
3689
3690        let pages_before = db.pager.as_ref().unwrap().header().page_count;
3691
3692        process_command("DROP TABLE bloat;", &mut db).expect("drop");
3693
3694        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3695        let head_after = db.pager.as_ref().unwrap().header().freelist_head;
3696        assert_eq!(
3697            pages_after, pages_before,
3698            "with auto-VACUUM disabled, drop must keep page_count at the HWM"
3699        );
3700        assert!(
3701            head_after != 0,
3702            "drop must still populate the freelist (manual VACUUM would be needed to reclaim)"
3703        );
3704
3705        cleanup(&path);
3706    }
3707
3708    /// `DROP INDEX` is the second of three page-releasing DDL paths
3709    /// covered by SQLR-10. We bloat the freelist via a separate
3710    /// `DROP TABLE` first (with auto-VACUUM disabled so it doesn't
3711    /// compact early), then re-arm the trigger and drop a small index
3712    /// — the cumulative freelist crosses 25% on the index drop and
3713    /// auto-VACUUM fires.
3714    ///
3715    /// The detour around bloat is necessary because building a
3716    /// secondary index on a 5000-row column would need multi-level
3717    /// interior nodes, and the cell-decoder's interior-page support
3718    /// is a separate work item from SQLR-10.
3719    #[test]
3720    fn auto_vacuum_triggers_on_drop_index() {
3721        let path = tmp_path("av_drop_index");
3722        let mut db = auto_vacuum_setup(&path);
3723
3724        // Phase 1: drop the bloat table with auto-VACUUM disabled so
3725        // its pages land on the freelist without being reclaimed.
3726        db.set_auto_vacuum_threshold(None).expect("disable");
3727        process_command("DROP TABLE bloat;", &mut db).expect("drop bloat");
3728        let pages_after_bloat_drop = db.pager.as_ref().unwrap().header().page_count;
3729        let head_after_bloat_drop = db.pager.as_ref().unwrap().header().freelist_head;
3730        assert!(
3731            head_after_bloat_drop != 0,
3732            "bloat drop must populate the freelist (else later index drop won't trip the threshold)"
3733        );
3734
3735        // Phase 2: a small index on the surviving `keep` table. The
3736        // index reuses one page from the freelist (which is fine —
3737        // freelist still holds plenty more).
3738        process_command("CREATE INDEX idx_keep_n ON keep (n);", &mut db).expect("create idx");
3739
3740        // Phase 3: re-arm the trigger and drop the index. The freelist
3741        // is already heavily populated from phase 1; this drop just
3742        // adds the index page on top, keeping the ratio well above
3743        // 25%, so auto-VACUUM should fire.
3744        db.set_auto_vacuum_threshold(Some(0.25)).expect("re-arm");
3745        process_command("DROP INDEX idx_keep_n;", &mut db).expect("drop index");
3746
3747        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3748        let head_after = db.pager.as_ref().unwrap().header().freelist_head;
3749        assert!(
3750            pages_after < pages_after_bloat_drop,
3751            "DROP INDEX should fire auto-VACUUM and reduce page_count: \
3752             was {pages_after_bloat_drop}, now {pages_after}"
3753        );
3754        assert_eq!(
3755            head_after, 0,
3756            "auto-VACUUM after DROP INDEX must clear the freelist"
3757        );
3758
3759        cleanup(&path);
3760    }
3761
3762    /// `ALTER TABLE … DROP COLUMN` releases pages too — the third path
3763    /// the SQLR-10 trigger covers.
3764    #[test]
3765    fn auto_vacuum_triggers_on_alter_drop_column() {
3766        let path = tmp_path("av_alter_drop_col");
3767        let mut db = auto_vacuum_setup(&path);
3768        let pages_before = db.pager.as_ref().unwrap().header().page_count;
3769
3770        // Drop the wide `payload` column — this rewrites every row in
3771        // `bloat` without the column, so the old leaf pages get freed.
3772        process_command("ALTER TABLE bloat DROP COLUMN payload;", &mut db).expect("alter drop");
3773
3774        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3775        assert!(
3776            pages_after < pages_before,
3777            "ALTER TABLE DROP COLUMN should fire auto-VACUUM and reduce page_count: \
3778             was {pages_before}, now {pages_after}"
3779        );
3780        assert_eq!(db.pager.as_ref().unwrap().header().freelist_head, 0);
3781
3782        cleanup(&path);
3783    }
3784
3785    /// A high threshold (0.99) suppresses the trigger when the freelist
3786    /// ratio is well below it — the file stays at HWM.
3787    #[test]
3788    fn auto_vacuum_skips_below_threshold() {
3789        let path = tmp_path("av_below_threshold");
3790        let mut db = auto_vacuum_setup(&path);
3791        db.set_auto_vacuum_threshold(Some(0.99)).expect("set");
3792
3793        let pages_before = db.pager.as_ref().unwrap().header().page_count;
3794
3795        process_command("DROP TABLE bloat;", &mut db).expect("drop");
3796
3797        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3798        assert_eq!(
3799            pages_after, pages_before,
3800            "freelist ratio after a single drop is far below 0.99 — \
3801             page_count must stay at the HWM"
3802        );
3803        assert!(
3804            db.pager.as_ref().unwrap().header().freelist_head != 0,
3805            "drop must still populate the freelist"
3806        );
3807
3808        cleanup(&path);
3809    }
3810
3811    /// Inside an explicit transaction, the page-releasing DDL doesn't
3812    /// flush to disk yet — the freelist isn't accurate, so the trigger
3813    /// must skip. The compact would also publish in-flight work out of
3814    /// band, which is exactly what the manual `VACUUM;` rejection
3815    /// inside a txn already prevents.
3816    #[test]
3817    fn auto_vacuum_skips_inside_transaction() {
3818        let path = tmp_path("av_in_txn");
3819        let mut db = auto_vacuum_setup(&path);
3820        let pages_before = db.pager.as_ref().unwrap().header().page_count;
3821
3822        process_command("BEGIN;", &mut db).expect("begin");
3823        process_command("DROP TABLE bloat;", &mut db).expect("drop in txn");
3824        // Mid-transaction: no save has occurred, so the on-disk
3825        // freelist_head must be unchanged and page_count must not have
3826        // shifted from a sneaky compact.
3827        let pages_mid = db.pager.as_ref().unwrap().header().page_count;
3828        assert_eq!(
3829            pages_mid, pages_before,
3830            "auto-VACUUM must not fire mid-transaction"
3831        );
3832
3833        process_command("ROLLBACK;", &mut db).expect("rollback");
3834        cleanup(&path);
3835    }
3836
3837    /// Tiny databases (under `MIN_PAGES_FOR_AUTO_VACUUM`) skip the
3838    /// trigger even if the ratio would otherwise qualify — the cost of
3839    /// rewriting a 64 KiB file isn't worth the few bytes reclaimed.
3840    #[test]
3841    fn auto_vacuum_skips_under_min_pages_floor() {
3842        let path = tmp_path("av_under_floor");
3843        let mut db = seed_db(); // small: just users + notes, ~5 pages
3844        db.source_path = Some(path.clone());
3845        save_database(&mut db, &path).expect("save");
3846        // Confirm we're below the floor so the test is meaningful.
3847        let pages_before = db.pager.as_ref().unwrap().header().page_count;
3848        assert!(
3849            pages_before < MIN_PAGES_FOR_AUTO_VACUUM,
3850            "test setup is too large: floor would not apply (got {pages_before} pages, \
3851             floor is {MIN_PAGES_FOR_AUTO_VACUUM})"
3852        );
3853
3854        process_command("DROP TABLE users;", &mut db).expect("drop");
3855
3856        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3857        assert_eq!(
3858            pages_after, pages_before,
3859            "below MIN_PAGES_FOR_AUTO_VACUUM, drop must not trigger compaction"
3860        );
3861        assert!(
3862            db.pager.as_ref().unwrap().header().freelist_head != 0,
3863            "drop must still populate the freelist normally"
3864        );
3865
3866        cleanup(&path);
3867    }
3868
3869    /// Setter rejects NaN, infinities, and values outside `0.0..=1.0`
3870    /// rather than silently saturating.
3871    #[test]
3872    fn set_auto_vacuum_threshold_rejects_out_of_range() {
3873        let mut db = Database::new("t".to_string());
3874        for bad in [-0.01_f32, 1.01, f32::NAN, f32::INFINITY, f32::NEG_INFINITY] {
3875            let err = db.set_auto_vacuum_threshold(Some(bad)).unwrap_err();
3876            assert!(
3877                format!("{err}").contains("auto_vacuum_threshold"),
3878                "expected a typed range error for {bad}, got: {err}"
3879            );
3880        }
3881        // The default survives the rejected sets unchanged.
3882        assert_eq!(db.auto_vacuum_threshold(), Some(0.25));
3883        // And valid values land.
3884        db.set_auto_vacuum_threshold(Some(0.0)).unwrap();
3885        assert_eq!(db.auto_vacuum_threshold(), Some(0.0));
3886        db.set_auto_vacuum_threshold(Some(1.0)).unwrap();
3887        assert_eq!(db.auto_vacuum_threshold(), Some(1.0));
3888        db.set_auto_vacuum_threshold(None).unwrap();
3889        assert_eq!(db.auto_vacuum_threshold(), None);
3890    }
3891
3892    // ---------------------------------------------------------------
3893    // SQLR-13 — `PRAGMA auto_vacuum` SQL-level coverage. Mirrors the
3894    // SQLR-10 setter tests above, but routed through SQL so SDK / FFI
3895    // / MCP consumers (which can't reach the Rust setter directly)
3896    // get the same guarantees.
3897    // ---------------------------------------------------------------
3898
3899    /// `PRAGMA auto_vacuum = N;` set + `PRAGMA auto_vacuum;` read
3900    /// round-trip the threshold, observable via `auto_vacuum_threshold`.
3901    #[test]
3902    fn pragma_auto_vacuum_set_and_read_via_sql() {
3903        let mut db = Database::new("t".to_string());
3904
3905        let resp = process_command("PRAGMA auto_vacuum = 0.5;", &mut db).expect("set");
3906        assert!(
3907            resp.contains("PRAGMA"),
3908            "set form should produce a PRAGMA status, got: {resp}"
3909        );
3910        assert_eq!(db.auto_vacuum_threshold(), Some(0.5));
3911
3912        // Read form — status mentions a returned row.
3913        let resp = process_command("PRAGMA auto_vacuum;", &mut db).expect("read");
3914        assert!(resp.contains("1 row"), "expected a 1-row read, got: {resp}");
3915    }
3916
3917    /// `PRAGMA auto_vacuum = OFF;` (bare identifier — sqlparser's own
3918    /// pragma-value parser would reject this, the SQLR-13 dispatcher
3919    /// must accept it) and `= NONE;` both disable the trigger. So does
3920    /// the quoted form `'OFF'`.
3921    #[test]
3922    fn pragma_auto_vacuum_off_disables_trigger() {
3923        for raw in ["OFF", "off", "NONE", "none", "'OFF'", "'NONE'"] {
3924            let mut db = Database::new("t".to_string());
3925            assert_eq!(db.auto_vacuum_threshold(), Some(0.25));
3926
3927            let stmt = format!("PRAGMA auto_vacuum = {raw};");
3928            process_command(&stmt, &mut db)
3929                .unwrap_or_else(|e| panic!("`{stmt}` should disable: {e}"));
3930            assert_eq!(
3931                db.auto_vacuum_threshold(),
3932                None,
3933                "`{stmt}` should clear the threshold"
3934            );
3935        }
3936    }
3937
3938    /// Out-of-range numeric values surface as a typed error via the
3939    /// shared `set_auto_vacuum_threshold` validator — no silent
3940    /// saturation. Mirrors the SQLR-10 setter coverage.
3941    #[test]
3942    fn pragma_auto_vacuum_rejects_out_of_range_via_sql() {
3943        let mut db = Database::new("t".to_string());
3944        for bad in ["-0.01", "1.01", "1.5"] {
3945            let stmt = format!("PRAGMA auto_vacuum = {bad};");
3946            let err = process_command(&stmt, &mut db).unwrap_err();
3947            assert!(
3948                format!("{err}").contains("auto_vacuum_threshold"),
3949                "expected range error for `{stmt}`, got: {err}"
3950            );
3951        }
3952        // Default survives all the rejected sets.
3953        assert_eq!(db.auto_vacuum_threshold(), Some(0.25));
3954    }
3955
3956    /// Junk strings (anything that isn't a number or `OFF`/`NONE`) are
3957    /// rejected at parse time with a typed error, not silently treated
3958    /// as "disable".
3959    #[test]
3960    fn pragma_auto_vacuum_rejects_unknown_strings_via_sql() {
3961        let mut db = Database::new("t".to_string());
3962        let err = process_command("PRAGMA auto_vacuum = WAL;", &mut db).unwrap_err();
3963        assert!(
3964            format!("{err}").contains("OFF/NONE"),
3965            "expected OFF/NONE-style error, got: {err}"
3966        );
3967        // Default unaffected.
3968        assert_eq!(db.auto_vacuum_threshold(), Some(0.25));
3969    }
3970
3971    /// Pragmas SQLRite doesn't know about return `NotImplemented` —
3972    /// not a generic parser error. Future pragmas plug in here.
3973    #[test]
3974    fn pragma_unknown_returns_not_implemented() {
3975        let mut db = Database::new("t".to_string());
3976        let err = process_command("PRAGMA journal_mode = WAL;", &mut db).unwrap_err();
3977        assert!(
3978            matches!(err, SQLRiteError::NotImplemented(_)),
3979            "unknown pragma must surface NotImplemented, got: {err:?}"
3980        );
3981    }
3982
3983    /// Setting the threshold via SQL must produce identical behavior to
3984    /// the Rust setter on the actual auto-VACUUM trigger: `= 0.99`
3985    /// suppresses, `= OFF` disables, default fires. Sanity-checks that
3986    /// `process_command_with_render`'s pre-parse step doesn't desync
3987    /// the in-memory state from the file.
3988    #[test]
3989    fn pragma_auto_vacuum_drives_real_trigger() {
3990        // Sub-case A — `PRAGMA auto_vacuum = OFF;` keeps file at HWM.
3991        {
3992            let path = tmp_path("av_pragma_off");
3993            let mut db = auto_vacuum_setup(&path);
3994            process_command("PRAGMA auto_vacuum = OFF;", &mut db).expect("disable via PRAGMA");
3995            assert_eq!(db.auto_vacuum_threshold(), None);
3996
3997            let pages_before = db.pager.as_ref().unwrap().header().page_count;
3998            process_command("DROP TABLE bloat;", &mut db).expect("drop");
3999            let pages_after = db.pager.as_ref().unwrap().header().page_count;
4000            assert_eq!(
4001                pages_after, pages_before,
4002                "PRAGMA-driven OFF must keep page_count at the HWM"
4003            );
4004            cleanup(&path);
4005        }
4006
4007        // Sub-case B — high threshold via PRAGMA suppresses the
4008        // trigger on a single drop.
4009        {
4010            let path = tmp_path("av_pragma_high");
4011            let mut db = auto_vacuum_setup(&path);
4012            process_command("PRAGMA auto_vacuum = 0.99;", &mut db).expect("set high");
4013            assert_eq!(db.auto_vacuum_threshold(), Some(0.99));
4014
4015            let pages_before = db.pager.as_ref().unwrap().header().page_count;
4016            process_command("DROP TABLE bloat;", &mut db).expect("drop");
4017            let pages_after = db.pager.as_ref().unwrap().header().page_count;
4018            assert_eq!(
4019                pages_after, pages_before,
4020                "high PRAGMA threshold must suppress the trigger"
4021            );
4022            cleanup(&path);
4023        }
4024
4025        // Sub-case C — re-arm via PRAGMA after disable: the trigger
4026        // fires again on the next page-releasing DDL.
4027        {
4028            let path = tmp_path("av_pragma_rearm");
4029            let mut db = auto_vacuum_setup(&path);
4030            process_command("PRAGMA auto_vacuum = OFF;", &mut db).unwrap();
4031            // Drop with the trigger off — pages land on the freelist
4032            // but the file stays at HWM.
4033            process_command("DROP TABLE bloat;", &mut db).unwrap();
4034            let pages_after_off_drop = db.pager.as_ref().unwrap().header().page_count;
4035            assert!(db.pager.as_ref().unwrap().header().freelist_head != 0);
4036
4037            // Re-arm via PRAGMA, then drop one more thing — the
4038            // accumulated freelist still exceeds 25%, so auto-VACUUM
4039            // fires.
4040            process_command("PRAGMA auto_vacuum = 0.25;", &mut db).expect("re-arm");
4041            process_command("CREATE INDEX idx_keep_n ON keep (n);", &mut db).unwrap();
4042            process_command("DROP INDEX idx_keep_n;", &mut db).expect("drop index");
4043
4044            let pages_after_rearm = db.pager.as_ref().unwrap().header().page_count;
4045            assert!(
4046                pages_after_rearm < pages_after_off_drop,
4047                "re-armed PRAGMA must let auto-VACUUM fire: was {pages_after_off_drop}, \
4048                 now {pages_after_rearm}"
4049            );
4050            assert_eq!(db.pager.as_ref().unwrap().header().freelist_head, 0);
4051            cleanup(&path);
4052        }
4053    }
4054
4055    /// VACUUM modifiers (FULL, REINDEX, table targets, …) are rejected
4056    /// with NotImplemented — only bare `VACUUM;` is supported.
4057    #[test]
4058    fn vacuum_modifiers_are_rejected() {
4059        let path = tmp_path("vacuum_modifiers");
4060        let mut db = seed_db();
4061        db.source_path = Some(path.clone());
4062        save_database(&mut db, &path).expect("save");
4063        for stmt in ["VACUUM FULL;", "VACUUM users;"] {
4064            let err = process_command(stmt, &mut db).unwrap_err();
4065            assert!(
4066                format!("{err}").contains("VACUUM modifiers"),
4067                "expected modifier rejection for `{stmt}`, got: {err}"
4068            );
4069        }
4070        cleanup(&path);
4071    }
4072}