Skip to main content

sqlrite/sql/pager/
mod.rs

1//! On-disk persistence for a `Database`, using fixed-size paged files.
2//!
3//! The file is a sequence of 4 KiB pages. Page 0 holds the header
4//! (magic, version, page count, schema-root pointer). Every other page carries
5//! a small per-page header (type tag + next-page pointer + payload length)
6//! followed by a payload of up to 4089 bytes.
7//!
8//! **Storage strategy (format version 2, Phase 3c.5).**
9//!
10//! - Each `Table`'s rows live as **cells** in a chain of `TableLeaf` pages.
11//!   Cell layout and slot directory are in `cell.rs` / `table_page.rs`;
12//!   cells that exceed the inline threshold spill into an overflow chain
13//!   via `overflow.rs`.
14//! - The schema catalog is itself a regular table named `sqlrite_master`,
15//!   with one row per user table:
16//!       `(name TEXT PRIMARY KEY, sql TEXT NOT NULL,
17//!         rootpage INTEGER NOT NULL, last_rowid INTEGER NOT NULL)`
18//!   This is the SQLite-style approach: the schema of `sqlrite_master`
19//!   itself is hardcoded into the engine so the open path can bootstrap.
20//! - Page 0's `schema_root_page` field points at the first leaf of
21//!   `sqlrite_master`.
22//!
23//! **Format version.** Version 2 is not compatible with files produced by
24//! earlier commits. Opening a v1 file returns a clean error — users on
25//! old files have to regenerate them from CREATE/INSERT, as there's no
26//! production data to migrate yet.
27
28// Data-layer modules. Not every helper in these modules is used by save/open
29// yet — some exist for tests, some for future maintenance operations.
30// Module-level #[allow(dead_code)] keeps the build quiet without dotting
31// the modules with per-item attributes.
32#[allow(dead_code)]
33pub mod allocator;
34#[allow(dead_code)]
35pub mod cell;
36pub mod file;
37#[allow(dead_code)]
38pub mod freelist;
39#[allow(dead_code)]
40pub mod fts_cell;
41pub mod header;
42#[allow(dead_code)]
43pub mod hnsw_cell;
44#[allow(dead_code)]
45pub mod index_cell;
46#[allow(dead_code)]
47pub mod interior_page;
48pub mod overflow;
49pub mod page;
50pub mod pager;
51#[allow(dead_code)]
52pub mod table_page;
53#[allow(dead_code)]
54pub mod varint;
55#[allow(dead_code)]
56pub mod wal;
57
58use std::collections::{BTreeMap, HashMap};
59use std::path::Path;
60use std::sync::{Arc, Mutex};
61
62use sqlparser::dialect::SQLiteDialect;
63use sqlparser::parser::Parser;
64
65use crate::error::{Result, SQLRiteError};
66use crate::sql::db::database::Database;
67use crate::sql::db::secondary_index::{IndexOrigin, SecondaryIndex};
68use crate::sql::db::table::{Column, DataType, Row, Table, Value};
69use crate::sql::pager::cell::Cell;
70use crate::sql::pager::header::DbHeader;
71use crate::sql::pager::index_cell::IndexCell;
72use crate::sql::pager::interior_page::{InteriorCell, InteriorPage};
73use crate::sql::pager::overflow::{
74    OVERFLOW_THRESHOLD, OverflowRef, PagedEntry, read_overflow_chain, write_overflow_chain,
75};
76use crate::sql::pager::page::{PAGE_HEADER_SIZE, PAGE_SIZE, PAYLOAD_PER_PAGE, PageType};
77use crate::sql::pager::pager::Pager;
78use crate::sql::pager::table_page::TablePage;
79use crate::sql::parser::create::CreateQuery;
80
81// Re-export so callers can spell `sql::pager::AccessMode` without
82// reaching into the `pager::pager::pager` submodule path.
83pub use crate::sql::pager::pager::AccessMode;
84
85/// Name of the internal catalog table. Reserved — user CREATEs of this
86/// name must be rejected upstream.
87pub const MASTER_TABLE_NAME: &str = "sqlrite_master";
88
89/// Opens a database file in read-write mode. Shorthand for
90/// [`open_database_with_mode`] with [`AccessMode::ReadWrite`].
91pub fn open_database(path: &Path, db_name: String) -> Result<Database> {
92    open_database_with_mode(path, db_name, AccessMode::ReadWrite)
93}
94
95/// Opens a database file in read-only mode. Acquires a shared OS-level
96/// advisory lock, so other read-only openers coexist but any writer is
97/// excluded. Attempts to mutate the returned `Database` (e.g. an
98/// `INSERT`, or a `save_database` call against it) bottom out in a
99/// `cannot commit: database is opened read-only` error from the Pager.
100pub fn open_database_read_only(path: &Path, db_name: String) -> Result<Database> {
101    open_database_with_mode(path, db_name, AccessMode::ReadOnly)
102}
103
104/// Opens a database file and reconstructs the in-memory `Database`,
105/// leaving the long-lived `Pager` attached for subsequent auto-save
106/// (read-write) or consistent-snapshot reads (read-only).
107pub fn open_database_with_mode(path: &Path, db_name: String, mode: AccessMode) -> Result<Database> {
108    let pager = Pager::open_with_mode(path, mode)?;
109
110    // 1. Load sqlrite_master from the tree at header.schema_root_page.
111    let mut master = build_empty_master_table();
112    load_table_rows(&pager, &mut master, pager.header().schema_root_page)?;
113
114    // 2. Two passes over master rows: first build every user table, then
115    //    attach secondary indexes. Indexes need their base table to exist
116    //    before we can populate them. Auto-indexes are created at table
117    //    build time so we only have to load explicit indexes from disk
118    //    (but we also reload the auto-index CONTENT because Table::new
119    //    built it empty).
120    let mut db = Database::new(db_name);
121    let mut index_rows: Vec<IndexCatalogRow> = Vec::new();
122
123    for rowid in master.rowids() {
124        let ty = take_text(&master, "type", rowid)?;
125        let name = take_text(&master, "name", rowid)?;
126        let sql = take_text(&master, "sql", rowid)?;
127        let rootpage = take_integer(&master, "rootpage", rowid)? as u32;
128        let last_rowid = take_integer(&master, "last_rowid", rowid)?;
129
130        match ty.as_str() {
131            "table" => {
132                let (parsed_name, columns) = parse_create_sql(&sql)?;
133                if parsed_name != name {
134                    return Err(SQLRiteError::Internal(format!(
135                        "sqlrite_master row '{name}' carries SQL for '{parsed_name}' — corrupt catalog?"
136                    )));
137                }
138                let mut table = build_empty_table(&name, columns, last_rowid);
139                if rootpage != 0 {
140                    load_table_rows(&pager, &mut table, rootpage)?;
141                }
142                if last_rowid > table.last_rowid {
143                    table.last_rowid = last_rowid;
144                }
145                db.tables.insert(name, table);
146            }
147            "index" => {
148                index_rows.push(IndexCatalogRow {
149                    name,
150                    sql,
151                    rootpage,
152                });
153            }
154            other => {
155                return Err(SQLRiteError::Internal(format!(
156                    "sqlrite_master row '{name}' has unknown type '{other}'"
157                )));
158            }
159        }
160    }
161
162    // Second pass: attach each index to its table. HNSW indexes
163    // (Phase 7d.2) take a different code path because their persisted
164    // form is just the CREATE INDEX SQL — the graph itself isn't
165    // persisted yet (Phase 7d.3). Detect HNSW via the SQL's USING clause
166    // and route to a graph-rebuild instead of the B-Tree-cell load.
167    //
168    // Phase 8b — same shape for FTS indexes. The posting lists aren't
169    // persisted yet (Phase 8c), so we replay the CREATE INDEX SQL on
170    // open and let `execute_create_index` walk current rows.
171    for row in index_rows {
172        if create_index_sql_uses_hnsw(&row.sql) {
173            rebuild_hnsw_index(&mut db, &pager, &row)?;
174        } else if create_index_sql_uses_fts(&row.sql) {
175            rebuild_fts_index(&mut db, &pager, &row)?;
176        } else {
177            attach_index(&mut db, &pager, row)?;
178        }
179    }
180
181    db.source_path = Some(path.to_path_buf());
182    db.pager = Some(pager);
183    Ok(db)
184}
185
186/// Catalog row for a secondary index — deferred until after every table is
187/// loaded so the index's base table exists by the time we populate it.
188struct IndexCatalogRow {
189    name: String,
190    sql: String,
191    rootpage: u32,
192}
193
194/// Persists `db` to disk. Diff-pager skips writing pages whose bytes
195/// haven't changed; the [`PageAllocator`] preserves per-table page
196/// numbers across saves so unchanged tables produce zero dirty frames.
197///
198/// Pages that were live before this save but aren't restaged this round
199/// (e.g., the leaves of a dropped table) move onto a persisted free
200/// list rooted at `header.freelist_head`; subsequent saves draw from
201/// the freelist before extending the file. `VACUUM` (see
202/// [`vacuum_database`]) compacts the file by ignoring the freelist and
203/// allocating linearly from page 1.
204///
205/// [`PageAllocator`]: crate::sql::pager::allocator::PageAllocator
206pub fn save_database(db: &mut Database, path: &Path) -> Result<()> {
207    save_database_with_mode(db, path, /*compact=*/ false)
208}
209
210/// Reclaims space by rewriting every live B-Tree contiguously from
211/// page 1, with no freelist. Equivalent to `save_database` but ignores
212/// the existing freelist and per-table preferred pools — every page is
213/// allocated by extending the high-water mark — so the resulting file
214/// is tightly packed and the freelist is empty.
215///
216/// Used by the SQL-level `VACUUM;` statement.
217pub fn vacuum_database(db: &mut Database, path: &Path) -> Result<()> {
218    save_database_with_mode(db, path, /*compact=*/ true)
219}
220
221/// Shared save core. `compact = false` is the normal save path (uses
222/// the existing freelist + per-table preferred pools). `compact = true`
223/// is the VACUUM path (empty freelist, empty preferred pools, linear
224/// allocation from page 1).
225fn save_database_with_mode(db: &mut Database, path: &Path, compact: bool) -> Result<()> {
226    // Phase 7d.3 — rebuild any HNSW index that DELETE / UPDATE-on-vector
227    // marked dirty. Done up front under the &mut Database borrow we
228    // already hold, before the immutable iteration loops below need
229    // their own borrow.
230    rebuild_dirty_hnsw_indexes(db);
231    // Phase 8b — same drill for FTS indexes flagged by DELETE / UPDATE.
232    rebuild_dirty_fts_indexes(db);
233
234    let same_path = db.source_path.as_deref() == Some(path);
235    let mut pager = if same_path {
236        match db.pager.take() {
237            Some(p) => p,
238            None if path.exists() => Pager::open(path)?,
239            None => Pager::create(path)?,
240        }
241    } else if path.exists() {
242        Pager::open(path)?
243    } else {
244        Pager::create(path)?
245    };
246
247    // Snapshot what was live BEFORE we reset staged. Used to compute the
248    // newly-freed set after staging completes. Page 0 (the header) is
249    // never on the freelist — it's always live.
250    let old_header = pager.header();
251    let old_live: std::collections::HashSet<u32> = (1..old_header.page_count).collect();
252
253    // Read the previously-persisted freelist so its leaf pages can be
254    // reused as preferred allocations and its trunk pages don't leak.
255    let (old_free_leaves, old_free_trunks) = if compact || old_header.freelist_head == 0 {
256        (Vec::new(), Vec::new())
257    } else {
258        crate::sql::pager::freelist::read_freelist(&pager, old_header.freelist_head)?
259    };
260
261    // Snapshot the previous rootpages of each table/index so we can
262    // seed per-table preferred pools (the unchanged-table case stages
263    // byte-identical pages → diff pager skips every write for it).
264    let old_rootpages = if compact {
265        HashMap::new()
266    } else {
267        read_old_rootpages(&pager, old_header.schema_root_page)?
268    };
269
270    // SQLR-1 — snapshot every prior B-Tree's page set NOW, before any
271    // staging starts. `Pager::read_page` shadows on-disk bytes with the
272    // current `staged` buffer, so if we deferred these walks until each
273    // object's turn in the staging loop, a *new* index added in this
274    // save would extend past the old high-water and overwrite the
275    // pages of any later-staged object whose old root sits in that
276    // range — including `sqlrite_master`, which is always staged last.
277    // The follow-up walk would then read the wrong B-Tree's bytes and
278    // either hand the allocator a bogus preferred pool or panic
279    // dispatching cells (a table-cell decoder vs. an index leaf, the
280    // shape of the original SQLR-1 panic). Walking up front pins each
281    // map to the committed bytes that were on disk before this save
282    // touched anything.
283    let old_preferred_pages: HashMap<(String, String), Vec<u32>> = if compact {
284        HashMap::new()
285    } else {
286        let mut map: HashMap<(String, String), Vec<u32>> = HashMap::new();
287        for ((kind, name), &root) in &old_rootpages {
288            // Tables can carry overflow chains; index/HNSW/FTS leaves
289            // never overflow in the current encoding, so the cheaper
290            // walk suffices for them.
291            let follow = kind == "table";
292            let pages = collect_pages_for_btree(&pager, root, follow)?;
293            map.insert((kind.clone(), name.clone()), pages);
294        }
295        map
296    };
297    let old_master_pages: Vec<u32> = if compact || old_header.schema_root_page == 0 {
298        Vec::new()
299    } else {
300        collect_pages_for_btree(
301            &pager,
302            old_header.schema_root_page,
303            /*follow_overflow=*/ true,
304        )?
305    };
306
307    pager.clear_staged();
308
309    // Allocator: in normal mode, seed with the old freelist; in compact
310    // mode, start empty so allocation extends linearly from page 1.
311    use std::collections::VecDeque;
312    let initial_freelist: VecDeque<u32> = if compact {
313        VecDeque::new()
314    } else {
315        crate::sql::pager::freelist::freelist_to_deque(old_free_leaves.clone())
316    };
317    let mut alloc = crate::sql::pager::allocator::PageAllocator::new(initial_freelist, 1);
318
319    // 1. Stage each user table's B-Tree, collecting master-row info.
320    //    `kind` is "table" or "index" — master has one row per each.
321    let mut master_rows: Vec<CatalogEntry> = Vec::new();
322
323    let mut table_names: Vec<&String> = db.tables.keys().collect();
324    table_names.sort();
325    for name in table_names {
326        if name == MASTER_TABLE_NAME {
327            return Err(SQLRiteError::Internal(format!(
328                "user table cannot be named '{MASTER_TABLE_NAME}' (reserved)"
329            )));
330        }
331        if !compact {
332            if let Some(prev) = old_preferred_pages.get(&("table".to_string(), name.to_string())) {
333                alloc.set_preferred(prev.clone());
334            }
335        }
336        let table = &db.tables[name];
337        let rootpage = stage_table_btree(&mut pager, table, &mut alloc)?;
338        alloc.finish_preferred();
339        master_rows.push(CatalogEntry {
340            kind: "table".into(),
341            name: name.clone(),
342            sql: table_to_create_sql(table),
343            rootpage,
344            last_rowid: table.last_rowid,
345        });
346    }
347
348    // 2. Stage each secondary index's B-Tree. Indexes persist in a
349    //    deterministic order: sorted by (owning_table, index_name).
350    let mut index_entries: Vec<(&Table, &SecondaryIndex)> = Vec::new();
351    for table in db.tables.values() {
352        for idx in &table.secondary_indexes {
353            index_entries.push((table, idx));
354        }
355    }
356    index_entries
357        .sort_by(|(ta, ia), (tb, ib)| ta.tb_name.cmp(&tb.tb_name).then(ia.name.cmp(&ib.name)));
358    for (_table, idx) in index_entries {
359        if !compact {
360            if let Some(prev) =
361                old_preferred_pages.get(&("index".to_string(), idx.name.to_string()))
362            {
363                alloc.set_preferred(prev.clone());
364            }
365        }
366        let rootpage = stage_index_btree(&mut pager, idx, &mut alloc)?;
367        alloc.finish_preferred();
368        master_rows.push(CatalogEntry {
369            kind: "index".into(),
370            name: idx.name.clone(),
371            sql: idx.synthesized_sql(),
372            rootpage,
373            last_rowid: 0,
374        });
375    }
376
377    // 2b. Phase 7d.3: persist HNSW indexes as their own cell-encoded
378    //     page trees, with the rootpage recorded in sqlrite_master.
379    //     Reopen loads the graph back from cells (fast, exact match)
380    //     instead of rebuilding from rows.
381    //
382    //     Dirty indexes (set by DELETE / UPDATE-on-vector-col) are
383    //     rebuilt from current rows BEFORE staging, so the on-disk
384    //     graph reflects the current row set.
385    let mut hnsw_entries: Vec<(&Table, &crate::sql::db::table::HnswIndexEntry)> = Vec::new();
386    for table in db.tables.values() {
387        for entry in &table.hnsw_indexes {
388            hnsw_entries.push((table, entry));
389        }
390    }
391    hnsw_entries
392        .sort_by(|(ta, ea), (tb, eb)| ta.tb_name.cmp(&tb.tb_name).then(ea.name.cmp(&eb.name)));
393    for (table, entry) in hnsw_entries {
394        if !compact {
395            if let Some(prev) =
396                old_preferred_pages.get(&("index".to_string(), entry.name.to_string()))
397            {
398                alloc.set_preferred(prev.clone());
399            }
400        }
401        let rootpage = stage_hnsw_btree(&mut pager, &entry.index, &mut alloc)?;
402        alloc.finish_preferred();
403        master_rows.push(CatalogEntry {
404            kind: "index".into(),
405            name: entry.name.clone(),
406            sql: format!(
407                "CREATE INDEX {} ON {} USING hnsw ({})",
408                entry.name, table.tb_name, entry.column_name
409            ),
410            rootpage,
411            last_rowid: 0,
412        });
413    }
414
415    // 2c. Phase 8c — persist FTS posting lists as their own
416    //     cell-encoded page trees, with the rootpage recorded in
417    //     sqlrite_master. Reopen loads the postings back from cells
418    //     (fast, exact match) instead of re-tokenizing rows.
419    //
420    //     Dirty indexes (set by DELETE / UPDATE-on-text-col) are
421    //     rebuilt from current rows BEFORE staging by
422    //     `rebuild_dirty_fts_indexes`, so the on-disk tree reflects
423    //     the current row set.
424    let mut fts_entries: Vec<(&Table, &crate::sql::db::table::FtsIndexEntry)> = Vec::new();
425    for table in db.tables.values() {
426        for entry in &table.fts_indexes {
427            fts_entries.push((table, entry));
428        }
429    }
430    fts_entries
431        .sort_by(|(ta, ea), (tb, eb)| ta.tb_name.cmp(&tb.tb_name).then(ea.name.cmp(&eb.name)));
432    let any_fts = !fts_entries.is_empty();
433    for (table, entry) in fts_entries {
434        if !compact {
435            if let Some(prev) =
436                old_preferred_pages.get(&("index".to_string(), entry.name.to_string()))
437            {
438                alloc.set_preferred(prev.clone());
439            }
440        }
441        let rootpage = stage_fts_btree(&mut pager, &entry.index, &mut alloc)?;
442        alloc.finish_preferred();
443        master_rows.push(CatalogEntry {
444            kind: "index".into(),
445            name: entry.name.clone(),
446            sql: format!(
447                "CREATE INDEX {} ON {} USING fts ({})",
448                entry.name, table.tb_name, entry.column_name
449            ),
450            rootpage,
451            last_rowid: 0,
452        });
453    }
454
455    // 3. Build an in-memory sqlrite_master with one row per table or index,
456    //    then stage it via the same tree-build path. Seed master's
457    //    preferred pool with the previous master tree's pages so the
458    //    catalog page numbers stay stable across saves whenever the
459    //    catalog content didn't change.
460    let mut master = build_empty_master_table();
461    for (i, entry) in master_rows.into_iter().enumerate() {
462        let rowid = (i as i64) + 1;
463        master.restore_row(
464            rowid,
465            vec![
466                Some(Value::Text(entry.kind)),
467                Some(Value::Text(entry.name)),
468                Some(Value::Text(entry.sql)),
469                Some(Value::Integer(entry.rootpage as i64)),
470                Some(Value::Integer(entry.last_rowid)),
471            ],
472        )?;
473    }
474    if !compact && !old_master_pages.is_empty() {
475        // Use the page list snapshotted before any staging touched
476        // disk; re-walking here would read whatever a new index
477        // already restaged on top of master's old root (SQLR-1).
478        alloc.set_preferred(old_master_pages.clone());
479    }
480    let master_root = stage_table_btree(&mut pager, &master, &mut alloc)?;
481    alloc.finish_preferred();
482
483    // 4. Compute newly-freed pages: the previously-live set minus what
484    //    we just restaged. The previous freelist's trunk pages get
485    //    re-encoded too — they're in `old_live`, weren't restaged, so
486    //    the filter naturally moves them to the new freelist.
487    //
488    // In `compact` mode (VACUUM), we *discard* newly_freed instead of
489    // routing it onto the new freelist. The whole point of VACUUM is
490    // to let the file truncate to the new high-water mark, so any page
491    // past it gets dropped at the next checkpoint.
492    if !compact {
493        let used = alloc.used().clone();
494        let mut newly_freed: Vec<u32> = old_live
495            .iter()
496            .copied()
497            .filter(|p| !used.contains(p))
498            .collect();
499        let _ = &old_free_trunks; // silenced — handled by the old_live filter
500        alloc.add_to_freelist(newly_freed.drain(..));
501    }
502
503    // 5. Encode the new freelist into trunk pages. `stage_freelist`
504    //    consumes some of the free pages AS the trunk pages themselves —
505    //    a trunk is just a free page borrowed for metadata. Pages that
506    //    were on the freelist but become trunks no longer need to be
507    //    "extension" pages; the high-water mark from the staging loop
508    //    above is already correct.
509    let new_free_pages = alloc.drain_freelist();
510    let new_freelist_head =
511        crate::sql::pager::freelist::stage_freelist(&mut pager, new_free_pages)?;
512
513    // 6. Pick the format version. v6 is on demand: only bumps when the
514    //    new freelist is non-empty. FTS-bearing files keep their v5
515    //    promotion; v6 is a strict superset (v6 readers handle v4/v5/v6).
516    use crate::sql::pager::header::{FORMAT_VERSION_V5, FORMAT_VERSION_V6};
517    let format_version = if new_freelist_head != 0 {
518        FORMAT_VERSION_V6
519    } else if any_fts {
520        // Preserve a v6 file at v6 (don't downgrade) but otherwise
521        // bump v4 → v5 for FTS like Phase 8c does.
522        std::cmp::max(FORMAT_VERSION_V5, old_header.format_version)
523    } else {
524        // Preserve whatever the file already was.
525        old_header.format_version
526    };
527
528    pager.commit(DbHeader {
529        page_count: alloc.high_water(),
530        schema_root_page: master_root,
531        format_version,
532        freelist_head: new_freelist_head,
533    })?;
534
535    if same_path {
536        db.pager = Some(pager);
537    }
538    Ok(())
539}
540
541/// Build material for a single row in sqlrite_master.
542struct CatalogEntry {
543    kind: String, // "table" or "index"
544    name: String,
545    sql: String,
546    rootpage: u32,
547    last_rowid: i64,
548}
549
550// -------------------------------------------------------------------------
551// sqlrite_master — hardcoded catalog table schema
552
553fn build_empty_master_table() -> Table {
554    // Phase 3e: `type` is the first column, matching SQLite's convention.
555    // It distinguishes `'table'` rows from `'index'` rows.
556    let columns = vec![
557        Column::new("type".into(), "text".into(), false, true, false),
558        Column::new("name".into(), "text".into(), true, true, true),
559        Column::new("sql".into(), "text".into(), false, true, false),
560        Column::new("rootpage".into(), "integer".into(), false, true, false),
561        Column::new("last_rowid".into(), "integer".into(), false, true, false),
562    ];
563    build_empty_table(MASTER_TABLE_NAME, columns, 0)
564}
565
566/// Reads a required Text column from a known-good catalog row.
567fn take_text(table: &Table, col: &str, rowid: i64) -> Result<String> {
568    match table.get_value(col, rowid) {
569        Some(Value::Text(s)) => Ok(s),
570        other => Err(SQLRiteError::Internal(format!(
571            "sqlrite_master column '{col}' at rowid {rowid}: expected Text, got {other:?}"
572        ))),
573    }
574}
575
576/// Reads a required Integer column from a known-good catalog row.
577fn take_integer(table: &Table, col: &str, rowid: i64) -> Result<i64> {
578    match table.get_value(col, rowid) {
579        Some(Value::Integer(v)) => Ok(v),
580        other => Err(SQLRiteError::Internal(format!(
581            "sqlrite_master column '{col}' at rowid {rowid}: expected Integer, got {other:?}"
582        ))),
583    }
584}
585
586// -------------------------------------------------------------------------
587// CREATE-TABLE SQL synthesis and re-parsing
588
589/// Synthesizes a CREATE TABLE SQL string that recreates the table's schema.
590/// Deterministic: same schema → same SQL, so diffing commits stay stable.
591fn table_to_create_sql(table: &Table) -> String {
592    let mut parts = Vec::with_capacity(table.columns.len());
593    for c in &table.columns {
594        // Render the SQL type literally so the round-trip through
595        // CREATE TABLE re-parsing recreates the same schema. Vector
596        // carries its dimension inline.
597        let ty: String = match &c.datatype {
598            DataType::Integer => "INTEGER".to_string(),
599            DataType::Text => "TEXT".to_string(),
600            DataType::Real => "REAL".to_string(),
601            DataType::Bool => "BOOLEAN".to_string(),
602            DataType::Vector(dim) => format!("VECTOR({dim})"),
603            DataType::Json => "JSON".to_string(),
604            DataType::None | DataType::Invalid => "TEXT".to_string(),
605        };
606        let mut piece = format!("{} {}", c.column_name, ty);
607        if c.is_pk {
608            piece.push_str(" PRIMARY KEY");
609        } else {
610            if c.is_unique {
611                piece.push_str(" UNIQUE");
612            }
613            if c.not_null {
614                piece.push_str(" NOT NULL");
615            }
616        }
617        if let Some(default) = &c.default {
618            piece.push_str(" DEFAULT ");
619            piece.push_str(&render_default_literal(default));
620        }
621        parts.push(piece);
622    }
623    format!("CREATE TABLE {} ({});", table.tb_name, parts.join(", "))
624}
625
626/// Renders a DEFAULT value back to SQL-literal form so the synthesized
627/// CREATE TABLE round-trips through `parse_create_sql`. Text values get
628/// single-quoted with single-quote doubling for escaping. Vector defaults
629/// are not currently expressible at CREATE TABLE time, so we render them
630/// as their bracket-array form (matches the INSERT literal grammar).
631fn render_default_literal(value: &Value) -> String {
632    match value {
633        Value::Integer(i) => i.to_string(),
634        Value::Real(f) => f.to_string(),
635        Value::Bool(b) => {
636            if *b {
637                "TRUE".to_string()
638            } else {
639                "FALSE".to_string()
640            }
641        }
642        Value::Text(s) => format!("'{}'", s.replace('\'', "''")),
643        Value::Null => "NULL".to_string(),
644        Value::Vector(_) => value.to_display_string(),
645    }
646}
647
648/// Reverses `table_to_create_sql`: feeds the SQL back through `sqlparser`
649/// and produces our internal column list. Returns `(table_name, columns)`.
650fn parse_create_sql(sql: &str) -> Result<(String, Vec<Column>)> {
651    let dialect = SQLiteDialect {};
652    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
653    let stmt = ast.pop().ok_or_else(|| {
654        SQLRiteError::Internal("sqlrite_master row held an empty SQL string".to_string())
655    })?;
656    let create = CreateQuery::new(&stmt)?;
657    let columns = create
658        .columns
659        .into_iter()
660        .map(|pc| {
661            Column::with_default(
662                pc.name,
663                pc.datatype,
664                pc.is_pk,
665                pc.not_null,
666                pc.is_unique,
667                pc.default,
668            )
669        })
670        .collect();
671    Ok((create.table_name, columns))
672}
673
674// -------------------------------------------------------------------------
675// In-memory table (re)construction
676
677/// Builds an empty in-memory `Table` given the declared columns.
678fn build_empty_table(name: &str, columns: Vec<Column>, last_rowid: i64) -> Table {
679    let rows: Arc<Mutex<HashMap<String, Row>>> = Arc::new(Mutex::new(HashMap::new()));
680    let mut secondary_indexes: Vec<SecondaryIndex> = Vec::new();
681    {
682        let mut map = rows.lock().expect("rows mutex poisoned");
683        for col in &columns {
684            // Mirror the dispatch in `Table::new` so the reconstructed
685            // table has the same shape it'd have if it were built fresh
686            // from SQL. Phase 7a adds the Vector arm — without it,
687            // VECTOR columns silently restore as Row::None and every
688            // restore_row hits a "storage None vs value Some(Vector(...))"
689            // type mismatch.
690            let row = match &col.datatype {
691                DataType::Integer => Row::Integer(BTreeMap::new()),
692                DataType::Text => Row::Text(BTreeMap::new()),
693                DataType::Real => Row::Real(BTreeMap::new()),
694                DataType::Bool => Row::Bool(BTreeMap::new()),
695                DataType::Vector(_dim) => Row::Vector(BTreeMap::new()),
696                // JSON columns reuse Text storage — see Table::new and
697                // Phase 7e's scope-correction note.
698                DataType::Json => Row::Text(BTreeMap::new()),
699                DataType::None | DataType::Invalid => Row::None,
700            };
701            map.insert(col.column_name.clone(), row);
702
703            // Auto-create UNIQUE/PK indexes so the restored table has the
704            // same shape Table::new would have built from fresh SQL.
705            if (col.is_pk || col.is_unique)
706                && matches!(col.datatype, DataType::Integer | DataType::Text)
707            {
708                if let Ok(idx) = SecondaryIndex::new(
709                    SecondaryIndex::auto_name(name, &col.column_name),
710                    name.to_string(),
711                    col.column_name.clone(),
712                    &col.datatype,
713                    true,
714                    IndexOrigin::Auto,
715                ) {
716                    secondary_indexes.push(idx);
717                }
718            }
719        }
720    }
721
722    let primary_key = columns
723        .iter()
724        .find(|c| c.is_pk)
725        .map(|c| c.column_name.clone())
726        .unwrap_or_else(|| "-1".to_string());
727
728    Table {
729        tb_name: name.to_string(),
730        columns,
731        rows,
732        secondary_indexes,
733        // HNSW indexes (Phase 7d.2) are reconstructed on open by re-
734        // executing each `CREATE INDEX … USING hnsw` SQL stored in
735        // `sqlrite_master`. This builder produces the empty shell;
736        // `replay_create_index_for_hnsw` (in this same module) walks
737        // sqlrite_master after every table is loaded and rebuilds the
738        // graph from current row data. Persistence of the graph itself
739        // (avoiding the on-open rebuild cost) is Phase 7d.3.
740        hnsw_indexes: Vec::new(),
741        // FTS indexes (Phase 8b) follow the same pattern — the
742        // CREATE INDEX … USING fts SQL is the source of truth on open
743        // and the in-memory posting list gets rebuilt from current
744        // rows. Cell-encoded persistence of the postings is Phase 8c.
745        fts_indexes: Vec::new(),
746        last_rowid,
747        primary_key,
748    }
749}
750
751// -------------------------------------------------------------------------
752// Leaf-chain read / write
753
754/// Walks a table's B-Tree from `root_page`, following the leftmost-child
755/// chain down to the first leaf, then iterating leaves via their sibling
756/// `next_page` pointers. Every cell is decoded and replayed into `table`.
757///
758/// Open-path note: we eagerly materialize the entire table into `Table`'s
759/// in-memory maps. Phase 5 will introduce a `Cursor` that hits the pager
760/// on demand so queries can stream through the tree without a full upfront
761/// load.
762/// Re-parses `CREATE INDEX` SQL from sqlrite_master and restores the
763/// index on its base table by walking the tree of index cells at
764/// `rootpage`. The base table is expected to already be in `db.tables`.
765fn attach_index(db: &mut Database, pager: &Pager, row: IndexCatalogRow) -> Result<()> {
766    let (table_name, column_name, is_unique) = parse_create_index_sql(&row.sql)?;
767
768    let table = db.get_table_mut(table_name.clone()).map_err(|_| {
769        SQLRiteError::Internal(format!(
770            "index '{}' references unknown table '{table_name}' (sqlrite_master out of sync?)",
771            row.name
772        ))
773    })?;
774    let datatype = table
775        .columns
776        .iter()
777        .find(|c| c.column_name == column_name)
778        .map(|c| clone_datatype(&c.datatype))
779        .ok_or_else(|| {
780            SQLRiteError::Internal(format!(
781                "index '{}' references unknown column '{column_name}' on '{table_name}'",
782                row.name
783            ))
784        })?;
785
786    // An auto-index on this column may already exist (built by
787    // build_empty_table for UNIQUE/PK columns). If the names match, reuse
788    // the slot instead of adding a duplicate entry.
789    let existing_slot = table
790        .secondary_indexes
791        .iter()
792        .position(|i| i.name == row.name);
793    let idx = match existing_slot {
794        Some(i) => {
795            // Drain any entries that may have been populated during table
796            // restore_row calls — we're about to repopulate from the
797            // persisted tree.
798            table.secondary_indexes.remove(i)
799        }
800        None => SecondaryIndex::new(
801            row.name.clone(),
802            table_name.clone(),
803            column_name.clone(),
804            &datatype,
805            is_unique,
806            IndexOrigin::Explicit,
807        )?,
808    };
809    let mut idx = idx;
810    // Wipe any stale entries from the auto path so the load is idempotent.
811    let is_unique_flag = idx.is_unique;
812    let origin = idx.origin;
813    idx = SecondaryIndex::new(
814        idx.name,
815        idx.table_name,
816        idx.column_name,
817        &datatype,
818        is_unique_flag,
819        origin,
820    )?;
821
822    // Populate from the index tree's cells.
823    load_index_rows(pager, &mut idx, row.rootpage)?;
824
825    table.secondary_indexes.push(idx);
826    Ok(())
827}
828
829/// Walks the leaves of an index B-Tree rooted at `root_page` and inserts
830/// every `(value, rowid)` pair into `idx`.
831fn load_index_rows(pager: &Pager, idx: &mut SecondaryIndex, root_page: u32) -> Result<()> {
832    if root_page == 0 {
833        return Ok(());
834    }
835    let first_leaf = find_leftmost_leaf(pager, root_page)?;
836    let mut current = first_leaf;
837    while current != 0 {
838        let page_buf = pager
839            .read_page(current)
840            .ok_or_else(|| SQLRiteError::Internal(format!("missing index leaf page {current}")))?;
841        if page_buf[0] != PageType::TableLeaf as u8 {
842            return Err(SQLRiteError::Internal(format!(
843                "page {current} tagged {} but expected TableLeaf (index)",
844                page_buf[0]
845            )));
846        }
847        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
848        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
849            .try_into()
850            .map_err(|_| SQLRiteError::Internal("index leaf payload size".to_string()))?;
851        let leaf = TablePage::from_bytes(payload);
852
853        for slot in 0..leaf.slot_count() {
854            // Slots on an index page hold KIND_INDEX cells; decode directly.
855            let offset = leaf.slot_offset_raw(slot)?;
856            let (ic, _) = IndexCell::decode(leaf.as_bytes(), offset)?;
857            idx.insert(&ic.value, ic.rowid)?;
858        }
859        current = next_leaf;
860    }
861    Ok(())
862}
863
864/// Minimal recognizer for the synthesized-or-user `CREATE INDEX` SQL we
865/// store in sqlrite_master. Returns `(table_name, column_name, is_unique)`.
866///
867/// Uses sqlparser so user-supplied SQL with extra whitespace, case, etc.
868/// still works; the only shape we accept is single-column indexes.
869fn parse_create_index_sql(sql: &str) -> Result<(String, String, bool)> {
870    use sqlparser::ast::{CreateIndex, Expr, Statement};
871
872    let dialect = SQLiteDialect {};
873    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
874    let Some(Statement::CreateIndex(CreateIndex {
875        table_name,
876        columns,
877        unique,
878        ..
879    })) = ast.pop()
880    else {
881        return Err(SQLRiteError::Internal(format!(
882            "sqlrite_master index row's SQL isn't a CREATE INDEX: {sql}"
883        )));
884    };
885    if columns.len() != 1 {
886        return Err(SQLRiteError::NotImplemented(
887            "multi-column indexes aren't supported yet".to_string(),
888        ));
889    }
890    let col = match &columns[0].column.expr {
891        Expr::Identifier(ident) => ident.value.clone(),
892        Expr::CompoundIdentifier(parts) => {
893            parts.last().map(|p| p.value.clone()).unwrap_or_default()
894        }
895        other => {
896            return Err(SQLRiteError::Internal(format!(
897                "unsupported indexed column expression: {other:?}"
898            )));
899        }
900    };
901    Ok((table_name.to_string(), col, unique))
902}
903
904/// True iff a CREATE INDEX SQL string uses `USING hnsw` (case-insensitive).
905/// Used by the open path to route HNSW indexes to the graph-rebuild path
906/// instead of the standard B-Tree cell-load. Pre-Phase-7d.2 indexes
907/// don't have a USING clause, so they all return false and continue
908/// taking the existing path.
909fn create_index_sql_uses_hnsw(sql: &str) -> bool {
910    use sqlparser::ast::{CreateIndex, IndexType, Statement};
911
912    let dialect = SQLiteDialect {};
913    let Ok(mut ast) = Parser::parse_sql(&dialect, sql) else {
914        return false;
915    };
916    let Some(Statement::CreateIndex(CreateIndex { using, .. })) = ast.pop() else {
917        return false;
918    };
919    matches!(using, Some(IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("hnsw"))
920}
921
922/// Phase 8b — peeks at a CREATE INDEX SQL to detect `USING fts(...)`.
923/// Mirrors [`create_index_sql_uses_hnsw`].
924fn create_index_sql_uses_fts(sql: &str) -> bool {
925    use sqlparser::ast::{CreateIndex, IndexType, Statement};
926
927    let dialect = SQLiteDialect {};
928    let Ok(mut ast) = Parser::parse_sql(&dialect, sql) else {
929        return false;
930    };
931    let Some(Statement::CreateIndex(CreateIndex { using, .. })) = ast.pop() else {
932        return false;
933    };
934    matches!(using, Some(IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("fts"))
935}
936
937/// Phase 8c — loads (or rebuilds) an FTS index on database open. Two
938/// paths mirror [`rebuild_hnsw_index`]:
939///
940///   - **rootpage != 0** (Phase 8c default): the posting list is
941///     persisted as cell-encoded pages. Read every cell directly via
942///     [`load_fts_postings`] and reconstruct the index — no
943///     re-tokenization, exact bit-for-bit reproduction.
944///
945///   - **rootpage == 0** (compatibility): no on-disk postings, e.g.
946///     for files saved by Phase 8b before persistence landed. Replay
947///     the CREATE INDEX SQL through `execute_create_index`, which
948///     walks the table's current rows and tokenizes them fresh.
949fn rebuild_fts_index(db: &mut Database, pager: &Pager, row: &IndexCatalogRow) -> Result<()> {
950    use crate::sql::db::table::FtsIndexEntry;
951    use crate::sql::executor::execute_create_index;
952    use crate::sql::fts::PostingList;
953    use sqlparser::ast::Statement;
954
955    let dialect = SQLiteDialect {};
956    let mut ast = Parser::parse_sql(&dialect, &row.sql).map_err(SQLRiteError::from)?;
957    let Some(stmt @ Statement::CreateIndex(_)) = ast.pop() else {
958        return Err(SQLRiteError::Internal(format!(
959            "sqlrite_master FTS row's SQL isn't a CREATE INDEX: {}",
960            row.sql
961        )));
962    };
963
964    if row.rootpage == 0 {
965        // Compatibility path — no persisted postings; replay rows.
966        execute_create_index(&stmt, db)?;
967        return Ok(());
968    }
969
970    let (doc_lengths, postings) = load_fts_postings(pager, row.rootpage)?;
971    let index = PostingList::from_persisted_postings(doc_lengths, postings);
972    let (tbl_name, col_name) = parse_fts_create_index_sql(&row.sql)?;
973    let table_mut = db.get_table_mut(tbl_name.clone()).map_err(|_| {
974        SQLRiteError::Internal(format!(
975            "FTS index '{}' references unknown table '{tbl_name}'",
976            row.name
977        ))
978    })?;
979    table_mut.fts_indexes.push(FtsIndexEntry {
980        name: row.name.clone(),
981        column_name: col_name,
982        index,
983        needs_rebuild: false,
984    });
985    Ok(())
986}
987
988/// Pulls (table_name, column_name) out of a `CREATE INDEX … USING fts(col)`
989/// SQL string. Same shape as `parse_hnsw_create_index_sql`.
990fn parse_fts_create_index_sql(sql: &str) -> Result<(String, String)> {
991    use sqlparser::ast::{CreateIndex, Expr, Statement};
992
993    let dialect = SQLiteDialect {};
994    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
995    let Some(Statement::CreateIndex(CreateIndex {
996        table_name,
997        columns,
998        ..
999    })) = ast.pop()
1000    else {
1001        return Err(SQLRiteError::Internal(format!(
1002            "sqlrite_master FTS row's SQL isn't a CREATE INDEX: {sql}"
1003        )));
1004    };
1005    if columns.len() != 1 {
1006        return Err(SQLRiteError::NotImplemented(
1007            "multi-column FTS indexes aren't supported yet".to_string(),
1008        ));
1009    }
1010    let col = match &columns[0].column.expr {
1011        Expr::Identifier(ident) => ident.value.clone(),
1012        Expr::CompoundIdentifier(parts) => {
1013            parts.last().map(|p| p.value.clone()).unwrap_or_default()
1014        }
1015        other => {
1016            return Err(SQLRiteError::Internal(format!(
1017                "FTS CREATE INDEX has unexpected column expr: {other:?}"
1018            )));
1019        }
1020    };
1021    Ok((table_name.to_string(), col))
1022}
1023
1024/// Loads (or rebuilds) an HNSW index on database open. Two paths:
1025///
1026///   - **rootpage != 0** (Phase 7d.3 default): the graph is persisted
1027///     as cell-encoded pages. Read every node directly via
1028///     `load_hnsw_nodes` and reconstruct the index — fast, zero
1029///     algorithm runs, exact bit-for-bit reproduction of what was saved.
1030///
1031///   - **rootpage == 0** (compatibility): no on-disk graph, e.g. for
1032///     files saved by Phase 7d.2 before persistence landed. Replay the
1033///     CREATE INDEX SQL through `execute_create_index`, which walks the
1034///     table's current rows and populates a fresh graph. Slower but
1035///     correctness-equivalent on the first save with the new code.
1036fn rebuild_hnsw_index(db: &mut Database, pager: &Pager, row: &IndexCatalogRow) -> Result<()> {
1037    use crate::sql::db::table::HnswIndexEntry;
1038    use crate::sql::executor::execute_create_index;
1039    use crate::sql::hnsw::{DistanceMetric, HnswIndex};
1040    use sqlparser::ast::Statement;
1041
1042    let dialect = SQLiteDialect {};
1043    let mut ast = Parser::parse_sql(&dialect, &row.sql).map_err(SQLRiteError::from)?;
1044    let Some(stmt @ Statement::CreateIndex(_)) = ast.pop() else {
1045        return Err(SQLRiteError::Internal(format!(
1046            "sqlrite_master HNSW row's SQL isn't a CREATE INDEX: {}",
1047            row.sql
1048        )));
1049    };
1050
1051    if row.rootpage == 0 {
1052        // Compatibility path — no persisted graph; walk current rows.
1053        execute_create_index(&stmt, db)?;
1054        return Ok(());
1055    }
1056
1057    // Persistence path — read the cell tree, deserialize.
1058    let nodes = load_hnsw_nodes(pager, row.rootpage)?;
1059    let index = HnswIndex::from_persisted_nodes(DistanceMetric::L2, 0xC0FFEE, nodes);
1060
1061    // Parse the CREATE INDEX to know which table + column to attach to
1062    // — same shape as the row-walk path; we just don't execute it.
1063    let (tbl_name, col_name) = parse_hnsw_create_index_sql(&row.sql)?;
1064    let table_mut = db.get_table_mut(tbl_name.clone()).map_err(|_| {
1065        SQLRiteError::Internal(format!(
1066            "HNSW index '{}' references unknown table '{tbl_name}'",
1067            row.name
1068        ))
1069    })?;
1070    table_mut.hnsw_indexes.push(HnswIndexEntry {
1071        name: row.name.clone(),
1072        column_name: col_name,
1073        index,
1074        needs_rebuild: false,
1075    });
1076    Ok(())
1077}
1078
1079/// Phase 7d.3 — Phase-7d.3-side helper: walk every leaf in the HNSW
1080/// page tree at `root_page` and decode each cell as a node. Returns
1081/// the (node_id, layers) tuples in slot-order (already ascending by
1082/// node_id since they were staged that way). The caller hands them to
1083/// `HnswIndex::from_persisted_nodes`.
1084fn load_hnsw_nodes(pager: &Pager, root_page: u32) -> Result<Vec<(i64, Vec<Vec<i64>>)>> {
1085    use crate::sql::pager::hnsw_cell::HnswNodeCell;
1086
1087    let mut nodes: Vec<(i64, Vec<Vec<i64>>)> = Vec::new();
1088    let first_leaf = find_leftmost_leaf(pager, root_page)?;
1089    let mut current = first_leaf;
1090    while current != 0 {
1091        let page_buf = pager
1092            .read_page(current)
1093            .ok_or_else(|| SQLRiteError::Internal(format!("missing HNSW leaf page {current}")))?;
1094        if page_buf[0] != PageType::TableLeaf as u8 {
1095            return Err(SQLRiteError::Internal(format!(
1096                "page {current} tagged {} but expected TableLeaf (HNSW)",
1097                page_buf[0]
1098            )));
1099        }
1100        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
1101        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
1102            .try_into()
1103            .map_err(|_| SQLRiteError::Internal("HNSW leaf payload size".to_string()))?;
1104        let leaf = TablePage::from_bytes(payload);
1105        for slot in 0..leaf.slot_count() {
1106            let offset = leaf.slot_offset_raw(slot)?;
1107            let (cell, _) = HnswNodeCell::decode(leaf.as_bytes(), offset)?;
1108            nodes.push((cell.node_id, cell.layers));
1109        }
1110        current = next_leaf;
1111    }
1112    Ok(nodes)
1113}
1114
1115/// Pulls (table_name, column_name) out of a `CREATE INDEX … USING hnsw (col)`
1116/// SQL string. Used by the persistence path on open to know where to
1117/// attach the loaded graph. Same shape as `parse_create_index_sql` for
1118/// regular indexes — only the assertion differs (we don't care about
1119/// UNIQUE for HNSW).
1120fn parse_hnsw_create_index_sql(sql: &str) -> Result<(String, String)> {
1121    use sqlparser::ast::{CreateIndex, Expr, Statement};
1122
1123    let dialect = SQLiteDialect {};
1124    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
1125    let Some(Statement::CreateIndex(CreateIndex {
1126        table_name,
1127        columns,
1128        ..
1129    })) = ast.pop()
1130    else {
1131        return Err(SQLRiteError::Internal(format!(
1132            "sqlrite_master HNSW row's SQL isn't a CREATE INDEX: {sql}"
1133        )));
1134    };
1135    if columns.len() != 1 {
1136        return Err(SQLRiteError::NotImplemented(
1137            "multi-column HNSW indexes aren't supported yet".to_string(),
1138        ));
1139    }
1140    let col = match &columns[0].column.expr {
1141        Expr::Identifier(ident) => ident.value.clone(),
1142        Expr::CompoundIdentifier(parts) => {
1143            parts.last().map(|p| p.value.clone()).unwrap_or_default()
1144        }
1145        other => {
1146            return Err(SQLRiteError::Internal(format!(
1147                "unsupported HNSW indexed column expression: {other:?}"
1148            )));
1149        }
1150    };
1151    Ok((table_name.to_string(), col))
1152}
1153
1154/// Phase 7d.3 — rebuilds in-place any HnswIndexEntry whose
1155/// `needs_rebuild` flag is set (DELETE / UPDATE-on-vector marked it).
1156/// Walks the table's current Vec<f32> column storage and runs the
1157/// HNSW algorithm fresh. Called at the top of `save_database` before
1158/// any immutable borrows of `db` start.
1159///
1160/// Cost: O(N · ef_construction · log N) per dirty index. Fine for
1161/// small tables, expensive for ≥100k-row tables — matches the
1162/// trade-off SQLite makes for FTS5: dirtying-and-rebuilding is the
1163/// MVP, more sophisticated incremental delete strategies (soft-delete
1164/// + tombstones, neighbor reconnection) are future polish.
1165fn rebuild_dirty_hnsw_indexes(db: &mut Database) {
1166    use crate::sql::hnsw::{DistanceMetric, HnswIndex};
1167
1168    for table in db.tables.values_mut() {
1169        // Snapshot which (index_name, column) pairs need rebuilding,
1170        // before we go grabbing column data — keeps the borrow
1171        // structure simple.
1172        let dirty: Vec<(String, String)> = table
1173            .hnsw_indexes
1174            .iter()
1175            .filter(|e| e.needs_rebuild)
1176            .map(|e| (e.name.clone(), e.column_name.clone()))
1177            .collect();
1178        if dirty.is_empty() {
1179            continue;
1180        }
1181
1182        for (idx_name, col_name) in dirty {
1183            // Snapshot every (rowid, vec) for this column.
1184            let mut vectors: Vec<(i64, Vec<f32>)> = Vec::new();
1185            {
1186                let row_data = table.rows.lock().expect("rows mutex poisoned");
1187                if let Some(Row::Vector(map)) = row_data.get(&col_name) {
1188                    for (id, v) in map.iter() {
1189                        vectors.push((*id, v.clone()));
1190                    }
1191                }
1192            }
1193            // Pre-build a HashMap for the get_vec closure so we don't
1194            // pay O(N) lookup per insert call.
1195            let snapshot: std::collections::HashMap<i64, Vec<f32>> =
1196                vectors.iter().cloned().collect();
1197
1198            let mut new_idx = HnswIndex::new(DistanceMetric::L2, 0xC0FFEE);
1199            // Sort by id so the rebuild is deterministic across runs.
1200            vectors.sort_by_key(|(id, _)| *id);
1201            for (id, v) in &vectors {
1202                new_idx.insert(*id, v, |q| snapshot.get(&q).cloned().unwrap_or_default());
1203            }
1204
1205            // Replace the entry's index + clear the dirty flag.
1206            if let Some(entry) = table.hnsw_indexes.iter_mut().find(|e| e.name == idx_name) {
1207                entry.index = new_idx;
1208                entry.needs_rebuild = false;
1209            }
1210        }
1211    }
1212}
1213
1214/// Phase 8b — rebuild every FTS index a DELETE / UPDATE-on-text-col
1215/// marked dirty. Mirrors [`rebuild_dirty_hnsw_indexes`]; runs at save
1216/// time under `&mut Database`. Cheap on a clean DB (the `dirty` snapshot
1217/// is empty so the per-table loop short-circuits).
1218fn rebuild_dirty_fts_indexes(db: &mut Database) {
1219    use crate::sql::fts::PostingList;
1220
1221    for table in db.tables.values_mut() {
1222        let dirty: Vec<(String, String)> = table
1223            .fts_indexes
1224            .iter()
1225            .filter(|e| e.needs_rebuild)
1226            .map(|e| (e.name.clone(), e.column_name.clone()))
1227            .collect();
1228        if dirty.is_empty() {
1229            continue;
1230        }
1231
1232        for (idx_name, col_name) in dirty {
1233            // Snapshot every (rowid, text) pair for this column under
1234            // the row mutex, then drop the lock before re-tokenizing.
1235            let mut docs: Vec<(i64, String)> = Vec::new();
1236            {
1237                let row_data = table.rows.lock().expect("rows mutex poisoned");
1238                if let Some(Row::Text(map)) = row_data.get(&col_name) {
1239                    for (id, v) in map.iter() {
1240                        // "Null" sentinel is the parser's
1241                        // null-marker for TEXT cells; skip those —
1242                        // they'd round-trip as the literal string
1243                        // "Null" otherwise. Aligns with insert_row's
1244                        // typed_value gate.
1245                        if v != "Null" {
1246                            docs.push((*id, v.clone()));
1247                        }
1248                    }
1249                }
1250            }
1251
1252            let mut new_idx = PostingList::new();
1253            // Sort by id so the rebuild is deterministic across runs
1254            // (the BTreeMap inside PostingList is order-stable, but
1255            // doc-length aggregation order doesn't matter — sorting
1256            // here is purely for reproducibility on inspection).
1257            docs.sort_by_key(|(id, _)| *id);
1258            for (id, text) in &docs {
1259                new_idx.insert(*id, text);
1260            }
1261
1262            if let Some(entry) = table.fts_indexes.iter_mut().find(|e| e.name == idx_name) {
1263                entry.index = new_idx;
1264                entry.needs_rebuild = false;
1265            }
1266        }
1267    }
1268}
1269
1270/// Cheap clone helper — `DataType` doesn't derive `Clone` elsewhere.
1271fn clone_datatype(dt: &DataType) -> DataType {
1272    match dt {
1273        DataType::Integer => DataType::Integer,
1274        DataType::Text => DataType::Text,
1275        DataType::Real => DataType::Real,
1276        DataType::Bool => DataType::Bool,
1277        DataType::Vector(dim) => DataType::Vector(*dim),
1278        DataType::Json => DataType::Json,
1279        DataType::None => DataType::None,
1280        DataType::Invalid => DataType::Invalid,
1281    }
1282}
1283
1284/// Stages an index's B-Tree at `start_page`. Each leaf cell is a
1285/// `KIND_INDEX` entry carrying `(original_rowid, value)`. Returns
1286/// `(root_page, next_free_page)`.
1287///
1288/// The tree's shape matches a regular table's — leaves chained via
1289/// `next_page`, optional interior layer above. `Cell::peek_rowid` works
1290/// uniformly for index cells (same prefix as local cells), so the
1291/// existing slot directory and binary search carry over.
1292fn stage_index_btree(
1293    pager: &mut Pager,
1294    idx: &SecondaryIndex,
1295    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1296) -> Result<u32> {
1297    // Build the leaves.
1298    let leaves = stage_index_leaves(pager, idx, alloc)?;
1299    if leaves.len() == 1 {
1300        return Ok(leaves[0].0);
1301    }
1302    let mut level: Vec<(u32, i64)> = leaves;
1303    while level.len() > 1 {
1304        level = stage_interior_level(pager, &level, alloc)?;
1305    }
1306    Ok(level[0].0)
1307}
1308
1309/// Packs the index's (value, rowid) entries into a sibling-chained run
1310/// of `TableLeaf` pages. Iteration order matches `SecondaryIndex::iter_entries`
1311/// (ascending value; rowids in insertion order within a value), which is
1312/// also ascending by the "cell rowid" carried in each IndexCell (the
1313/// original row's rowid) — so Cell::peek_rowid + the slot directory's
1314/// rowid ordering stays consistent.
1315fn stage_index_leaves(
1316    pager: &mut Pager,
1317    idx: &SecondaryIndex,
1318    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1319) -> Result<Vec<(u32, i64)>> {
1320    let mut leaves: Vec<(u32, i64)> = Vec::new();
1321    let mut current_leaf = TablePage::empty();
1322    let mut current_leaf_page = alloc.allocate();
1323    let mut current_max_rowid: Option<i64> = None;
1324
1325    // Sort the entries by original rowid so the in-page slot directory,
1326    // which binary-searches by rowid, stays valid. (iter_entries orders by
1327    // value; we reorder here for B-Tree correctness.)
1328    let mut entries: Vec<(Value, i64)> = idx.iter_entries().collect();
1329    entries.sort_by_key(|(_, r)| *r);
1330
1331    for (value, rowid) in entries {
1332        let cell = IndexCell::new(rowid, value);
1333        let entry_bytes = cell.encode()?;
1334
1335        if !current_leaf.would_fit(entry_bytes.len()) {
1336            let next_leaf_page_num = alloc.allocate();
1337            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1338            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1339            current_leaf = TablePage::empty();
1340            current_leaf_page = next_leaf_page_num;
1341
1342            if !current_leaf.would_fit(entry_bytes.len()) {
1343                return Err(SQLRiteError::Internal(format!(
1344                    "index entry of {} bytes exceeds empty-page capacity {}",
1345                    entry_bytes.len(),
1346                    current_leaf.free_space()
1347                )));
1348            }
1349        }
1350        current_leaf.insert_entry(rowid, &entry_bytes)?;
1351        current_max_rowid = Some(rowid);
1352    }
1353
1354    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1355    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1356    Ok(leaves)
1357}
1358
1359/// Phase 7d.3 — stages an HNSW index's page tree at `start_page`.
1360/// Each leaf cell is a `KIND_HNSW` entry carrying one node's
1361/// (node_id, layers). Returns `(root_page, next_free_page)`.
1362///
1363/// Tree shape is identical to `stage_index_btree` — chained leaves +
1364/// optional interior layers. The slot directory binary-searches by
1365/// node_id (which is the cell's "rowid" in `Cell::peek_rowid` terms),
1366/// so reads can locate any node in O(log N) once 7d.4-or-later
1367/// optimizes the load path to lazy-fetch instead of read-all.
1368/// Today, `load_hnsw_nodes` reads the entire tree on open.
1369fn stage_hnsw_btree(
1370    pager: &mut Pager,
1371    idx: &crate::sql::hnsw::HnswIndex,
1372    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1373) -> Result<u32> {
1374    let leaves = stage_hnsw_leaves(pager, idx, alloc)?;
1375    if leaves.len() == 1 {
1376        return Ok(leaves[0].0);
1377    }
1378    let mut level: Vec<(u32, i64)> = leaves;
1379    while level.len() > 1 {
1380        level = stage_interior_level(pager, &level, alloc)?;
1381    }
1382    Ok(level[0].0)
1383}
1384
1385/// Phase 8c — stage one FTS index as a `TableLeaf`-shaped B-Tree.
1386/// Mirrors `stage_hnsw_btree` (sibling-chained leaves, optional interior
1387/// levels). Returns `(root_page, next_free_page)`. Each leaf is filled
1388/// with `KIND_FTS_POSTING` cells: one sidecar cell holding the
1389/// doc-lengths map, then one cell per term in lexicographic order.
1390fn stage_fts_btree(
1391    pager: &mut Pager,
1392    idx: &crate::sql::fts::PostingList,
1393    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1394) -> Result<u32> {
1395    let leaves = stage_fts_leaves(pager, idx, alloc)?;
1396    if leaves.len() == 1 {
1397        return Ok(leaves[0].0);
1398    }
1399    let mut level: Vec<(u32, i64)> = leaves;
1400    while level.len() > 1 {
1401        level = stage_interior_level(pager, &level, alloc)?;
1402    }
1403    Ok(level[0].0)
1404}
1405
1406/// Packs FTS posting cells into a sibling-chained run of `TableLeaf`
1407/// pages. Cell layout: a single doc-lengths sidecar at `cell_id = 1`,
1408/// followed by one cell per term in lexicographic order with
1409/// `cell_id = 2..=N + 1`. Sequential ids keep the slot directory's
1410/// rowid ordering valid (the `cell_id` field is what `peek_rowid`
1411/// returns).
1412fn stage_fts_leaves(
1413    pager: &mut Pager,
1414    idx: &crate::sql::fts::PostingList,
1415    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1416) -> Result<Vec<(u32, i64)>> {
1417    use crate::sql::pager::fts_cell::FtsPostingCell;
1418
1419    let mut leaves: Vec<(u32, i64)> = Vec::new();
1420    let mut current_leaf = TablePage::empty();
1421    let mut current_leaf_page = alloc.allocate();
1422    let mut current_max_rowid: Option<i64> = None;
1423
1424    // Build the cell sequence: sidecar first, then per-term cells. The
1425    // sidecar always exists (even on an empty index) so reload sees a
1426    // canonical "this index was persisted" marker in slot 0.
1427    let mut cell_id: i64 = 1;
1428    let mut cells: Vec<FtsPostingCell> = Vec::new();
1429    cells.push(FtsPostingCell::doc_lengths(
1430        cell_id,
1431        idx.serialize_doc_lengths(),
1432    ));
1433    for (term, entries) in idx.serialize_postings() {
1434        cell_id += 1;
1435        cells.push(FtsPostingCell::posting(cell_id, term, entries));
1436    }
1437
1438    for cell in cells {
1439        let entry_bytes = cell.encode()?;
1440
1441        if !current_leaf.would_fit(entry_bytes.len()) {
1442            let next_leaf_page_num = alloc.allocate();
1443            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1444            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1445            current_leaf = TablePage::empty();
1446            current_leaf_page = next_leaf_page_num;
1447
1448            if !current_leaf.would_fit(entry_bytes.len()) {
1449                // A single posting cell exceeds page capacity. Phase
1450                // 8c MVP doesn't chain via overflow cells (the plan
1451                // notes this as a stretch goal); surface a clear
1452                // error so users know which term tripped it.
1453                return Err(SQLRiteError::Internal(format!(
1454                    "FTS posting cell {} of {} bytes exceeds empty-page capacity {} \
1455                     (term too long or too many postings; overflow chaining is Phase 8.1)",
1456                    cell.cell_id,
1457                    entry_bytes.len(),
1458                    current_leaf.free_space()
1459                )));
1460            }
1461        }
1462        current_leaf.insert_entry(cell.cell_id, &entry_bytes)?;
1463        current_max_rowid = Some(cell.cell_id);
1464    }
1465
1466    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1467    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1468    Ok(leaves)
1469}
1470
1471/// (rowid, value) pairs as decoded from a single FTS cell — value is
1472/// either term frequency (posting cell) or doc length (sidecar cell).
1473type FtsEntries = Vec<(i64, u32)>;
1474/// (term, posting list) pairs as decoded from non-sidecar FTS cells.
1475type FtsPostings = Vec<(String, FtsEntries)>;
1476
1477/// Phase 8c — read every cell of an FTS index from `root_page` back
1478/// into the `(doc_lengths, postings)` shape `PostingList::from_persisted_postings`
1479/// expects. Mirrors `load_hnsw_nodes`: leftmost-leaf descent, walk the
1480/// sibling chain, decode each slot.
1481fn load_fts_postings(pager: &Pager, root_page: u32) -> Result<(FtsEntries, FtsPostings)> {
1482    use crate::sql::pager::fts_cell::FtsPostingCell;
1483
1484    let mut doc_lengths: Vec<(i64, u32)> = Vec::new();
1485    let mut postings: Vec<(String, Vec<(i64, u32)>)> = Vec::new();
1486    let mut saw_sidecar = false;
1487
1488    let first_leaf = find_leftmost_leaf(pager, root_page)?;
1489    let mut current = first_leaf;
1490    while current != 0 {
1491        let page_buf = pager
1492            .read_page(current)
1493            .ok_or_else(|| SQLRiteError::Internal(format!("missing FTS leaf page {current}")))?;
1494        if page_buf[0] != PageType::TableLeaf as u8 {
1495            return Err(SQLRiteError::Internal(format!(
1496                "page {current} tagged {} but expected TableLeaf (FTS)",
1497                page_buf[0]
1498            )));
1499        }
1500        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
1501        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
1502            .try_into()
1503            .map_err(|_| SQLRiteError::Internal("FTS leaf payload size".to_string()))?;
1504        let leaf = TablePage::from_bytes(payload);
1505        for slot in 0..leaf.slot_count() {
1506            let offset = leaf.slot_offset_raw(slot)?;
1507            let (cell, _) = FtsPostingCell::decode(leaf.as_bytes(), offset)?;
1508            if cell.is_doc_lengths() {
1509                if saw_sidecar {
1510                    return Err(SQLRiteError::Internal(
1511                        "FTS index has more than one doc-lengths sidecar cell".to_string(),
1512                    ));
1513                }
1514                saw_sidecar = true;
1515                doc_lengths = cell.entries;
1516            } else {
1517                postings.push((cell.term, cell.entries));
1518            }
1519        }
1520        current = next_leaf;
1521    }
1522
1523    if !saw_sidecar {
1524        return Err(SQLRiteError::Internal(
1525            "FTS index missing doc-lengths sidecar cell — corrupt or truncated tree".to_string(),
1526        ));
1527    }
1528    Ok((doc_lengths, postings))
1529}
1530
1531/// Packs HNSW nodes into a sibling-chained run of `TableLeaf` pages.
1532/// `serialize_nodes` already returns nodes in ascending node_id order,
1533/// so the slot directory's rowid ordering stays valid.
1534fn stage_hnsw_leaves(
1535    pager: &mut Pager,
1536    idx: &crate::sql::hnsw::HnswIndex,
1537    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1538) -> Result<Vec<(u32, i64)>> {
1539    use crate::sql::pager::hnsw_cell::HnswNodeCell;
1540
1541    let mut leaves: Vec<(u32, i64)> = Vec::new();
1542    let mut current_leaf = TablePage::empty();
1543    let mut current_leaf_page = alloc.allocate();
1544    let mut current_max_rowid: Option<i64> = None;
1545
1546    let serialized = idx.serialize_nodes();
1547
1548    // Empty index → emit a single empty leaf page so the rootpage
1549    // pointer in sqlrite_master stays nonzero (== "graph is persisted,
1550    // it just happens to be empty"). load_hnsw_nodes is fine with an
1551    // empty leaf — slot_count() returns 0.
1552    for (node_id, layers) in serialized {
1553        let cell = HnswNodeCell::new(node_id, layers);
1554        let entry_bytes = cell.encode()?;
1555
1556        if !current_leaf.would_fit(entry_bytes.len()) {
1557            let next_leaf_page_num = alloc.allocate();
1558            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1559            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1560            current_leaf = TablePage::empty();
1561            current_leaf_page = next_leaf_page_num;
1562
1563            if !current_leaf.would_fit(entry_bytes.len()) {
1564                return Err(SQLRiteError::Internal(format!(
1565                    "HNSW node {node_id} cell of {} bytes exceeds empty-page capacity {}",
1566                    entry_bytes.len(),
1567                    current_leaf.free_space()
1568                )));
1569            }
1570        }
1571        current_leaf.insert_entry(node_id, &entry_bytes)?;
1572        current_max_rowid = Some(node_id);
1573    }
1574
1575    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1576    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1577    Ok(leaves)
1578}
1579
1580fn load_table_rows(pager: &Pager, table: &mut Table, root_page: u32) -> Result<()> {
1581    let first_leaf = find_leftmost_leaf(pager, root_page)?;
1582    let mut current = first_leaf;
1583    while current != 0 {
1584        let page_buf = pager
1585            .read_page(current)
1586            .ok_or_else(|| SQLRiteError::Internal(format!("missing leaf page {current}")))?;
1587        if page_buf[0] != PageType::TableLeaf as u8 {
1588            return Err(SQLRiteError::Internal(format!(
1589                "page {current} tagged {} but expected TableLeaf",
1590                page_buf[0]
1591            )));
1592        }
1593        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
1594        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
1595            .try_into()
1596            .map_err(|_| SQLRiteError::Internal("leaf payload slice size".to_string()))?;
1597        let leaf = TablePage::from_bytes(payload);
1598
1599        for slot in 0..leaf.slot_count() {
1600            let entry = leaf.entry_at(slot)?;
1601            let cell = match entry {
1602                PagedEntry::Local(c) => c,
1603                PagedEntry::Overflow(r) => {
1604                    let body_bytes =
1605                        read_overflow_chain(pager, r.first_overflow_page, r.total_body_len)?;
1606                    let (c, _) = Cell::decode(&body_bytes, 0)?;
1607                    c
1608                }
1609            };
1610            table.restore_row(cell.rowid, cell.values)?;
1611        }
1612        current = next_leaf;
1613    }
1614    Ok(())
1615}
1616
1617/// Walks every page reachable from `root_page` and returns their page
1618/// numbers. Includes `root_page`, every interior page, every leaf, and
1619/// — when `follow_overflow` is true — every overflow page chained off
1620/// table-leaf cells. Used by `save_database` to seed each table's
1621/// per-table preferred pool and to compute the newly-freed set.
1622///
1623/// `follow_overflow = true` for table B-Trees (cells may carry
1624/// `OverflowRef`s pointing at chained overflow pages); `false` for
1625/// secondary-index, HNSW, and FTS B-Trees, which never overflow in the
1626/// current encoding.
1627fn collect_pages_for_btree(
1628    pager: &Pager,
1629    root_page: u32,
1630    follow_overflow: bool,
1631) -> Result<Vec<u32>> {
1632    if root_page == 0 {
1633        return Ok(Vec::new());
1634    }
1635    let mut pages: Vec<u32> = Vec::new();
1636    let mut stack: Vec<u32> = vec![root_page];
1637
1638    while let Some(p) = stack.pop() {
1639        let buf = pager.read_page(p).ok_or_else(|| {
1640            SQLRiteError::Internal(format!(
1641                "collect_pages: missing page {p} (rooted at {root_page})"
1642            ))
1643        })?;
1644        pages.push(p);
1645        match buf[0] {
1646            t if t == PageType::InteriorNode as u8 => {
1647                let payload: &[u8; PAYLOAD_PER_PAGE] =
1648                    (&buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
1649                        SQLRiteError::Internal("interior payload slice size".to_string())
1650                    })?;
1651                let interior = InteriorPage::from_bytes(payload);
1652                // Push every divider's child + the rightmost child.
1653                for slot in 0..interior.slot_count() {
1654                    let cell = interior.cell_at(slot)?;
1655                    stack.push(cell.child_page);
1656                }
1657                stack.push(interior.rightmost_child());
1658            }
1659            t if t == PageType::TableLeaf as u8 => {
1660                if follow_overflow {
1661                    let payload: &[u8; PAYLOAD_PER_PAGE] =
1662                        (&buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
1663                            SQLRiteError::Internal("leaf payload slice size".to_string())
1664                        })?;
1665                    let leaf = TablePage::from_bytes(payload);
1666                    for slot in 0..leaf.slot_count() {
1667                        match leaf.entry_at(slot)? {
1668                            PagedEntry::Local(_) => {}
1669                            PagedEntry::Overflow(r) => {
1670                                let mut cur = r.first_overflow_page;
1671                                while cur != 0 {
1672                                    pages.push(cur);
1673                                    let ob = pager.read_page(cur).ok_or_else(|| {
1674                                        SQLRiteError::Internal(format!(
1675                                            "collect_pages: missing overflow page {cur}"
1676                                        ))
1677                                    })?;
1678                                    if ob[0] != PageType::Overflow as u8 {
1679                                        return Err(SQLRiteError::Internal(format!(
1680                                            "collect_pages: page {cur} expected Overflow, got tag {}",
1681                                            ob[0]
1682                                        )));
1683                                    }
1684                                    cur = u32::from_le_bytes(ob[1..5].try_into().unwrap());
1685                                }
1686                            }
1687                        }
1688                    }
1689                }
1690            }
1691            other => {
1692                return Err(SQLRiteError::Internal(format!(
1693                    "collect_pages: unexpected page type {other} at page {p}"
1694                )));
1695            }
1696        }
1697    }
1698    Ok(pages)
1699}
1700
1701/// Reads the previously-persisted `sqlrite_master` and returns a map from
1702/// `(kind, name)` to that object's rootpage. Used by `save_database` to
1703/// seed each table/index's per-table preferred pool with the pages it
1704/// occupied last time round.
1705///
1706/// `kind` is `"table"` or `"index"` (the catalog already disambiguates
1707/// the three index families via the SQL string, but for page-collection
1708/// purposes a "table" tree must follow overflow refs while an "index"
1709/// tree never does — that's the only distinction we need here).
1710fn read_old_rootpages(pager: &Pager, schema_root: u32) -> Result<HashMap<(String, String), u32>> {
1711    let mut out: HashMap<(String, String), u32> = HashMap::new();
1712    if schema_root == 0 {
1713        return Ok(out);
1714    }
1715    let mut master = build_empty_master_table();
1716    load_table_rows(pager, &mut master, schema_root)?;
1717    for rowid in master.rowids() {
1718        let kind = take_text(&master, "type", rowid)?;
1719        let name = take_text(&master, "name", rowid)?;
1720        let rootpage = take_integer(&master, "rootpage", rowid)? as u32;
1721        out.insert((kind, name), rootpage);
1722    }
1723    Ok(out)
1724}
1725
1726/// Descends from `root_page` through `InteriorNode` pages, always taking
1727/// the leftmost child, until a `TableLeaf` is reached. Returns that leaf's
1728/// page number. A root that's already a leaf is returned as-is.
1729fn find_leftmost_leaf(pager: &Pager, root_page: u32) -> Result<u32> {
1730    let mut current = root_page;
1731    loop {
1732        let page_buf = pager.read_page(current).ok_or_else(|| {
1733            SQLRiteError::Internal(format!("missing page {current} during tree descent"))
1734        })?;
1735        match page_buf[0] {
1736            t if t == PageType::TableLeaf as u8 => return Ok(current),
1737            t if t == PageType::InteriorNode as u8 => {
1738                let payload: &[u8; PAYLOAD_PER_PAGE] =
1739                    (&page_buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
1740                        SQLRiteError::Internal("interior payload slice size".to_string())
1741                    })?;
1742                let interior = InteriorPage::from_bytes(payload);
1743                current = interior.leftmost_child()?;
1744            }
1745            other => {
1746                return Err(SQLRiteError::Internal(format!(
1747                    "unexpected page type {other} during tree descent at page {current}"
1748                )));
1749            }
1750        }
1751    }
1752}
1753
1754/// Stages a table's B-Tree, drawing every page number from `alloc`.
1755/// Returns the root page (the topmost interior page, or the single leaf
1756/// when the table fits in one page).
1757///
1758/// Builds bottom-up: pack rows into `TableLeaf` pages chained via
1759/// `next_page`, then if more than one leaf, recursively wrap them in
1760/// `InteriorNode` levels until one root remains.
1761///
1762/// Deterministic: same rows + same allocator handouts → byte-identical
1763/// pages at the same numbers, so the diff pager skips unchanged tables.
1764fn stage_table_btree(
1765    pager: &mut Pager,
1766    table: &Table,
1767    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1768) -> Result<u32> {
1769    let leaves = stage_leaves(pager, table, alloc)?;
1770    if leaves.len() == 1 {
1771        return Ok(leaves[0].0);
1772    }
1773    let mut level: Vec<(u32, i64)> = leaves;
1774    while level.len() > 1 {
1775        level = stage_interior_level(pager, &level, alloc)?;
1776    }
1777    Ok(level[0].0)
1778}
1779
1780/// Packs the table's rows into a sibling-linked chain of `TableLeaf` pages.
1781/// Returns each leaf's `(page_number, max_rowid)` for use by the next
1782/// interior level. Allocates leaf and overflow pages from `alloc`.
1783fn stage_leaves(
1784    pager: &mut Pager,
1785    table: &Table,
1786    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1787) -> Result<Vec<(u32, i64)>> {
1788    let mut leaves: Vec<(u32, i64)> = Vec::new();
1789    let mut current_leaf = TablePage::empty();
1790    let mut current_leaf_page = alloc.allocate();
1791    let mut current_max_rowid: Option<i64> = None;
1792
1793    for rowid in table.rowids() {
1794        let entry_bytes = build_row_entry(pager, table, rowid, alloc)?;
1795
1796        if !current_leaf.would_fit(entry_bytes.len()) {
1797            // The new leaf goes at whatever the allocator hands out
1798            // next. Commit the current leaf with that as its sibling
1799            // pointer.
1800            let next_leaf_page_num = alloc.allocate();
1801            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1802            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1803            current_leaf = TablePage::empty();
1804            current_leaf_page = next_leaf_page_num;
1805            // current_max_rowid is reassigned by the insert below; no need
1806            // to zero it out here.
1807
1808            if !current_leaf.would_fit(entry_bytes.len()) {
1809                return Err(SQLRiteError::Internal(format!(
1810                    "entry of {} bytes exceeds empty-page capacity {}",
1811                    entry_bytes.len(),
1812                    current_leaf.free_space()
1813                )));
1814            }
1815        }
1816        current_leaf.insert_entry(rowid, &entry_bytes)?;
1817        current_max_rowid = Some(rowid);
1818    }
1819
1820    // Final leaf: sibling next_page = 0 (end of chain).
1821    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1822    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1823    Ok(leaves)
1824}
1825
1826/// Encodes a single row's on-leaf entry — either the local cell bytes, or
1827/// an `OverflowRef` pointing at a freshly-allocated overflow chain if the
1828/// encoded cell exceeded the inline threshold. Allocates any overflow
1829/// pages from `alloc`.
1830fn build_row_entry(
1831    pager: &mut Pager,
1832    table: &Table,
1833    rowid: i64,
1834    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1835) -> Result<Vec<u8>> {
1836    let values = table.extract_row(rowid);
1837    let local_cell = Cell::new(rowid, values);
1838    let local_bytes = local_cell.encode()?;
1839    if local_bytes.len() > OVERFLOW_THRESHOLD {
1840        let overflow_start = write_overflow_chain(pager, &local_bytes, alloc)?;
1841        Ok(OverflowRef {
1842            rowid,
1843            total_body_len: local_bytes.len() as u64,
1844            first_overflow_page: overflow_start,
1845        }
1846        .encode())
1847    } else {
1848        Ok(local_bytes)
1849    }
1850}
1851
1852/// Builds one level of `InteriorNode` pages above the given children.
1853/// Each interior packs as many dividers as will fit; the last child
1854/// assigned to an interior becomes its `rightmost_child`. Returns the
1855/// emitted interior pages as `(page_number, max_rowid_in_subtree)`.
1856fn stage_interior_level(
1857    pager: &mut Pager,
1858    children: &[(u32, i64)],
1859    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1860) -> Result<Vec<(u32, i64)>> {
1861    let mut next_level: Vec<(u32, i64)> = Vec::new();
1862    let mut idx = 0usize;
1863
1864    while idx < children.len() {
1865        let interior_page_num = alloc.allocate();
1866
1867        // Seed the interior with the first unassigned child as its
1868        // rightmost. As we add more children, the previous rightmost
1869        // graduates to being a divider and the new arrival takes over
1870        // as rightmost.
1871        let (mut rightmost_child_page, mut rightmost_child_max) = children[idx];
1872        idx += 1;
1873        let mut interior = InteriorPage::empty(rightmost_child_page);
1874
1875        while idx < children.len() {
1876            let new_divider_cell = InteriorCell {
1877                divider_rowid: rightmost_child_max,
1878                child_page: rightmost_child_page,
1879            };
1880            let new_divider_bytes = new_divider_cell.encode();
1881            if !interior.would_fit(new_divider_bytes.len()) {
1882                break;
1883            }
1884            interior.insert_divider(rightmost_child_max, rightmost_child_page)?;
1885            let (next_child_page, next_child_max) = children[idx];
1886            interior.set_rightmost_child(next_child_page);
1887            rightmost_child_page = next_child_page;
1888            rightmost_child_max = next_child_max;
1889            idx += 1;
1890        }
1891
1892        emit_interior(pager, interior_page_num, &interior);
1893        next_level.push((interior_page_num, rightmost_child_max));
1894    }
1895
1896    Ok(next_level)
1897}
1898
1899/// Wraps a `TablePage` in the 7-byte page header and hands it to the pager.
1900fn emit_leaf(pager: &mut Pager, page_num: u32, leaf: &TablePage, next_leaf: u32) {
1901    let mut buf = [0u8; PAGE_SIZE];
1902    buf[0] = PageType::TableLeaf as u8;
1903    buf[1..5].copy_from_slice(&next_leaf.to_le_bytes());
1904    // For leaf pages the legacy `payload_len` field isn't used — the slot
1905    // directory self-describes. Zero it by convention.
1906    buf[5..7].copy_from_slice(&0u16.to_le_bytes());
1907    buf[PAGE_HEADER_SIZE..].copy_from_slice(leaf.as_bytes());
1908    pager.stage_page(page_num, buf);
1909}
1910
1911/// Wraps an `InteriorPage` in the 7-byte page header. Interior pages
1912/// don't use `next_page` (there's no sibling chain between interiors);
1913/// `payload_len` is also unused (the slot directory self-describes).
1914fn emit_interior(pager: &mut Pager, page_num: u32, interior: &InteriorPage) {
1915    let mut buf = [0u8; PAGE_SIZE];
1916    buf[0] = PageType::InteriorNode as u8;
1917    buf[1..5].copy_from_slice(&0u32.to_le_bytes());
1918    buf[5..7].copy_from_slice(&0u16.to_le_bytes());
1919    buf[PAGE_HEADER_SIZE..].copy_from_slice(interior.as_bytes());
1920    pager.stage_page(page_num, buf);
1921}
1922
1923#[cfg(test)]
1924mod tests {
1925    use super::*;
1926    use crate::sql::pager::freelist::MIN_PAGES_FOR_AUTO_VACUUM;
1927    use crate::sql::process_command;
1928
1929    fn seed_db() -> Database {
1930        let mut db = Database::new("test".to_string());
1931        process_command(
1932            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, age INTEGER);",
1933            &mut db,
1934        )
1935        .unwrap();
1936        process_command(
1937            "INSERT INTO users (name, age) VALUES ('alice', 30);",
1938            &mut db,
1939        )
1940        .unwrap();
1941        process_command("INSERT INTO users (name, age) VALUES ('bob', 25);", &mut db).unwrap();
1942        process_command(
1943            "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
1944            &mut db,
1945        )
1946        .unwrap();
1947        process_command("INSERT INTO notes (body) VALUES ('hello');", &mut db).unwrap();
1948        db
1949    }
1950
1951    fn tmp_path(name: &str) -> std::path::PathBuf {
1952        let mut p = std::env::temp_dir();
1953        let pid = std::process::id();
1954        let nanos = std::time::SystemTime::now()
1955            .duration_since(std::time::UNIX_EPOCH)
1956            .map(|d| d.as_nanos())
1957            .unwrap_or(0);
1958        p.push(format!("sqlrite-{pid}-{nanos}-{name}.sqlrite"));
1959        p
1960    }
1961
1962    /// Phase 4c: every .sqlrite has a `-wal` sidecar now. Delete both so
1963    /// `/tmp` doesn't accumulate orphan WALs across test runs.
1964    fn cleanup(path: &std::path::Path) {
1965        let _ = std::fs::remove_file(path);
1966        let mut wal = path.as_os_str().to_owned();
1967        wal.push("-wal");
1968        let _ = std::fs::remove_file(std::path::PathBuf::from(wal));
1969    }
1970
1971    #[test]
1972    fn round_trip_preserves_schema_and_data() {
1973        let path = tmp_path("roundtrip");
1974        let mut db = seed_db();
1975        save_database(&mut db, &path).expect("save");
1976
1977        let loaded = open_database(&path, "test".to_string()).expect("open");
1978        assert_eq!(loaded.tables.len(), 2);
1979
1980        let users = loaded.get_table("users".to_string()).expect("users table");
1981        assert_eq!(users.columns.len(), 3);
1982        let rowids = users.rowids();
1983        assert_eq!(rowids.len(), 2);
1984        let names: Vec<String> = rowids
1985            .iter()
1986            .filter_map(|r| match users.get_value("name", *r) {
1987                Some(Value::Text(s)) => Some(s),
1988                _ => None,
1989            })
1990            .collect();
1991        assert!(names.contains(&"alice".to_string()));
1992        assert!(names.contains(&"bob".to_string()));
1993
1994        let notes = loaded.get_table("notes".to_string()).expect("notes table");
1995        assert_eq!(notes.rowids().len(), 1);
1996
1997        cleanup(&path);
1998    }
1999
2000    // -----------------------------------------------------------------
2001    // Phase 7a — VECTOR(N) save / reopen round-trip
2002    // -----------------------------------------------------------------
2003
2004    #[test]
2005    fn round_trip_preserves_vector_column() {
2006        let path = tmp_path("vec_roundtrip");
2007
2008        // Build, populate, save.
2009        {
2010            let mut db = Database::new("test".to_string());
2011            process_command(
2012                "CREATE TABLE docs (id INTEGER PRIMARY KEY, embedding VECTOR(3));",
2013                &mut db,
2014            )
2015            .unwrap();
2016            process_command(
2017                "INSERT INTO docs (embedding) VALUES ([0.1, 0.2, 0.3]);",
2018                &mut db,
2019            )
2020            .unwrap();
2021            process_command(
2022                "INSERT INTO docs (embedding) VALUES ([1.5, -2.0, 3.5]);",
2023                &mut db,
2024            )
2025            .unwrap();
2026            save_database(&mut db, &path).expect("save");
2027        } // db drops → its exclusive lock releases before reopen.
2028
2029        // Reopen and verify schema + data both round-tripped.
2030        let loaded = open_database(&path, "test".to_string()).expect("open");
2031        let docs = loaded.get_table("docs".to_string()).expect("docs table");
2032
2033        // Schema preserved: column is still VECTOR(3).
2034        let embedding_col = docs
2035            .columns
2036            .iter()
2037            .find(|c| c.column_name == "embedding")
2038            .expect("embedding column");
2039        assert!(
2040            matches!(embedding_col.datatype, DataType::Vector(3)),
2041            "expected DataType::Vector(3) after round-trip, got {:?}",
2042            embedding_col.datatype
2043        );
2044
2045        // Data preserved: both vectors still readable bit-for-bit.
2046        let mut rows: Vec<Vec<f32>> = docs
2047            .rowids()
2048            .iter()
2049            .filter_map(|r| match docs.get_value("embedding", *r) {
2050                Some(Value::Vector(v)) => Some(v),
2051                _ => None,
2052            })
2053            .collect();
2054        rows.sort_by(|a, b| a[0].partial_cmp(&b[0]).unwrap());
2055        assert_eq!(rows.len(), 2);
2056        assert_eq!(rows[0], vec![0.1f32, 0.2, 0.3]);
2057        assert_eq!(rows[1], vec![1.5f32, -2.0, 3.5]);
2058
2059        cleanup(&path);
2060    }
2061
2062    #[test]
2063    fn round_trip_preserves_json_column() {
2064        // Phase 7e — JSON columns are stored as Text under the hood with
2065        // INSERT-time validation. Save + reopen should preserve the
2066        // schema (DataType::Json) and the underlying text bytes; a
2067        // post-reopen json_extract should still resolve paths correctly.
2068        let path = tmp_path("json_roundtrip");
2069
2070        {
2071            let mut db = Database::new("test".to_string());
2072            process_command(
2073                "CREATE TABLE docs (id INTEGER PRIMARY KEY, payload JSON);",
2074                &mut db,
2075            )
2076            .unwrap();
2077            process_command(
2078                r#"INSERT INTO docs (payload) VALUES ('{"name": "alice", "tags": ["rust","sql"]}');"#,
2079                &mut db,
2080            )
2081            .unwrap();
2082            save_database(&mut db, &path).expect("save");
2083        }
2084
2085        let mut loaded = open_database(&path, "test".to_string()).expect("open");
2086        let docs = loaded.get_table("docs".to_string()).expect("docs");
2087
2088        // Schema: column declared as JSON, restored with the same type.
2089        let payload_col = docs
2090            .columns
2091            .iter()
2092            .find(|c| c.column_name == "payload")
2093            .unwrap();
2094        assert!(
2095            matches!(payload_col.datatype, DataType::Json),
2096            "expected DataType::Json, got {:?}",
2097            payload_col.datatype
2098        );
2099
2100        // json_extract works against the reopened data — exercises the
2101        // full Text-storage + serde_json::from_str path post-reopen.
2102        let resp = process_command(
2103            r#"SELECT id FROM docs WHERE json_extract(payload, '$.name') = 'alice';"#,
2104            &mut loaded,
2105        )
2106        .expect("select via json_extract after reopen");
2107        assert!(resp.contains("1 row returned"), "got: {resp}");
2108
2109        cleanup(&path);
2110    }
2111
2112    #[test]
2113    fn round_trip_rebuilds_hnsw_index_from_create_sql() {
2114        // Phase 7d.3: HNSW indexes now persist their graph as cell-encoded
2115        // pages. After save+reopen the index entry reattaches with the
2116        // same column + same node count, loaded directly from disk
2117        // instead of re-walking rows.
2118        let path = tmp_path("hnsw_roundtrip");
2119
2120        // Build, populate, index, save.
2121        {
2122            let mut db = Database::new("test".to_string());
2123            process_command(
2124                "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
2125                &mut db,
2126            )
2127            .unwrap();
2128            for v in &[
2129                "[1.0, 0.0]",
2130                "[2.0, 0.0]",
2131                "[0.0, 3.0]",
2132                "[1.0, 4.0]",
2133                "[10.0, 10.0]",
2134            ] {
2135                process_command(&format!("INSERT INTO docs (e) VALUES ({v});"), &mut db).unwrap();
2136            }
2137            process_command("CREATE INDEX ix_e ON docs USING hnsw (e);", &mut db).unwrap();
2138            save_database(&mut db, &path).expect("save");
2139        } // db drops → exclusive lock releases.
2140
2141        // Reopen and verify the index reattached, with the same name +
2142        // column + populated graph.
2143        let mut loaded = open_database(&path, "test".to_string()).expect("open");
2144        {
2145            let table = loaded.get_table("docs".to_string()).expect("docs");
2146            assert_eq!(table.hnsw_indexes.len(), 1, "HNSW index should reattach");
2147            let entry = &table.hnsw_indexes[0];
2148            assert_eq!(entry.name, "ix_e");
2149            assert_eq!(entry.column_name, "e");
2150            assert_eq!(entry.index.len(), 5, "loaded graph should hold all 5 rows");
2151            assert!(
2152                !entry.needs_rebuild,
2153                "fresh load should not be marked dirty"
2154            );
2155        }
2156
2157        // Quick functional check: KNN query through the loaded index
2158        // returns results.
2159        let resp = process_command(
2160            "SELECT id FROM docs ORDER BY vec_distance_l2(e, [1.0, 0.0]) ASC LIMIT 3;",
2161            &mut loaded,
2162        )
2163        .unwrap();
2164        assert!(resp.contains("3 rows returned"), "got: {resp}");
2165
2166        cleanup(&path);
2167    }
2168
2169    #[test]
2170    fn round_trip_rebuilds_fts_index_from_create_sql() {
2171        // Phase 8c: FTS indexes now persist their posting lists as
2172        // cell-encoded pages. After save+reopen the index entry
2173        // reattaches with the same column + same posting count, loaded
2174        // directly from disk (no re-tokenization).
2175        let path = tmp_path("fts_roundtrip");
2176
2177        {
2178            let mut db = Database::new("test".to_string());
2179            process_command(
2180                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2181                &mut db,
2182            )
2183            .unwrap();
2184            for body in &[
2185                "rust embedded database",
2186                "rust web framework",
2187                "go embedded systems",
2188                "python web framework",
2189                "rust rust embedded power",
2190            ] {
2191                process_command(
2192                    &format!("INSERT INTO docs (body) VALUES ('{body}');"),
2193                    &mut db,
2194                )
2195                .unwrap();
2196            }
2197            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2198            save_database(&mut db, &path).expect("save");
2199        } // db drops → exclusive lock releases.
2200
2201        let mut loaded = open_database(&path, "test".to_string()).expect("open");
2202        {
2203            let table = loaded.get_table("docs".to_string()).expect("docs");
2204            assert_eq!(table.fts_indexes.len(), 1, "FTS index should reattach");
2205            let entry = &table.fts_indexes[0];
2206            assert_eq!(entry.name, "ix_body");
2207            assert_eq!(entry.column_name, "body");
2208            assert_eq!(
2209                entry.index.len(),
2210                5,
2211                "rebuilt posting list should hold all 5 rows"
2212            );
2213            assert!(!entry.needs_rebuild);
2214        }
2215
2216        // Functional smoke: an FTS query through the reloaded index
2217        // returns the expected hit count.
2218        let resp = process_command(
2219            "SELECT id FROM docs WHERE fts_match(body, 'rust');",
2220            &mut loaded,
2221        )
2222        .unwrap();
2223        assert!(resp.contains("3 rows returned"), "got: {resp}");
2224
2225        cleanup(&path);
2226    }
2227
2228    #[test]
2229    fn delete_then_save_then_reopen_excludes_deleted_node_from_fts() {
2230        // Phase 8b — DELETE marks the FTS index dirty; save rebuilds it
2231        // from current rows; reopen replays the CREATE INDEX SQL against
2232        // the post-delete row set. The deleted rowid must not surface
2233        // in `fts_match` results post-reopen.
2234        let path = tmp_path("fts_delete_rebuild");
2235        let mut db = Database::new("test".to_string());
2236        process_command(
2237            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2238            &mut db,
2239        )
2240        .unwrap();
2241        for body in &[
2242            "rust embedded",
2243            "rust framework",
2244            "go embedded",
2245            "python web",
2246        ] {
2247            process_command(
2248                &format!("INSERT INTO docs (body) VALUES ('{body}');"),
2249                &mut db,
2250            )
2251            .unwrap();
2252        }
2253        process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2254
2255        // Delete row 1 ('rust embedded'); save (rebuild fires); reopen.
2256        process_command("DELETE FROM docs WHERE id = 1;", &mut db).unwrap();
2257        save_database(&mut db, &path).expect("save");
2258        drop(db);
2259
2260        let mut loaded = open_database(&path, "test".to_string()).expect("open");
2261        let resp = process_command(
2262            "SELECT id FROM docs WHERE fts_match(body, 'rust');",
2263            &mut loaded,
2264        )
2265        .unwrap();
2266        // Pre-delete: 2 rows ('rust embedded', 'rust framework') had
2267        // 'rust'. Post-delete: only id=2 remains.
2268        assert!(resp.contains("1 row returned"), "got: {resp}");
2269
2270        cleanup(&path);
2271    }
2272
2273    #[test]
2274    fn fts_roundtrip_uses_persistence_path_not_replay() {
2275        // Phase 8c — assert the reload didn't go through the
2276        // rootpage=0 replay shortcut. We do this by reading the
2277        // sqlrite_master row for the FTS index and confirming its
2278        // rootpage field is non-zero.
2279        let path = tmp_path("fts_persistence_path");
2280
2281        {
2282            let mut db = Database::new("test".to_string());
2283            process_command(
2284                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2285                &mut db,
2286            )
2287            .unwrap();
2288            process_command(
2289                "INSERT INTO docs (body) VALUES ('rust embedded database');",
2290                &mut db,
2291            )
2292            .unwrap();
2293            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2294            save_database(&mut db, &path).expect("save");
2295        }
2296
2297        // Read raw sqlrite_master to find the FTS index row.
2298        let pager = Pager::open(&path).expect("open pager");
2299        let mut master = build_empty_master_table();
2300        load_table_rows(&pager, &mut master, pager.header().schema_root_page).unwrap();
2301        let mut found_rootpage: Option<u32> = None;
2302        for rowid in master.rowids() {
2303            let name = take_text(&master, "name", rowid).unwrap();
2304            if name == "ix_body" {
2305                let rp = take_integer(&master, "rootpage", rowid).unwrap();
2306                found_rootpage = Some(rp as u32);
2307            }
2308        }
2309        let rootpage = found_rootpage.expect("ix_body row in sqlrite_master");
2310        assert!(
2311            rootpage != 0,
2312            "Phase 8c FTS save should set rootpage != 0; got {rootpage}"
2313        );
2314
2315        cleanup(&path);
2316    }
2317
2318    #[test]
2319    fn save_without_fts_keeps_format_v4() {
2320        // Phase 8c on-demand bump — a database with zero FTS indexes
2321        // continues writing the v4 header. Existing v4 users must not
2322        // see their files silently promoted to v5 by an upgrade.
2323        use crate::sql::pager::header::FORMAT_VERSION_V4;
2324
2325        let path = tmp_path("fts_no_bump");
2326        let mut db = Database::new("test".to_string());
2327        process_command(
2328            "CREATE TABLE t (id INTEGER PRIMARY KEY, n INTEGER);",
2329            &mut db,
2330        )
2331        .unwrap();
2332        process_command("INSERT INTO t (n) VALUES (1);", &mut db).unwrap();
2333        save_database(&mut db, &path).unwrap();
2334        drop(db);
2335
2336        let pager = Pager::open(&path).expect("open");
2337        assert_eq!(
2338            pager.header().format_version,
2339            FORMAT_VERSION_V4,
2340            "no-FTS save should keep v4"
2341        );
2342        cleanup(&path);
2343    }
2344
2345    #[test]
2346    fn save_with_fts_bumps_to_v5() {
2347        // Phase 8c on-demand bump — first FTS-bearing save promotes
2348        // the file to v5. v5 readers handle both v4 and v5; v4
2349        // readers correctly refuse a v5 file.
2350        use crate::sql::pager::header::FORMAT_VERSION_V5;
2351
2352        let path = tmp_path("fts_bump_v5");
2353        let mut db = Database::new("test".to_string());
2354        process_command(
2355            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2356            &mut db,
2357        )
2358        .unwrap();
2359        process_command("INSERT INTO docs (body) VALUES ('hello');", &mut db).unwrap();
2360        process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2361        save_database(&mut db, &path).unwrap();
2362        drop(db);
2363
2364        let pager = Pager::open(&path).expect("open");
2365        assert_eq!(
2366            pager.header().format_version,
2367            FORMAT_VERSION_V5,
2368            "FTS save should promote to v5"
2369        );
2370        cleanup(&path);
2371    }
2372
2373    #[test]
2374    fn fts_persistence_handles_empty_and_zero_token_docs() {
2375        // Phase 8c — sidecar cell carries doc-lengths for every doc
2376        // including any with zero tokens (so total_docs is honest
2377        // post-reopen). Empty index also round-trips: a CREATE INDEX
2378        // on an empty table emits a single empty leaf with just the
2379        // (empty) sidecar.
2380        let path = tmp_path("fts_edges");
2381
2382        {
2383            let mut db = Database::new("test".to_string());
2384            process_command(
2385                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2386                &mut db,
2387            )
2388            .unwrap();
2389            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2390            // Mix: real text, then a row that tokenizes to zero tokens
2391            // (only punctuation), then real again.
2392            process_command("INSERT INTO docs (body) VALUES ('rust embedded');", &mut db).unwrap();
2393            process_command("INSERT INTO docs (body) VALUES ('!!!---???');", &mut db).unwrap();
2394            process_command("INSERT INTO docs (body) VALUES ('go embedded');", &mut db).unwrap();
2395            save_database(&mut db, &path).unwrap();
2396        }
2397
2398        let loaded = open_database(&path, "test".to_string()).expect("open");
2399        let table = loaded.get_table("docs".to_string()).unwrap();
2400        let entry = &table.fts_indexes[0];
2401        // All three rows present — including the zero-token row,
2402        // which is critical for total_docs honesty in BM25.
2403        assert_eq!(entry.index.len(), 3);
2404        // 'embedded' appears in 2 rows after reload.
2405        let res = entry
2406            .index
2407            .query("embedded", &crate::sql::fts::Bm25Params::default());
2408        assert_eq!(res.len(), 2);
2409
2410        cleanup(&path);
2411    }
2412
2413    #[test]
2414    fn fts_persistence_round_trips_large_corpus() {
2415        // Phase 8c — exercise multi-leaf staging. ~500 docs with
2416        // single-token bodies generates enough cells to overflow a
2417        // single 4 KiB leaf (each posting cell averages ~8 bytes).
2418        let path = tmp_path("fts_large_corpus");
2419
2420        let mut expected_terms: std::collections::BTreeSet<String> =
2421            std::collections::BTreeSet::new();
2422        {
2423            let mut db = Database::new("test".to_string());
2424            process_command(
2425                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2426                &mut db,
2427            )
2428            .unwrap();
2429            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2430            // 500 docs, each one a unique term — drives unique-term
2431            // count up so multiple leaves are required.
2432            for i in 0..500 {
2433                let term = format!("term{i:04}");
2434                process_command(
2435                    &format!("INSERT INTO docs (body) VALUES ('{term}');"),
2436                    &mut db,
2437                )
2438                .unwrap();
2439                expected_terms.insert(term);
2440            }
2441            save_database(&mut db, &path).unwrap();
2442        }
2443
2444        let loaded = open_database(&path, "test".to_string()).expect("open");
2445        let table = loaded.get_table("docs".to_string()).unwrap();
2446        let entry = &table.fts_indexes[0];
2447        assert_eq!(entry.index.len(), 500);
2448
2449        // Spot-check a handful of terms come back with their original
2450        // single-row posting list.
2451        for &i in &[0_i64, 137, 248, 391, 499] {
2452            let term = format!("term{i:04}");
2453            let res = entry
2454                .index
2455                .query(&term, &crate::sql::fts::Bm25Params::default());
2456            assert_eq!(res.len(), 1, "term {term} should match exactly 1 row");
2457            // PrimaryKey rowids start at 1; doc i was inserted at
2458            // rowid i+1.
2459            assert_eq!(res[0].0, i + 1);
2460        }
2461
2462        cleanup(&path);
2463    }
2464
2465    #[test]
2466    fn delete_then_save_then_reopen_excludes_deleted_node_from_hnsw() {
2467        // Phase 7d.3 — DELETE marks HNSW dirty; save rebuilds it from
2468        // current rows + serializes; reopen loads the post-delete graph.
2469        // After all that, the deleted rowid must NOT come back from a
2470        // KNN query.
2471        let path = tmp_path("hnsw_delete_rebuild");
2472        let mut db = Database::new("test".to_string());
2473        process_command(
2474            "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
2475            &mut db,
2476        )
2477        .unwrap();
2478        for v in &["[1.0, 0.0]", "[2.0, 0.0]", "[3.0, 0.0]", "[4.0, 0.0]"] {
2479            process_command(&format!("INSERT INTO docs (e) VALUES ({v});"), &mut db).unwrap();
2480        }
2481        process_command("CREATE INDEX ix_e ON docs USING hnsw (e);", &mut db).unwrap();
2482
2483        // Delete row 1 (the closest match to [0.5, 0.0]).
2484        process_command("DELETE FROM docs WHERE id = 1;", &mut db).unwrap();
2485        // Confirm it marked dirty.
2486        let dirty_before_save = db.tables["docs"].hnsw_indexes[0].needs_rebuild;
2487        assert!(dirty_before_save, "DELETE should mark dirty");
2488
2489        save_database(&mut db, &path).expect("save");
2490        // Confirm save cleared the dirty flag.
2491        let dirty_after_save = db.tables["docs"].hnsw_indexes[0].needs_rebuild;
2492        assert!(!dirty_after_save, "save should clear dirty");
2493        drop(db);
2494
2495        // Reopen, query for the closest match. Row 1 is gone; row 2
2496        // (id=2, vector [2.0, 0.0]) should now be the nearest.
2497        let loaded = open_database(&path, "test".to_string()).expect("open");
2498        let docs = loaded.get_table("docs".to_string()).expect("docs");
2499
2500        // Row 1 must not appear in any storage anymore.
2501        assert!(
2502            !docs.rowids().contains(&1),
2503            "deleted row 1 should not be in row storage"
2504        );
2505        assert_eq!(docs.rowids().len(), 3, "should have 3 surviving rows");
2506
2507        // The HNSW index must also have shed the deleted node.
2508        assert_eq!(
2509            docs.hnsw_indexes[0].index.len(),
2510            3,
2511            "HNSW graph should have shed the deleted node"
2512        );
2513
2514        cleanup(&path);
2515    }
2516
2517    #[test]
2518    fn round_trip_survives_writes_after_load() {
2519        let path = tmp_path("after_load");
2520        save_database(&mut seed_db(), &path).unwrap();
2521
2522        {
2523            let mut db = open_database(&path, "test".to_string()).unwrap();
2524            process_command(
2525                "INSERT INTO users (name, age) VALUES ('carol', 40);",
2526                &mut db,
2527            )
2528            .unwrap();
2529            save_database(&mut db, &path).unwrap();
2530        } // db drops → its exclusive lock releases before we reopen below.
2531
2532        let db2 = open_database(&path, "test".to_string()).unwrap();
2533        let users = db2.get_table("users".to_string()).unwrap();
2534        assert_eq!(users.rowids().len(), 3);
2535
2536        cleanup(&path);
2537    }
2538
2539    #[test]
2540    fn open_rejects_garbage_file() {
2541        let path = tmp_path("bad");
2542        std::fs::write(&path, b"not a sqlrite database, just bytes").unwrap();
2543        let result = open_database(&path, "x".to_string());
2544        assert!(result.is_err());
2545        cleanup(&path);
2546    }
2547
2548    #[test]
2549    fn many_small_rows_spread_across_leaves() {
2550        let path = tmp_path("many_rows");
2551        let mut db = Database::new("big".to_string());
2552        process_command(
2553            "CREATE TABLE things (id INTEGER PRIMARY KEY, data TEXT);",
2554            &mut db,
2555        )
2556        .unwrap();
2557        for i in 0..200 {
2558            let body = "x".repeat(200);
2559            let q = format!("INSERT INTO things (data) VALUES ('row-{i}-{body}');");
2560            process_command(&q, &mut db).unwrap();
2561        }
2562        save_database(&mut db, &path).unwrap();
2563        let loaded = open_database(&path, "big".to_string()).unwrap();
2564        let things = loaded.get_table("things".to_string()).unwrap();
2565        assert_eq!(things.rowids().len(), 200);
2566        cleanup(&path);
2567    }
2568
2569    #[test]
2570    fn huge_row_goes_through_overflow() {
2571        let path = tmp_path("overflow_row");
2572        let mut db = Database::new("big".to_string());
2573        process_command(
2574            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2575            &mut db,
2576        )
2577        .unwrap();
2578        let body = "A".repeat(10_000);
2579        process_command(
2580            &format!("INSERT INTO docs (body) VALUES ('{body}');"),
2581            &mut db,
2582        )
2583        .unwrap();
2584        save_database(&mut db, &path).unwrap();
2585
2586        let loaded = open_database(&path, "big".to_string()).unwrap();
2587        let docs = loaded.get_table("docs".to_string()).unwrap();
2588        let rowids = docs.rowids();
2589        assert_eq!(rowids.len(), 1);
2590        let stored = docs.get_value("body", rowids[0]);
2591        match stored {
2592            Some(Value::Text(s)) => assert_eq!(s.len(), 10_000),
2593            other => panic!("expected Text, got {other:?}"),
2594        }
2595        cleanup(&path);
2596    }
2597
2598    #[test]
2599    fn create_sql_synthesis_round_trips() {
2600        // Build a table via CREATE, then verify table_to_create_sql +
2601        // parse_create_sql reproduce an equivalent column list.
2602        let mut db = Database::new("x".to_string());
2603        process_command(
2604            "CREATE TABLE t (id INTEGER PRIMARY KEY, tag TEXT UNIQUE, note TEXT NOT NULL);",
2605            &mut db,
2606        )
2607        .unwrap();
2608        let t = db.get_table("t".to_string()).unwrap();
2609        let sql = table_to_create_sql(t);
2610        let (name, cols) = parse_create_sql(&sql).unwrap();
2611        assert_eq!(name, "t");
2612        assert_eq!(cols.len(), 3);
2613        assert!(cols[0].is_pk);
2614        assert!(cols[1].is_unique);
2615        assert!(cols[2].not_null);
2616    }
2617
2618    #[test]
2619    fn sqlrite_master_is_not_exposed_as_a_user_table() {
2620        // After open, the public db.tables map should not list the master.
2621        let path = tmp_path("no_master");
2622        save_database(&mut seed_db(), &path).unwrap();
2623        let loaded = open_database(&path, "x".to_string()).unwrap();
2624        assert!(!loaded.tables.contains_key(MASTER_TABLE_NAME));
2625        cleanup(&path);
2626    }
2627
2628    #[test]
2629    fn multi_leaf_table_produces_an_interior_root() {
2630        // 200 fat rows force the table into multiple leaves, which means
2631        // save_database must build at least one InteriorNode above them.
2632        // The test verifies the round-trip works and confirms the root is
2633        // indeed an interior page (not a leaf) by reading the page type
2634        // directly out of the open pager.
2635        let path = tmp_path("multi_leaf_interior");
2636        let mut db = Database::new("big".to_string());
2637        process_command(
2638            "CREATE TABLE things (id INTEGER PRIMARY KEY, data TEXT);",
2639            &mut db,
2640        )
2641        .unwrap();
2642        for i in 0..200 {
2643            let body = "x".repeat(200);
2644            let q = format!("INSERT INTO things (data) VALUES ('row-{i}-{body}');");
2645            process_command(&q, &mut db).unwrap();
2646        }
2647        save_database(&mut db, &path).unwrap();
2648
2649        // Confirm the round-trip preserved all 200 rows.
2650        let loaded = open_database(&path, "big".to_string()).unwrap();
2651        let things = loaded.get_table("things".to_string()).unwrap();
2652        assert_eq!(things.rowids().len(), 200);
2653
2654        // Peek at `things`'s root page via the pager attached to the
2655        // loaded DB and check it's an InteriorNode, not a leaf.
2656        let pager = loaded
2657            .pager
2658            .as_ref()
2659            .expect("loaded DB should have a pager");
2660        // sqlrite_master's row for `things` holds its root page. Easiest
2661        // way to find it: walk the leaf chain by using find_leftmost_leaf
2662        // and then hop one level up. Simpler: read the master, scan for
2663        // the "things" row, look up rootpage.
2664        let mut master = build_empty_master_table();
2665        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
2666        let things_root = master
2667            .rowids()
2668            .into_iter()
2669            .find_map(|r| match master.get_value("name", r) {
2670                Some(Value::Text(s)) if s == "things" => match master.get_value("rootpage", r) {
2671                    Some(Value::Integer(p)) => Some(p as u32),
2672                    _ => None,
2673                },
2674                _ => None,
2675            })
2676            .expect("things should appear in sqlrite_master");
2677        let root_buf = pager.read_page(things_root).unwrap();
2678        assert_eq!(
2679            root_buf[0],
2680            PageType::InteriorNode as u8,
2681            "expected a multi-leaf table to have an interior root, got tag {}",
2682            root_buf[0]
2683        );
2684
2685        cleanup(&path);
2686    }
2687
2688    #[test]
2689    fn explicit_index_persists_across_save_and_open() {
2690        let path = tmp_path("idx_persist");
2691        let mut db = Database::new("idx".to_string());
2692        process_command(
2693            "CREATE TABLE users (id INTEGER PRIMARY KEY, tag TEXT);",
2694            &mut db,
2695        )
2696        .unwrap();
2697        for i in 1..=5 {
2698            let tag = if i % 2 == 0 { "odd" } else { "even" };
2699            process_command(
2700                &format!("INSERT INTO users (tag) VALUES ('{tag}');"),
2701                &mut db,
2702            )
2703            .unwrap();
2704        }
2705        process_command("CREATE INDEX users_tag_idx ON users (tag);", &mut db).unwrap();
2706        save_database(&mut db, &path).unwrap();
2707
2708        let loaded = open_database(&path, "idx".to_string()).unwrap();
2709        let users = loaded.get_table("users".to_string()).unwrap();
2710        let idx = users
2711            .index_by_name("users_tag_idx")
2712            .expect("explicit index should survive save/open");
2713        assert_eq!(idx.column_name, "tag");
2714        assert!(!idx.is_unique);
2715        // 5 rows: rowids 2, 4 are "odd" (i % 2 == 0 when i is 2 or 4) — 2 entries;
2716        // rowids 1, 3, 5 are "even" (i % 2 != 0) — 3 entries.
2717        let even_rowids = idx.lookup(&Value::Text("even".into()));
2718        let odd_rowids = idx.lookup(&Value::Text("odd".into()));
2719        assert_eq!(even_rowids.len(), 3);
2720        assert_eq!(odd_rowids.len(), 2);
2721
2722        cleanup(&path);
2723    }
2724
2725    #[test]
2726    fn auto_indexes_for_unique_columns_survive_save_open() {
2727        let path = tmp_path("auto_idx_persist");
2728        let mut db = Database::new("a".to_string());
2729        process_command(
2730            "CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL UNIQUE);",
2731            &mut db,
2732        )
2733        .unwrap();
2734        process_command("INSERT INTO users (email) VALUES ('a@x');", &mut db).unwrap();
2735        process_command("INSERT INTO users (email) VALUES ('b@x');", &mut db).unwrap();
2736        save_database(&mut db, &path).unwrap();
2737
2738        let loaded = open_database(&path, "a".to_string()).unwrap();
2739        let users = loaded.get_table("users".to_string()).unwrap();
2740        // Every UNIQUE column auto-creates an index; the load path populated
2741        // it from the persisted entries.
2742        let auto_name = SecondaryIndex::auto_name("users", "email");
2743        let idx = users
2744            .index_by_name(&auto_name)
2745            .expect("auto index should be restored");
2746        assert!(idx.is_unique);
2747        assert_eq!(idx.lookup(&Value::Text("a@x".into())).len(), 1);
2748        assert_eq!(idx.lookup(&Value::Text("b@x".into())).len(), 1);
2749
2750        cleanup(&path);
2751    }
2752
2753    /// SQLR-1 — `CREATE INDEX` on a wide table must round-trip when the
2754    /// index B-tree grows past one leaf and needs an interior level.
2755    /// Before the fix, the post-DDL auto-save panicked with
2756    /// `Internal("unknown paged-entry kind tag 0x4 …")` because a
2757    /// table-cell decoder was being run against an index leaf
2758    /// (`KIND_INDEX = 0x04`).
2759    ///
2760    /// 5 000 rows mirror the original repro from the issue and exceed
2761    /// every leaf-fanout cliff for the small `(rowid, value)` cells in
2762    /// a TEXT-keyed secondary index.
2763    #[test]
2764    fn secondary_index_with_interior_level_round_trips() {
2765        let path = tmp_path("sqlr1_wide_index");
2766        let mut db = Database::new("idx".to_string());
2767        db.source_path = Some(path.clone());
2768
2769        process_command(
2770            "CREATE TABLE bloat (id INTEGER PRIMARY KEY, payload TEXT);",
2771            &mut db,
2772        )
2773        .unwrap();
2774        // BEGIN/COMMIT collapses 5 000 inserts into one save (matches
2775        // `auto_vacuum_setup` and the issue's repro shape).
2776        process_command("BEGIN;", &mut db).unwrap();
2777        for i in 0..5000 {
2778            process_command(
2779                &format!("INSERT INTO bloat (payload) VALUES ('p-{i:08}');"),
2780                &mut db,
2781            )
2782            .unwrap();
2783        }
2784        process_command("COMMIT;", &mut db).unwrap();
2785
2786        // The DDL that used to panic.
2787        process_command("CREATE INDEX idx_p ON bloat (payload);", &mut db).unwrap();
2788
2789        // Reopen and verify lookups, plus that the index tree actually
2790        // grew an interior layer (otherwise this test wouldn't cover the
2791        // regression).
2792        drop(db);
2793        let loaded = open_database(&path, "idx".to_string()).unwrap();
2794        let bloat = loaded.get_table("bloat".to_string()).unwrap();
2795        let idx = bloat
2796            .index_by_name("idx_p")
2797            .expect("idx_p should survive close/reopen");
2798        assert!(!idx.is_unique);
2799
2800        // Spot-check the keyspace: first, middle, last value each map
2801        // back to exactly the row that carried them.
2802        for &(probe_i, expected_rowid) in &[(0i64, 1i64), (2500, 2501), (4999, 5000)] {
2803            let value = Value::Text(format!("p-{probe_i:08}"));
2804            let hits = idx.lookup(&value);
2805            assert_eq!(
2806                hits,
2807                vec![expected_rowid],
2808                "lookup({value:?}) should yield rowid {expected_rowid}",
2809            );
2810        }
2811
2812        // Confirm the index tree is multi-level (the regression's
2813        // necessary condition) — root must be an `InteriorNode` and
2814        // `find_leftmost_leaf` must reach a `TableLeaf` through it.
2815        let pager = loaded.pager.as_ref().unwrap();
2816        let mut master = build_empty_master_table();
2817        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
2818        let idx_root = master
2819            .rowids()
2820            .into_iter()
2821            .find_map(
2822                |r| match (master.get_value("name", r), master.get_value("type", r)) {
2823                    (Some(Value::Text(name)), Some(Value::Text(kind)))
2824                        if name == "idx_p" && kind == "index" =>
2825                    {
2826                        match master.get_value("rootpage", r) {
2827                            Some(Value::Integer(p)) => Some(p as u32),
2828                            _ => None,
2829                        }
2830                    }
2831                    _ => None,
2832                },
2833            )
2834            .expect("idx_p should appear in sqlrite_master");
2835        let root_buf = pager.read_page(idx_root).unwrap();
2836        assert_eq!(
2837            root_buf[0],
2838            PageType::InteriorNode as u8,
2839            "5 000-entry index must have an interior root — without one this test wouldn't cover SQLR-1",
2840        );
2841        let leaf = find_leftmost_leaf(pager, idx_root).unwrap();
2842        let leaf_buf = pager.read_page(leaf).unwrap();
2843        assert_eq!(leaf_buf[0], PageType::TableLeaf as u8);
2844
2845        cleanup(&path);
2846    }
2847
2848    /// SQLR-1 follow-on — the page-recycling path between two large
2849    /// versions of the same index name must not corrupt cell decoding.
2850    /// `DROP INDEX` returns its pages to the freelist; the next
2851    /// `CREATE INDEX` is free to reuse them. If the allocator hands an
2852    /// old index leaf to a *table* without zeroing it, an upstream
2853    /// table walk would see KIND_INDEX cells and panic.
2854    #[test]
2855    fn drop_then_recreate_wide_index_does_not_panic() {
2856        let path = tmp_path("sqlr1_drop_recreate");
2857        let mut db = Database::new("idx".to_string());
2858        db.source_path = Some(path.clone());
2859
2860        process_command(
2861            "CREATE TABLE bloat (id INTEGER PRIMARY KEY, payload TEXT);",
2862            &mut db,
2863        )
2864        .unwrap();
2865        process_command("BEGIN;", &mut db).unwrap();
2866        for i in 0..5000 {
2867            process_command(
2868                &format!("INSERT INTO bloat (payload) VALUES ('p-{i:08}');"),
2869                &mut db,
2870            )
2871            .unwrap();
2872        }
2873        process_command("COMMIT;", &mut db).unwrap();
2874
2875        process_command("CREATE INDEX idx_p ON bloat (payload);", &mut db).unwrap();
2876        process_command("DROP INDEX idx_p;", &mut db).unwrap();
2877        // Recreate from scratch — exercises the recycle path.
2878        process_command("CREATE INDEX idx_p ON bloat (payload);", &mut db).unwrap();
2879
2880        drop(db);
2881        let loaded = open_database(&path, "idx".to_string()).unwrap();
2882        let bloat = loaded.get_table("bloat".to_string()).unwrap();
2883        let idx = bloat
2884            .index_by_name("idx_p")
2885            .expect("idx_p should survive drop+recreate+reopen");
2886        assert_eq!(
2887            idx.lookup(&Value::Text("p-00002500".into())),
2888            vec![2501],
2889            "post-recycle lookup must still resolve correctly",
2890        );
2891
2892        cleanup(&path);
2893    }
2894
2895    #[test]
2896    fn deep_tree_round_trips() {
2897        // Force a 3-level tree by bypassing process_command (which prints
2898        // the full table on every INSERT, making large bulk loads O(N^2)
2899        // in I/O). We build the Table directly via restore_row.
2900        use crate::sql::db::table::Column as TableColumn;
2901
2902        let path = tmp_path("deep_tree");
2903        let mut db = Database::new("deep".to_string());
2904        let columns = vec![
2905            TableColumn::new("id".into(), "integer".into(), true, true, true),
2906            TableColumn::new("s".into(), "text".into(), false, true, false),
2907        ];
2908        let mut table = build_empty_table("t", columns, 0);
2909        // ~900-byte rows → ~4 rows per leaf. 6000 rows → ~1500 leaves,
2910        // which with interior fanout ~400 needs 2 interior levels (3-level
2911        // tree total, counting leaves).
2912        for i in 1..=6_000i64 {
2913            let body = "q".repeat(900);
2914            table
2915                .restore_row(
2916                    i,
2917                    vec![
2918                        Some(Value::Integer(i)),
2919                        Some(Value::Text(format!("r-{i}-{body}"))),
2920                    ],
2921                )
2922                .unwrap();
2923        }
2924        db.tables.insert("t".to_string(), table);
2925        save_database(&mut db, &path).unwrap();
2926
2927        let loaded = open_database(&path, "deep".to_string()).unwrap();
2928        let t = loaded.get_table("t".to_string()).unwrap();
2929        assert_eq!(t.rowids().len(), 6_000);
2930
2931        // Confirm the tree actually grew past 2 levels — i.e., the root's
2932        // leftmost child is itself an interior page, not a leaf.
2933        let pager = loaded.pager.as_ref().unwrap();
2934        let mut master = build_empty_master_table();
2935        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
2936        let t_root = master
2937            .rowids()
2938            .into_iter()
2939            .find_map(|r| match master.get_value("name", r) {
2940                Some(Value::Text(s)) if s == "t" => match master.get_value("rootpage", r) {
2941                    Some(Value::Integer(p)) => Some(p as u32),
2942                    _ => None,
2943                },
2944                _ => None,
2945            })
2946            .expect("t in sqlrite_master");
2947        let root_buf = pager.read_page(t_root).unwrap();
2948        assert_eq!(root_buf[0], PageType::InteriorNode as u8);
2949        let root_payload: &[u8; PAYLOAD_PER_PAGE] =
2950            (&root_buf[PAGE_HEADER_SIZE..]).try_into().unwrap();
2951        let root_interior = InteriorPage::from_bytes(root_payload);
2952        let child = root_interior.leftmost_child().unwrap();
2953        let child_buf = pager.read_page(child).unwrap();
2954        assert_eq!(
2955            child_buf[0],
2956            PageType::InteriorNode as u8,
2957            "expected 3-level tree: root's leftmost child should also be InteriorNode",
2958        );
2959
2960        cleanup(&path);
2961    }
2962
2963    #[test]
2964    fn alter_rename_table_survives_save_and_reopen() {
2965        let path = tmp_path("alter_rename_table_roundtrip");
2966        let mut db = seed_db();
2967        save_database(&mut db, &path).expect("save");
2968
2969        process_command("ALTER TABLE users RENAME TO members;", &mut db).expect("rename");
2970        save_database(&mut db, &path).expect("save after rename");
2971
2972        let loaded = open_database(&path, "t".to_string()).expect("reopen");
2973        assert!(!loaded.contains_table("users".to_string()));
2974        assert!(loaded.contains_table("members".to_string()));
2975        let members = loaded.get_table("members".to_string()).unwrap();
2976        assert_eq!(members.rowids().len(), 2, "rows should survive");
2977        // Auto-indexes followed the rename.
2978        assert!(
2979            members
2980                .index_by_name("sqlrite_autoindex_members_id")
2981                .is_some()
2982        );
2983        assert!(
2984            members
2985                .index_by_name("sqlrite_autoindex_members_name")
2986                .is_some()
2987        );
2988
2989        cleanup(&path);
2990    }
2991
2992    #[test]
2993    fn alter_rename_column_survives_save_and_reopen() {
2994        let path = tmp_path("alter_rename_col_roundtrip");
2995        let mut db = seed_db();
2996        save_database(&mut db, &path).expect("save");
2997
2998        process_command(
2999            "ALTER TABLE users RENAME COLUMN name TO full_name;",
3000            &mut db,
3001        )
3002        .expect("rename column");
3003        save_database(&mut db, &path).expect("save after rename");
3004
3005        let loaded = open_database(&path, "t".to_string()).expect("reopen");
3006        let users = loaded.get_table("users".to_string()).unwrap();
3007        assert!(users.contains_column("full_name".to_string()));
3008        assert!(!users.contains_column("name".to_string()));
3009        // Verify a row's value survived the rename round-trip.
3010        let alice_rowid = users
3011            .rowids()
3012            .into_iter()
3013            .find(|r| users.get_value("full_name", *r) == Some(Value::Text("alice".to_string())))
3014            .expect("alice row should be findable under renamed column");
3015        assert_eq!(
3016            users.get_value("full_name", alice_rowid),
3017            Some(Value::Text("alice".to_string()))
3018        );
3019
3020        cleanup(&path);
3021    }
3022
3023    #[test]
3024    fn alter_add_column_with_default_survives_save_and_reopen() {
3025        let path = tmp_path("alter_add_default_roundtrip");
3026        let mut db = seed_db();
3027        save_database(&mut db, &path).expect("save");
3028
3029        process_command(
3030            "ALTER TABLE users ADD COLUMN status TEXT DEFAULT 'active';",
3031            &mut db,
3032        )
3033        .expect("add column");
3034        save_database(&mut db, &path).expect("save after add");
3035
3036        let loaded = open_database(&path, "t".to_string()).expect("reopen");
3037        let users = loaded.get_table("users".to_string()).unwrap();
3038        assert!(users.contains_column("status".to_string()));
3039        for rowid in users.rowids() {
3040            assert_eq!(
3041                users.get_value("status", rowid),
3042                Some(Value::Text("active".to_string())),
3043                "backfilled default should round-trip for rowid {rowid}"
3044            );
3045        }
3046        // The DEFAULT clause itself should still be on the column metadata
3047        // so a subsequent INSERT picks it up.
3048        let status_col = users
3049            .columns
3050            .iter()
3051            .find(|c| c.column_name == "status")
3052            .unwrap();
3053        assert_eq!(status_col.default, Some(Value::Text("active".to_string())));
3054
3055        cleanup(&path);
3056    }
3057
3058    #[test]
3059    fn alter_drop_column_survives_save_and_reopen() {
3060        let path = tmp_path("alter_drop_col_roundtrip");
3061        let mut db = seed_db();
3062        save_database(&mut db, &path).expect("save");
3063
3064        process_command("ALTER TABLE users DROP COLUMN age;", &mut db).expect("drop column");
3065        save_database(&mut db, &path).expect("save after drop");
3066
3067        let loaded = open_database(&path, "t".to_string()).expect("reopen");
3068        let users = loaded.get_table("users".to_string()).unwrap();
3069        assert!(!users.contains_column("age".to_string()));
3070        assert!(users.contains_column("name".to_string()));
3071
3072        cleanup(&path);
3073    }
3074
3075    #[test]
3076    fn drop_table_survives_save_and_reopen() {
3077        let path = tmp_path("drop_table_roundtrip");
3078        let mut db = seed_db();
3079        save_database(&mut db, &path).expect("save");
3080
3081        // Verify both tables landed.
3082        {
3083            let loaded = open_database(&path, "t".to_string()).expect("open");
3084            assert!(loaded.contains_table("users".to_string()));
3085            assert!(loaded.contains_table("notes".to_string()));
3086        }
3087
3088        process_command("DROP TABLE users;", &mut db).expect("drop users");
3089        save_database(&mut db, &path).expect("save after drop");
3090
3091        let loaded = open_database(&path, "t".to_string()).expect("reopen");
3092        assert!(
3093            !loaded.contains_table("users".to_string()),
3094            "dropped table should not resurface on reopen"
3095        );
3096        assert!(
3097            loaded.contains_table("notes".to_string()),
3098            "untouched table should survive"
3099        );
3100
3101        cleanup(&path);
3102    }
3103
3104    #[test]
3105    fn drop_index_survives_save_and_reopen() {
3106        let path = tmp_path("drop_index_roundtrip");
3107        let mut db = Database::new("t".to_string());
3108        process_command(
3109            "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
3110            &mut db,
3111        )
3112        .unwrap();
3113        process_command("CREATE INDEX notes_body_idx ON notes (body);", &mut db).unwrap();
3114        save_database(&mut db, &path).expect("save");
3115
3116        process_command("DROP INDEX notes_body_idx;", &mut db).unwrap();
3117        save_database(&mut db, &path).expect("save after drop");
3118
3119        let loaded = open_database(&path, "t".to_string()).expect("reopen");
3120        let notes = loaded.get_table("notes".to_string()).unwrap();
3121        assert!(
3122            notes.index_by_name("notes_body_idx").is_none(),
3123            "dropped index should not resurface on reopen"
3124        );
3125        // The auto-index for the PK should still be there.
3126        assert!(notes.index_by_name("sqlrite_autoindex_notes_id").is_some());
3127
3128        cleanup(&path);
3129    }
3130
3131    #[test]
3132    fn default_clause_survives_save_and_reopen() {
3133        let path = tmp_path("default_roundtrip");
3134        let mut db = Database::new("t".to_string());
3135
3136        process_command(
3137            "CREATE TABLE users (id INTEGER PRIMARY KEY, status TEXT DEFAULT 'active', score INTEGER DEFAULT 0);",
3138            &mut db,
3139        )
3140        .unwrap();
3141        save_database(&mut db, &path).expect("save");
3142
3143        let mut loaded = open_database(&path, "t".to_string()).expect("open");
3144
3145        // The reloaded column metadata should still carry the DEFAULT.
3146        let users = loaded.get_table("users".to_string()).expect("users table");
3147        let status_col = users
3148            .columns
3149            .iter()
3150            .find(|c| c.column_name == "status")
3151            .expect("status column");
3152        assert_eq!(
3153            status_col.default,
3154            Some(Value::Text("active".to_string())),
3155            "DEFAULT 'active' should round-trip"
3156        );
3157        let score_col = users
3158            .columns
3159            .iter()
3160            .find(|c| c.column_name == "score")
3161            .expect("score column");
3162        assert_eq!(
3163            score_col.default,
3164            Some(Value::Integer(0)),
3165            "DEFAULT 0 should round-trip"
3166        );
3167
3168        // Now exercise the runtime path: an INSERT that omits both DEFAULT
3169        // columns should pick them up from the reloaded schema.
3170        process_command("INSERT INTO users (id) VALUES (1);", &mut loaded).unwrap();
3171        let users = loaded.get_table("users".to_string()).unwrap();
3172        assert_eq!(
3173            users.get_value("status", 1),
3174            Some(Value::Text("active".to_string()))
3175        );
3176        assert_eq!(users.get_value("score", 1), Some(Value::Integer(0)));
3177
3178        cleanup(&path);
3179    }
3180
3181    // ---------------------------------------------------------------------
3182    // SQLR-6 — free-list + VACUUM tests
3183    // ---------------------------------------------------------------------
3184
3185    /// Drop a table; subsequent CREATE TABLE should reuse the freed pages
3186    /// rather than extending the file. The page_count after drop+create
3187    /// should be at most what it was after the original two tables —
3188    /// proving the new table landed on freelist pages.
3189    #[test]
3190    fn drop_table_freelist_persists_pages_for_reuse() {
3191        let path = tmp_path("freelist_reuse");
3192        let mut db = seed_db();
3193        db.source_path = Some(path.clone());
3194        save_database(&mut db, &path).expect("save");
3195        let pages_two_tables = db.pager.as_ref().unwrap().header().page_count;
3196
3197        // Drop one table; its pages go on the freelist.
3198        process_command("DROP TABLE users;", &mut db).expect("drop users");
3199        let pages_after_drop = db.pager.as_ref().unwrap().header().page_count;
3200        assert_eq!(
3201            pages_after_drop, pages_two_tables,
3202            "page_count should not shrink on drop — the freed pages persist on the freelist"
3203        );
3204        let head_after_drop = db.pager.as_ref().unwrap().header().freelist_head;
3205        assert!(
3206            head_after_drop != 0,
3207            "freelist_head must be non-zero after drop"
3208        );
3209
3210        // Re-create a similar-shaped table; should reuse freelist pages.
3211        process_command(
3212            "CREATE TABLE accounts (id INTEGER PRIMARY KEY, label TEXT NOT NULL UNIQUE);",
3213            &mut db,
3214        )
3215        .expect("create accounts");
3216        process_command("INSERT INTO accounts (label) VALUES ('a');", &mut db).unwrap();
3217        process_command("INSERT INTO accounts (label) VALUES ('b');", &mut db).unwrap();
3218        let pages_after_create = db.pager.as_ref().unwrap().header().page_count;
3219        assert!(
3220            pages_after_create <= pages_two_tables + 2,
3221            "creating a similar-sized table after a drop should mostly draw from the \
3222             freelist, not extend the file (got {pages_after_create} > {pages_two_tables} + 2)"
3223        );
3224
3225        cleanup(&path);
3226    }
3227
3228    /// `VACUUM;` after a drop must shrink the file and clear the freelist.
3229    #[test]
3230    fn drop_then_vacuum_shrinks_file() {
3231        let path = tmp_path("vacuum_shrinks");
3232        let mut db = seed_db();
3233        db.source_path = Some(path.clone());
3234        // Add a few more rows to make the dropped table bigger.
3235        for i in 0..20 {
3236            process_command(
3237                &format!("INSERT INTO users (name, age) VALUES ('user{i}', {i});"),
3238                &mut db,
3239            )
3240            .unwrap();
3241        }
3242        save_database(&mut db, &path).expect("save");
3243
3244        process_command("DROP TABLE users;", &mut db).expect("drop");
3245        let size_before_vacuum = std::fs::metadata(&path).unwrap().len();
3246        let pages_before_vacuum = db.pager.as_ref().unwrap().header().page_count;
3247        let head_before = db.pager.as_ref().unwrap().header().freelist_head;
3248        assert!(head_before != 0, "drop should populate the freelist");
3249
3250        // VACUUM (via process_command) checkpoints internally so the
3251        // file actually shrinks on disk before we observe its size.
3252        process_command("VACUUM;", &mut db).expect("vacuum");
3253
3254        let size_after = std::fs::metadata(&path).unwrap().len();
3255        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3256        let head_after = db.pager.as_ref().unwrap().header().freelist_head;
3257        assert!(
3258            pages_after < pages_before_vacuum,
3259            "VACUUM must reduce page_count: was {pages_before_vacuum}, now {pages_after}"
3260        );
3261        assert_eq!(head_after, 0, "VACUUM must clear the freelist");
3262        assert!(
3263            size_after < size_before_vacuum,
3264            "VACUUM must shrink the file on disk: was {size_before_vacuum} bytes, now {size_after}"
3265        );
3266
3267        cleanup(&path);
3268    }
3269
3270    /// VACUUM on a non-empty multi-table DB must not lose any rows.
3271    #[test]
3272    fn vacuum_round_trips_data() {
3273        let path = tmp_path("vacuum_round_trip");
3274        let mut db = seed_db();
3275        db.source_path = Some(path.clone());
3276        save_database(&mut db, &path).expect("save");
3277        process_command("VACUUM;", &mut db).expect("vacuum");
3278
3279        // Re-open from disk to make sure the on-disk catalog round-trips.
3280        drop(db);
3281        let loaded = open_database(&path, "t".to_string()).expect("reopen after vacuum");
3282        assert!(loaded.contains_table("users".to_string()));
3283        assert!(loaded.contains_table("notes".to_string()));
3284        let users = loaded.get_table("users".to_string()).unwrap();
3285        // seed_db inserts two users.
3286        assert_eq!(users.rowids().len(), 2);
3287
3288        cleanup(&path);
3289    }
3290
3291    /// Format version is bumped to v6 only after a save that creates a
3292    /// non-empty freelist. VACUUM clears the freelist but doesn't
3293    /// downgrade — v6 is a strict superset, so once at v6 we stay.
3294    #[test]
3295    fn freelist_format_version_promotion() {
3296        use crate::sql::pager::header::{FORMAT_VERSION_BASELINE, FORMAT_VERSION_V6};
3297        let path = tmp_path("v6_promotion");
3298        let mut db = seed_db();
3299        db.source_path = Some(path.clone());
3300        save_database(&mut db, &path).expect("save");
3301        let v_after_save = db.pager.as_ref().unwrap().header().format_version;
3302        assert_eq!(
3303            v_after_save, FORMAT_VERSION_BASELINE,
3304            "fresh DB without drops should stay at the baseline version"
3305        );
3306
3307        process_command("DROP TABLE users;", &mut db).expect("drop");
3308        let v_after_drop = db.pager.as_ref().unwrap().header().format_version;
3309        assert_eq!(
3310            v_after_drop, FORMAT_VERSION_V6,
3311            "first save with a non-empty freelist must promote to V6"
3312        );
3313
3314        process_command("VACUUM;", &mut db).expect("vacuum");
3315        let v_after_vacuum = db.pager.as_ref().unwrap().header().format_version;
3316        assert_eq!(
3317            v_after_vacuum, FORMAT_VERSION_V6,
3318            "VACUUM must not downgrade — V6 is a strict superset"
3319        );
3320
3321        cleanup(&path);
3322    }
3323
3324    /// Freelist persists across reopen: drop, save, close, reopen,
3325    /// confirm the next CREATE TABLE re-uses pages from the persisted
3326    /// freelist (rather than extending the file).
3327    #[test]
3328    fn freelist_round_trip_through_reopen() {
3329        let path = tmp_path("freelist_reopen");
3330        let pages_two_tables;
3331        {
3332            let mut db = seed_db();
3333            db.source_path = Some(path.clone());
3334            save_database(&mut db, &path).expect("save");
3335            pages_two_tables = db.pager.as_ref().unwrap().header().page_count;
3336            process_command("DROP TABLE users;", &mut db).expect("drop");
3337            let head = db.pager.as_ref().unwrap().header().freelist_head;
3338            assert!(head != 0, "drop must populate the freelist");
3339        }
3340
3341        // Reopen from disk — the freelist must come back.
3342        let mut db = open_database(&path, "t".to_string()).expect("reopen");
3343        assert!(
3344            db.pager.as_ref().unwrap().header().freelist_head != 0,
3345            "freelist_head must survive close/reopen"
3346        );
3347
3348        process_command(
3349            "CREATE TABLE accounts (id INTEGER PRIMARY KEY, label TEXT NOT NULL UNIQUE);",
3350            &mut db,
3351        )
3352        .expect("create accounts");
3353        process_command("INSERT INTO accounts (label) VALUES ('reopened');", &mut db).unwrap();
3354        let pages_after_create = db.pager.as_ref().unwrap().header().page_count;
3355        assert!(
3356            pages_after_create <= pages_two_tables + 2,
3357            "post-reopen create should reuse freelist (got {pages_after_create} > \
3358             {pages_two_tables} + 2 — file extended instead of reusing)"
3359        );
3360
3361        cleanup(&path);
3362    }
3363
3364    /// VACUUM inside an explicit transaction must error before touching the
3365    /// disk. `BEGIN; VACUUM;` is the documented rejection path.
3366    #[test]
3367    fn vacuum_inside_transaction_is_rejected() {
3368        let path = tmp_path("vacuum_txn");
3369        let mut db = seed_db();
3370        db.source_path = Some(path.clone());
3371        save_database(&mut db, &path).expect("save");
3372
3373        process_command("BEGIN;", &mut db).expect("begin");
3374        let err = process_command("VACUUM;", &mut db).unwrap_err();
3375        assert!(
3376            format!("{err}").contains("VACUUM cannot run inside a transaction"),
3377            "expected in-transaction rejection, got: {err}"
3378        );
3379        // Roll back to leave the DB in a clean state.
3380        process_command("ROLLBACK;", &mut db).unwrap();
3381        cleanup(&path);
3382    }
3383
3384    /// VACUUM on an in-memory database is a documented no-op.
3385    #[test]
3386    fn vacuum_on_in_memory_database_is_noop() {
3387        let mut db = Database::new("mem".to_string());
3388        process_command("CREATE TABLE t (id INTEGER PRIMARY KEY);", &mut db).unwrap();
3389        let out = process_command("VACUUM;", &mut db).expect("vacuum no-op");
3390        assert!(
3391            out.to_lowercase().contains("no-op") || out.to_lowercase().contains("in-memory"),
3392            "expected no-op message for in-memory VACUUM, got: {out}"
3393        );
3394    }
3395
3396    /// Untouched tables shouldn't write any pages on the save that
3397    /// follows a DROP of an unrelated table. Confirms the per-table
3398    /// preferred pool keeps page numbers stable so the diff pager skips
3399    /// every byte-identical leaf.
3400    #[test]
3401    fn unchanged_table_pages_skip_diff_after_unrelated_drop() {
3402        // Need three tables so dropping one in the middle still leaves
3403        // an "unrelated" alphabetical neighbour. Layout pre-drop (sorted):
3404        //   accounts, notes, users
3405        // Drop `notes`. `accounts` and `users` should keep their pages.
3406        let path = tmp_path("diff_after_drop");
3407        let mut db = Database::new("t".to_string());
3408        db.source_path = Some(path.clone());
3409        process_command(
3410            "CREATE TABLE accounts (id INTEGER PRIMARY KEY, label TEXT);",
3411            &mut db,
3412        )
3413        .unwrap();
3414        process_command(
3415            "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
3416            &mut db,
3417        )
3418        .unwrap();
3419        process_command(
3420            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);",
3421            &mut db,
3422        )
3423        .unwrap();
3424        for i in 0..5 {
3425            process_command(
3426                &format!("INSERT INTO accounts (label) VALUES ('a{i}');"),
3427                &mut db,
3428            )
3429            .unwrap();
3430            process_command(
3431                &format!("INSERT INTO notes (body) VALUES ('n{i}');"),
3432                &mut db,
3433            )
3434            .unwrap();
3435            process_command(
3436                &format!("INSERT INTO users (name) VALUES ('u{i}');"),
3437                &mut db,
3438            )
3439            .unwrap();
3440        }
3441        save_database(&mut db, &path).expect("baseline save");
3442
3443        // Capture page bytes for `accounts` and `users` so we can
3444        // verify they don't change.
3445        let pager = db.pager.as_ref().unwrap();
3446        let acc_root = read_old_rootpages(pager, pager.header().schema_root_page)
3447            .unwrap()
3448            .get(&("table".to_string(), "accounts".to_string()))
3449            .copied()
3450            .unwrap();
3451        let users_root = read_old_rootpages(pager, pager.header().schema_root_page)
3452            .unwrap()
3453            .get(&("table".to_string(), "users".to_string()))
3454            .copied()
3455            .unwrap();
3456        let acc_bytes_before: Vec<u8> = pager.read_page(acc_root).unwrap().to_vec();
3457        let users_bytes_before: Vec<u8> = pager.read_page(users_root).unwrap().to_vec();
3458
3459        // Drop the middle table.
3460        process_command("DROP TABLE notes;", &mut db).expect("drop notes");
3461
3462        let pager = db.pager.as_ref().unwrap();
3463        // `accounts` and `users` should still live at the same pages
3464        // with byte-identical content.
3465        let acc_after = pager.read_page(acc_root).unwrap();
3466        let users_after = pager.read_page(users_root).unwrap();
3467        assert_eq!(
3468            &acc_after[..],
3469            &acc_bytes_before[..],
3470            "accounts root page must not be rewritten when an unrelated table is dropped"
3471        );
3472        assert_eq!(
3473            &users_after[..],
3474            &users_bytes_before[..],
3475            "users root page must not be rewritten when an unrelated table is dropped"
3476        );
3477
3478        cleanup(&path);
3479    }
3480
3481    // ---- SQLR-10: auto-VACUUM trigger after page-releasing DDL ----
3482
3483    /// Builds a file-backed DB with one small "keep" table and one
3484    /// large "bloat" table, sized so the post-drop freelist will
3485    /// comfortably cross the default 25% threshold and the
3486    /// `MIN_PAGES_FOR_AUTO_VACUUM` floor (16 pages). Used by the
3487    /// auto-VACUUM happy-path tests.
3488    fn auto_vacuum_setup(path: &std::path::Path) -> Database {
3489        let mut db = Database::new("av".to_string());
3490        db.source_path = Some(path.to_path_buf());
3491        process_command(
3492            "CREATE TABLE keep (id INTEGER PRIMARY KEY, n INTEGER);",
3493            &mut db,
3494        )
3495        .unwrap();
3496        process_command("INSERT INTO keep (n) VALUES (1);", &mut db).unwrap();
3497        process_command(
3498            "CREATE TABLE bloat (id INTEGER PRIMARY KEY, payload TEXT);",
3499            &mut db,
3500        )
3501        .unwrap();
3502        // Wrap the bulk insert in a transaction so we pay one save at
3503        // COMMIT instead of 5000 round-trips through auto-save.
3504        process_command("BEGIN;", &mut db).unwrap();
3505        for i in 0..5000 {
3506            process_command(
3507                &format!("INSERT INTO bloat (payload) VALUES ('p-{i:08}');"),
3508                &mut db,
3509            )
3510            .unwrap();
3511        }
3512        process_command("COMMIT;", &mut db).unwrap();
3513        db
3514    }
3515
3516    /// Default threshold (0.25) is engaged for fresh `Database`s and
3517    /// fires when a `DROP TABLE` orphans enough pages — file shrinks
3518    /// without anyone calling `VACUUM;`.
3519    #[test]
3520    fn auto_vacuum_default_threshold_triggers_on_drop_table() {
3521        let path = tmp_path("av_default_drop_table");
3522        let mut db = auto_vacuum_setup(&path);
3523        // Sanity: setup respects the shipped default.
3524        assert_eq!(db.auto_vacuum_threshold(), Some(0.25));
3525
3526        // Checkpoint before measuring `size_before` so the bloat actually
3527        // lives in the main file and not just the WAL — otherwise
3528        // `size_before` is the bare 2-page header and any post-vacuum
3529        // checkpoint will look like the file *grew*.
3530        if let Some(p) = db.pager.as_mut() {
3531            let _ = p.checkpoint();
3532        }
3533        let pages_before = db.pager.as_ref().unwrap().header().page_count;
3534        let size_before = std::fs::metadata(&path).unwrap().len();
3535        assert!(
3536            pages_before >= MIN_PAGES_FOR_AUTO_VACUUM,
3537            "setup should produce >= MIN_PAGES_FOR_AUTO_VACUUM ({MIN_PAGES_FOR_AUTO_VACUUM}) \
3538             pages so the floor doesn't suppress the trigger; got {pages_before}"
3539        );
3540
3541        // Drop the bloat table — freelist should pass 25% of page_count
3542        // and the auto-VACUUM hook should compact in place. Note: no
3543        // explicit `VACUUM;` statement is issued.
3544        process_command("DROP TABLE bloat;", &mut db).expect("drop");
3545
3546        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3547        let head_after = db.pager.as_ref().unwrap().header().freelist_head;
3548        // Second checkpoint so the post-vacuum file shrinks on disk
3549        // (auto-VACUUM stages the compact through WAL just like manual
3550        // VACUUM does).
3551        if let Some(p) = db.pager.as_mut() {
3552            let _ = p.checkpoint();
3553        }
3554        let size_after = std::fs::metadata(&path).unwrap().len();
3555
3556        assert!(
3557            pages_after < pages_before,
3558            "auto-VACUUM must reduce page_count: was {pages_before}, now {pages_after}"
3559        );
3560        assert_eq!(head_after, 0, "auto-VACUUM must clear the freelist");
3561        assert!(
3562            size_after < size_before,
3563            "auto-VACUUM must shrink the file on disk: was {size_before}, now {size_after}"
3564        );
3565
3566        cleanup(&path);
3567    }
3568
3569    /// Setting the threshold to `None` disables the trigger entirely:
3570    /// the same workload that shrinks under the default leaves the file
3571    /// at its high-water mark.
3572    #[test]
3573    fn auto_vacuum_disabled_keeps_file_at_hwm() {
3574        let path = tmp_path("av_disabled");
3575        let mut db = auto_vacuum_setup(&path);
3576        db.set_auto_vacuum_threshold(None).expect("disable");
3577        assert_eq!(db.auto_vacuum_threshold(), None);
3578
3579        let pages_before = db.pager.as_ref().unwrap().header().page_count;
3580
3581        process_command("DROP TABLE bloat;", &mut db).expect("drop");
3582
3583        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3584        let head_after = db.pager.as_ref().unwrap().header().freelist_head;
3585        assert_eq!(
3586            pages_after, pages_before,
3587            "with auto-VACUUM disabled, drop must keep page_count at the HWM"
3588        );
3589        assert!(
3590            head_after != 0,
3591            "drop must still populate the freelist (manual VACUUM would be needed to reclaim)"
3592        );
3593
3594        cleanup(&path);
3595    }
3596
3597    /// `DROP INDEX` is the second of three page-releasing DDL paths
3598    /// covered by SQLR-10. We bloat the freelist via a separate
3599    /// `DROP TABLE` first (with auto-VACUUM disabled so it doesn't
3600    /// compact early), then re-arm the trigger and drop a small index
3601    /// — the cumulative freelist crosses 25% on the index drop and
3602    /// auto-VACUUM fires.
3603    ///
3604    /// The detour around bloat is necessary because building a
3605    /// secondary index on a 5000-row column would need multi-level
3606    /// interior nodes, and the cell-decoder's interior-page support
3607    /// is a separate work item from SQLR-10.
3608    #[test]
3609    fn auto_vacuum_triggers_on_drop_index() {
3610        let path = tmp_path("av_drop_index");
3611        let mut db = auto_vacuum_setup(&path);
3612
3613        // Phase 1: drop the bloat table with auto-VACUUM disabled so
3614        // its pages land on the freelist without being reclaimed.
3615        db.set_auto_vacuum_threshold(None).expect("disable");
3616        process_command("DROP TABLE bloat;", &mut db).expect("drop bloat");
3617        let pages_after_bloat_drop = db.pager.as_ref().unwrap().header().page_count;
3618        let head_after_bloat_drop = db.pager.as_ref().unwrap().header().freelist_head;
3619        assert!(
3620            head_after_bloat_drop != 0,
3621            "bloat drop must populate the freelist (else later index drop won't trip the threshold)"
3622        );
3623
3624        // Phase 2: a small index on the surviving `keep` table. The
3625        // index reuses one page from the freelist (which is fine —
3626        // freelist still holds plenty more).
3627        process_command("CREATE INDEX idx_keep_n ON keep (n);", &mut db).expect("create idx");
3628
3629        // Phase 3: re-arm the trigger and drop the index. The freelist
3630        // is already heavily populated from phase 1; this drop just
3631        // adds the index page on top, keeping the ratio well above
3632        // 25%, so auto-VACUUM should fire.
3633        db.set_auto_vacuum_threshold(Some(0.25)).expect("re-arm");
3634        process_command("DROP INDEX idx_keep_n;", &mut db).expect("drop index");
3635
3636        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3637        let head_after = db.pager.as_ref().unwrap().header().freelist_head;
3638        assert!(
3639            pages_after < pages_after_bloat_drop,
3640            "DROP INDEX should fire auto-VACUUM and reduce page_count: \
3641             was {pages_after_bloat_drop}, now {pages_after}"
3642        );
3643        assert_eq!(
3644            head_after, 0,
3645            "auto-VACUUM after DROP INDEX must clear the freelist"
3646        );
3647
3648        cleanup(&path);
3649    }
3650
3651    /// `ALTER TABLE … DROP COLUMN` releases pages too — the third path
3652    /// the SQLR-10 trigger covers.
3653    #[test]
3654    fn auto_vacuum_triggers_on_alter_drop_column() {
3655        let path = tmp_path("av_alter_drop_col");
3656        let mut db = auto_vacuum_setup(&path);
3657        let pages_before = db.pager.as_ref().unwrap().header().page_count;
3658
3659        // Drop the wide `payload` column — this rewrites every row in
3660        // `bloat` without the column, so the old leaf pages get freed.
3661        process_command("ALTER TABLE bloat DROP COLUMN payload;", &mut db).expect("alter drop");
3662
3663        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3664        assert!(
3665            pages_after < pages_before,
3666            "ALTER TABLE DROP COLUMN should fire auto-VACUUM and reduce page_count: \
3667             was {pages_before}, now {pages_after}"
3668        );
3669        assert_eq!(db.pager.as_ref().unwrap().header().freelist_head, 0);
3670
3671        cleanup(&path);
3672    }
3673
3674    /// A high threshold (0.99) suppresses the trigger when the freelist
3675    /// ratio is well below it — the file stays at HWM.
3676    #[test]
3677    fn auto_vacuum_skips_below_threshold() {
3678        let path = tmp_path("av_below_threshold");
3679        let mut db = auto_vacuum_setup(&path);
3680        db.set_auto_vacuum_threshold(Some(0.99)).expect("set");
3681
3682        let pages_before = db.pager.as_ref().unwrap().header().page_count;
3683
3684        process_command("DROP TABLE bloat;", &mut db).expect("drop");
3685
3686        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3687        assert_eq!(
3688            pages_after, pages_before,
3689            "freelist ratio after a single drop is far below 0.99 — \
3690             page_count must stay at the HWM"
3691        );
3692        assert!(
3693            db.pager.as_ref().unwrap().header().freelist_head != 0,
3694            "drop must still populate the freelist"
3695        );
3696
3697        cleanup(&path);
3698    }
3699
3700    /// Inside an explicit transaction, the page-releasing DDL doesn't
3701    /// flush to disk yet — the freelist isn't accurate, so the trigger
3702    /// must skip. The compact would also publish in-flight work out of
3703    /// band, which is exactly what the manual `VACUUM;` rejection
3704    /// inside a txn already prevents.
3705    #[test]
3706    fn auto_vacuum_skips_inside_transaction() {
3707        let path = tmp_path("av_in_txn");
3708        let mut db = auto_vacuum_setup(&path);
3709        let pages_before = db.pager.as_ref().unwrap().header().page_count;
3710
3711        process_command("BEGIN;", &mut db).expect("begin");
3712        process_command("DROP TABLE bloat;", &mut db).expect("drop in txn");
3713        // Mid-transaction: no save has occurred, so the on-disk
3714        // freelist_head must be unchanged and page_count must not have
3715        // shifted from a sneaky compact.
3716        let pages_mid = db.pager.as_ref().unwrap().header().page_count;
3717        assert_eq!(
3718            pages_mid, pages_before,
3719            "auto-VACUUM must not fire mid-transaction"
3720        );
3721
3722        process_command("ROLLBACK;", &mut db).expect("rollback");
3723        cleanup(&path);
3724    }
3725
3726    /// Tiny databases (under `MIN_PAGES_FOR_AUTO_VACUUM`) skip the
3727    /// trigger even if the ratio would otherwise qualify — the cost of
3728    /// rewriting a 64 KiB file isn't worth the few bytes reclaimed.
3729    #[test]
3730    fn auto_vacuum_skips_under_min_pages_floor() {
3731        let path = tmp_path("av_under_floor");
3732        let mut db = seed_db(); // small: just users + notes, ~5 pages
3733        db.source_path = Some(path.clone());
3734        save_database(&mut db, &path).expect("save");
3735        // Confirm we're below the floor so the test is meaningful.
3736        let pages_before = db.pager.as_ref().unwrap().header().page_count;
3737        assert!(
3738            pages_before < MIN_PAGES_FOR_AUTO_VACUUM,
3739            "test setup is too large: floor would not apply (got {pages_before} pages, \
3740             floor is {MIN_PAGES_FOR_AUTO_VACUUM})"
3741        );
3742
3743        process_command("DROP TABLE users;", &mut db).expect("drop");
3744
3745        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3746        assert_eq!(
3747            pages_after, pages_before,
3748            "below MIN_PAGES_FOR_AUTO_VACUUM, drop must not trigger compaction"
3749        );
3750        assert!(
3751            db.pager.as_ref().unwrap().header().freelist_head != 0,
3752            "drop must still populate the freelist normally"
3753        );
3754
3755        cleanup(&path);
3756    }
3757
3758    /// Setter rejects NaN, infinities, and values outside `0.0..=1.0`
3759    /// rather than silently saturating.
3760    #[test]
3761    fn set_auto_vacuum_threshold_rejects_out_of_range() {
3762        let mut db = Database::new("t".to_string());
3763        for bad in [-0.01_f32, 1.01, f32::NAN, f32::INFINITY, f32::NEG_INFINITY] {
3764            let err = db.set_auto_vacuum_threshold(Some(bad)).unwrap_err();
3765            assert!(
3766                format!("{err}").contains("auto_vacuum_threshold"),
3767                "expected a typed range error for {bad}, got: {err}"
3768            );
3769        }
3770        // The default survives the rejected sets unchanged.
3771        assert_eq!(db.auto_vacuum_threshold(), Some(0.25));
3772        // And valid values land.
3773        db.set_auto_vacuum_threshold(Some(0.0)).unwrap();
3774        assert_eq!(db.auto_vacuum_threshold(), Some(0.0));
3775        db.set_auto_vacuum_threshold(Some(1.0)).unwrap();
3776        assert_eq!(db.auto_vacuum_threshold(), Some(1.0));
3777        db.set_auto_vacuum_threshold(None).unwrap();
3778        assert_eq!(db.auto_vacuum_threshold(), None);
3779    }
3780
3781    /// VACUUM modifiers (FULL, REINDEX, table targets, …) are rejected
3782    /// with NotImplemented — only bare `VACUUM;` is supported.
3783    #[test]
3784    fn vacuum_modifiers_are_rejected() {
3785        let path = tmp_path("vacuum_modifiers");
3786        let mut db = seed_db();
3787        db.source_path = Some(path.clone());
3788        save_database(&mut db, &path).expect("save");
3789        for stmt in ["VACUUM FULL;", "VACUUM users;"] {
3790            let err = process_command(stmt, &mut db).unwrap_err();
3791            assert!(
3792                format!("{err}").contains("VACUUM modifiers"),
3793                "expected modifier rejection for `{stmt}`, got: {err}"
3794            );
3795        }
3796        cleanup(&path);
3797    }
3798}