Skip to main content

sqlrite/sql/pager/
mod.rs

1//! On-disk persistence for a `Database`, using fixed-size paged files.
2//!
3//! The file is a sequence of 4 KiB pages. Page 0 holds the header
4//! (magic, version, page count, schema-root pointer). Every other page carries
5//! a small per-page header (type tag + next-page pointer + payload length)
6//! followed by a payload of up to 4089 bytes.
7//!
8//! **Storage strategy (format version 2, Phase 3c.5).**
9//!
10//! - Each `Table`'s rows live as **cells** in a chain of `TableLeaf` pages.
11//!   Cell layout and slot directory are in `cell.rs` / `table_page.rs`;
12//!   cells that exceed the inline threshold spill into an overflow chain
13//!   via `overflow.rs`.
14//! - The schema catalog is itself a regular table named `sqlrite_master`,
15//!   with one row per user table:
16//!       `(name TEXT PRIMARY KEY, sql TEXT NOT NULL,
17//!         rootpage INTEGER NOT NULL, last_rowid INTEGER NOT NULL)`
18//!   This is the SQLite-style approach: the schema of `sqlrite_master`
19//!   itself is hardcoded into the engine so the open path can bootstrap.
20//! - Page 0's `schema_root_page` field points at the first leaf of
21//!   `sqlrite_master`.
22//!
23//! **Format version.** Version 2 is not compatible with files produced by
24//! earlier commits. Opening a v1 file returns a clean error — users on
25//! old files have to regenerate them from CREATE/INSERT, as there's no
26//! production data to migrate yet.
27
28// Data-layer modules. Not every helper in these modules is used by save/open
29// yet — some exist for tests, some for future maintenance operations.
30// Module-level #[allow(dead_code)] keeps the build quiet without dotting
31// the modules with per-item attributes.
32#[allow(dead_code)]
33pub mod allocator;
34#[allow(dead_code)]
35pub mod cell;
36pub mod file;
37#[allow(dead_code)]
38pub mod freelist;
39#[allow(dead_code)]
40pub mod fts_cell;
41pub mod header;
42#[allow(dead_code)]
43pub mod hnsw_cell;
44#[allow(dead_code)]
45pub mod index_cell;
46#[allow(dead_code)]
47pub mod interior_page;
48pub mod overflow;
49pub mod page;
50pub mod pager;
51#[allow(dead_code)]
52pub mod table_page;
53#[allow(dead_code)]
54pub mod varint;
55#[allow(dead_code)]
56pub mod wal;
57
58use std::collections::{BTreeMap, HashMap};
59use std::path::Path;
60use std::sync::{Arc, Mutex};
61
62use sqlparser::dialect::SQLiteDialect;
63use sqlparser::parser::Parser;
64
65use crate::error::{Result, SQLRiteError};
66use crate::sql::db::database::Database;
67use crate::sql::db::secondary_index::{IndexOrigin, SecondaryIndex};
68use crate::sql::db::table::{Column, DataType, Row, Table, Value};
69use crate::sql::pager::cell::Cell;
70use crate::sql::pager::header::DbHeader;
71use crate::sql::pager::index_cell::IndexCell;
72use crate::sql::pager::interior_page::{InteriorCell, InteriorPage};
73use crate::sql::pager::overflow::{
74    OVERFLOW_THRESHOLD, OverflowRef, PagedEntry, read_overflow_chain, write_overflow_chain,
75};
76use crate::sql::pager::page::{PAGE_HEADER_SIZE, PAGE_SIZE, PAYLOAD_PER_PAGE, PageType};
77use crate::sql::pager::pager::Pager;
78use crate::sql::pager::table_page::TablePage;
79use crate::sql::parser::create::CreateQuery;
80
81// Re-export so callers can spell `sql::pager::AccessMode` without
82// reaching into the `pager::pager::pager` submodule path.
83pub use crate::sql::pager::pager::AccessMode;
84
85/// Name of the internal catalog table. Reserved — user CREATEs of this
86/// name must be rejected upstream.
87pub const MASTER_TABLE_NAME: &str = "sqlrite_master";
88
89/// Opens a database file in read-write mode. Shorthand for
90/// [`open_database_with_mode`] with [`AccessMode::ReadWrite`].
91pub fn open_database(path: &Path, db_name: String) -> Result<Database> {
92    open_database_with_mode(path, db_name, AccessMode::ReadWrite)
93}
94
95/// Opens a database file in read-only mode. Acquires a shared OS-level
96/// advisory lock, so other read-only openers coexist but any writer is
97/// excluded. Attempts to mutate the returned `Database` (e.g. an
98/// `INSERT`, or a `save_database` call against it) bottom out in a
99/// `cannot commit: database is opened read-only` error from the Pager.
100pub fn open_database_read_only(path: &Path, db_name: String) -> Result<Database> {
101    open_database_with_mode(path, db_name, AccessMode::ReadOnly)
102}
103
104/// Opens a database file and reconstructs the in-memory `Database`,
105/// leaving the long-lived `Pager` attached for subsequent auto-save
106/// (read-write) or consistent-snapshot reads (read-only).
107pub fn open_database_with_mode(path: &Path, db_name: String, mode: AccessMode) -> Result<Database> {
108    let pager = Pager::open_with_mode(path, mode)?;
109
110    // 1. Load sqlrite_master from the tree at header.schema_root_page.
111    let mut master = build_empty_master_table();
112    load_table_rows(&pager, &mut master, pager.header().schema_root_page)?;
113
114    // 2. Two passes over master rows: first build every user table, then
115    //    attach secondary indexes. Indexes need their base table to exist
116    //    before we can populate them. Auto-indexes are created at table
117    //    build time so we only have to load explicit indexes from disk
118    //    (but we also reload the auto-index CONTENT because Table::new
119    //    built it empty).
120    let mut db = Database::new(db_name);
121    let mut index_rows: Vec<IndexCatalogRow> = Vec::new();
122
123    for rowid in master.rowids() {
124        let ty = take_text(&master, "type", rowid)?;
125        let name = take_text(&master, "name", rowid)?;
126        let sql = take_text(&master, "sql", rowid)?;
127        let rootpage = take_integer(&master, "rootpage", rowid)? as u32;
128        let last_rowid = take_integer(&master, "last_rowid", rowid)?;
129
130        match ty.as_str() {
131            "table" => {
132                let (parsed_name, columns) = parse_create_sql(&sql)?;
133                if parsed_name != name {
134                    return Err(SQLRiteError::Internal(format!(
135                        "sqlrite_master row '{name}' carries SQL for '{parsed_name}' — corrupt catalog?"
136                    )));
137                }
138                let mut table = build_empty_table(&name, columns, last_rowid);
139                if rootpage != 0 {
140                    load_table_rows(&pager, &mut table, rootpage)?;
141                }
142                if last_rowid > table.last_rowid {
143                    table.last_rowid = last_rowid;
144                }
145                db.tables.insert(name, table);
146            }
147            "index" => {
148                index_rows.push(IndexCatalogRow {
149                    name,
150                    sql,
151                    rootpage,
152                });
153            }
154            other => {
155                return Err(SQLRiteError::Internal(format!(
156                    "sqlrite_master row '{name}' has unknown type '{other}'"
157                )));
158            }
159        }
160    }
161
162    // Second pass: attach each index to its table. HNSW indexes
163    // (Phase 7d.2) take a different code path because their persisted
164    // form is just the CREATE INDEX SQL — the graph itself isn't
165    // persisted yet (Phase 7d.3). Detect HNSW via the SQL's USING clause
166    // and route to a graph-rebuild instead of the B-Tree-cell load.
167    //
168    // Phase 8b — same shape for FTS indexes. The posting lists aren't
169    // persisted yet (Phase 8c), so we replay the CREATE INDEX SQL on
170    // open and let `execute_create_index` walk current rows.
171    for row in index_rows {
172        if create_index_sql_uses_hnsw(&row.sql) {
173            rebuild_hnsw_index(&mut db, &pager, &row)?;
174        } else if create_index_sql_uses_fts(&row.sql) {
175            rebuild_fts_index(&mut db, &pager, &row)?;
176        } else {
177            attach_index(&mut db, &pager, row)?;
178        }
179    }
180
181    db.source_path = Some(path.to_path_buf());
182    db.pager = Some(pager);
183    Ok(db)
184}
185
186/// Catalog row for a secondary index — deferred until after every table is
187/// loaded so the index's base table exists by the time we populate it.
188struct IndexCatalogRow {
189    name: String,
190    sql: String,
191    rootpage: u32,
192}
193
194/// Persists `db` to disk. Diff-pager skips writing pages whose bytes
195/// haven't changed; the [`PageAllocator`] preserves per-table page
196/// numbers across saves so unchanged tables produce zero dirty frames.
197///
198/// Pages that were live before this save but aren't restaged this round
199/// (e.g., the leaves of a dropped table) move onto a persisted free
200/// list rooted at `header.freelist_head`; subsequent saves draw from
201/// the freelist before extending the file. `VACUUM` (see
202/// [`vacuum_database`]) compacts the file by ignoring the freelist and
203/// allocating linearly from page 1.
204///
205/// [`PageAllocator`]: crate::sql::pager::allocator::PageAllocator
206pub fn save_database(db: &mut Database, path: &Path) -> Result<()> {
207    save_database_with_mode(db, path, /*compact=*/ false)
208}
209
210/// Reclaims space by rewriting every live B-Tree contiguously from
211/// page 1, with no freelist. Equivalent to `save_database` but ignores
212/// the existing freelist and per-table preferred pools — every page is
213/// allocated by extending the high-water mark — so the resulting file
214/// is tightly packed and the freelist is empty.
215///
216/// Used by the SQL-level `VACUUM;` statement.
217pub fn vacuum_database(db: &mut Database, path: &Path) -> Result<()> {
218    save_database_with_mode(db, path, /*compact=*/ true)
219}
220
221/// Shared save core. `compact = false` is the normal save path (uses
222/// the existing freelist + per-table preferred pools). `compact = true`
223/// is the VACUUM path (empty freelist, empty preferred pools, linear
224/// allocation from page 1).
225fn save_database_with_mode(db: &mut Database, path: &Path, compact: bool) -> Result<()> {
226    // Phase 7d.3 — rebuild any HNSW index that DELETE / UPDATE-on-vector
227    // marked dirty. Done up front under the &mut Database borrow we
228    // already hold, before the immutable iteration loops below need
229    // their own borrow.
230    rebuild_dirty_hnsw_indexes(db);
231    // Phase 8b — same drill for FTS indexes flagged by DELETE / UPDATE.
232    rebuild_dirty_fts_indexes(db);
233
234    let same_path = db.source_path.as_deref() == Some(path);
235    let mut pager = if same_path {
236        match db.pager.take() {
237            Some(p) => p,
238            None if path.exists() => Pager::open(path)?,
239            None => Pager::create(path)?,
240        }
241    } else if path.exists() {
242        Pager::open(path)?
243    } else {
244        Pager::create(path)?
245    };
246
247    // Snapshot what was live BEFORE we reset staged. Used to compute the
248    // newly-freed set after staging completes. Page 0 (the header) is
249    // never on the freelist — it's always live.
250    let old_header = pager.header();
251    let old_live: std::collections::HashSet<u32> = (1..old_header.page_count).collect();
252
253    // Read the previously-persisted freelist so its leaf pages can be
254    // reused as preferred allocations and its trunk pages don't leak.
255    let (old_free_leaves, old_free_trunks) = if compact || old_header.freelist_head == 0 {
256        (Vec::new(), Vec::new())
257    } else {
258        crate::sql::pager::freelist::read_freelist(&pager, old_header.freelist_head)?
259    };
260
261    // Snapshot the previous rootpages of each table/index so we can
262    // seed per-table preferred pools (the unchanged-table case stages
263    // byte-identical pages → diff pager skips every write for it).
264    let old_rootpages = if compact {
265        HashMap::new()
266    } else {
267        read_old_rootpages(&pager, old_header.schema_root_page)?
268    };
269
270    pager.clear_staged();
271
272    // Allocator: in normal mode, seed with the old freelist; in compact
273    // mode, start empty so allocation extends linearly from page 1.
274    use std::collections::VecDeque;
275    let initial_freelist: VecDeque<u32> = if compact {
276        VecDeque::new()
277    } else {
278        crate::sql::pager::freelist::freelist_to_deque(old_free_leaves.clone())
279    };
280    let mut alloc = crate::sql::pager::allocator::PageAllocator::new(initial_freelist, 1);
281
282    // 1. Stage each user table's B-Tree, collecting master-row info.
283    //    `kind` is "table" or "index" — master has one row per each.
284    let mut master_rows: Vec<CatalogEntry> = Vec::new();
285
286    let mut table_names: Vec<&String> = db.tables.keys().collect();
287    table_names.sort();
288    for name in table_names {
289        if name == MASTER_TABLE_NAME {
290            return Err(SQLRiteError::Internal(format!(
291                "user table cannot be named '{MASTER_TABLE_NAME}' (reserved)"
292            )));
293        }
294        if !compact {
295            if let Some(&prev_root) = old_rootpages.get(&("table".to_string(), name.to_string())) {
296                let prev =
297                    collect_pages_for_btree(&pager, prev_root, /*follow_overflow=*/ true)?;
298                alloc.set_preferred(prev);
299            }
300        }
301        let table = &db.tables[name];
302        let rootpage = stage_table_btree(&mut pager, table, &mut alloc)?;
303        alloc.finish_preferred();
304        master_rows.push(CatalogEntry {
305            kind: "table".into(),
306            name: name.clone(),
307            sql: table_to_create_sql(table),
308            rootpage,
309            last_rowid: table.last_rowid,
310        });
311    }
312
313    // 2. Stage each secondary index's B-Tree. Indexes persist in a
314    //    deterministic order: sorted by (owning_table, index_name).
315    let mut index_entries: Vec<(&Table, &SecondaryIndex)> = Vec::new();
316    for table in db.tables.values() {
317        for idx in &table.secondary_indexes {
318            index_entries.push((table, idx));
319        }
320    }
321    index_entries
322        .sort_by(|(ta, ia), (tb, ib)| ta.tb_name.cmp(&tb.tb_name).then(ia.name.cmp(&ib.name)));
323    for (_table, idx) in index_entries {
324        if !compact {
325            if let Some(&prev_root) =
326                old_rootpages.get(&("index".to_string(), idx.name.to_string()))
327            {
328                let prev =
329                    collect_pages_for_btree(&pager, prev_root, /*follow_overflow=*/ false)?;
330                alloc.set_preferred(prev);
331            }
332        }
333        let rootpage = stage_index_btree(&mut pager, idx, &mut alloc)?;
334        alloc.finish_preferred();
335        master_rows.push(CatalogEntry {
336            kind: "index".into(),
337            name: idx.name.clone(),
338            sql: idx.synthesized_sql(),
339            rootpage,
340            last_rowid: 0,
341        });
342    }
343
344    // 2b. Phase 7d.3: persist HNSW indexes as their own cell-encoded
345    //     page trees, with the rootpage recorded in sqlrite_master.
346    //     Reopen loads the graph back from cells (fast, exact match)
347    //     instead of rebuilding from rows.
348    //
349    //     Dirty indexes (set by DELETE / UPDATE-on-vector-col) are
350    //     rebuilt from current rows BEFORE staging, so the on-disk
351    //     graph reflects the current row set.
352    let mut hnsw_entries: Vec<(&Table, &crate::sql::db::table::HnswIndexEntry)> = Vec::new();
353    for table in db.tables.values() {
354        for entry in &table.hnsw_indexes {
355            hnsw_entries.push((table, entry));
356        }
357    }
358    hnsw_entries
359        .sort_by(|(ta, ea), (tb, eb)| ta.tb_name.cmp(&tb.tb_name).then(ea.name.cmp(&eb.name)));
360    for (table, entry) in hnsw_entries {
361        if !compact {
362            if let Some(&prev_root) =
363                old_rootpages.get(&("index".to_string(), entry.name.to_string()))
364            {
365                let prev =
366                    collect_pages_for_btree(&pager, prev_root, /*follow_overflow=*/ false)?;
367                alloc.set_preferred(prev);
368            }
369        }
370        let rootpage = stage_hnsw_btree(&mut pager, &entry.index, &mut alloc)?;
371        alloc.finish_preferred();
372        master_rows.push(CatalogEntry {
373            kind: "index".into(),
374            name: entry.name.clone(),
375            sql: format!(
376                "CREATE INDEX {} ON {} USING hnsw ({})",
377                entry.name, table.tb_name, entry.column_name
378            ),
379            rootpage,
380            last_rowid: 0,
381        });
382    }
383
384    // 2c. Phase 8c — persist FTS posting lists as their own
385    //     cell-encoded page trees, with the rootpage recorded in
386    //     sqlrite_master. Reopen loads the postings back from cells
387    //     (fast, exact match) instead of re-tokenizing rows.
388    //
389    //     Dirty indexes (set by DELETE / UPDATE-on-text-col) are
390    //     rebuilt from current rows BEFORE staging by
391    //     `rebuild_dirty_fts_indexes`, so the on-disk tree reflects
392    //     the current row set.
393    let mut fts_entries: Vec<(&Table, &crate::sql::db::table::FtsIndexEntry)> = Vec::new();
394    for table in db.tables.values() {
395        for entry in &table.fts_indexes {
396            fts_entries.push((table, entry));
397        }
398    }
399    fts_entries
400        .sort_by(|(ta, ea), (tb, eb)| ta.tb_name.cmp(&tb.tb_name).then(ea.name.cmp(&eb.name)));
401    let any_fts = !fts_entries.is_empty();
402    for (table, entry) in fts_entries {
403        if !compact {
404            if let Some(&prev_root) =
405                old_rootpages.get(&("index".to_string(), entry.name.to_string()))
406            {
407                let prev =
408                    collect_pages_for_btree(&pager, prev_root, /*follow_overflow=*/ false)?;
409                alloc.set_preferred(prev);
410            }
411        }
412        let rootpage = stage_fts_btree(&mut pager, &entry.index, &mut alloc)?;
413        alloc.finish_preferred();
414        master_rows.push(CatalogEntry {
415            kind: "index".into(),
416            name: entry.name.clone(),
417            sql: format!(
418                "CREATE INDEX {} ON {} USING fts ({})",
419                entry.name, table.tb_name, entry.column_name
420            ),
421            rootpage,
422            last_rowid: 0,
423        });
424    }
425
426    // 3. Build an in-memory sqlrite_master with one row per table or index,
427    //    then stage it via the same tree-build path. Seed master's
428    //    preferred pool with the previous master tree's pages so the
429    //    catalog page numbers stay stable across saves whenever the
430    //    catalog content didn't change.
431    let mut master = build_empty_master_table();
432    for (i, entry) in master_rows.into_iter().enumerate() {
433        let rowid = (i as i64) + 1;
434        master.restore_row(
435            rowid,
436            vec![
437                Some(Value::Text(entry.kind)),
438                Some(Value::Text(entry.name)),
439                Some(Value::Text(entry.sql)),
440                Some(Value::Integer(entry.rootpage as i64)),
441                Some(Value::Integer(entry.last_rowid)),
442            ],
443        )?;
444    }
445    if !compact && old_header.schema_root_page != 0 {
446        let prev = collect_pages_for_btree(
447            &pager,
448            old_header.schema_root_page,
449            /*follow_overflow=*/ true,
450        )?;
451        alloc.set_preferred(prev);
452    }
453    let master_root = stage_table_btree(&mut pager, &master, &mut alloc)?;
454    alloc.finish_preferred();
455
456    // 4. Compute newly-freed pages: the previously-live set minus what
457    //    we just restaged. The previous freelist's trunk pages get
458    //    re-encoded too — they're in `old_live`, weren't restaged, so
459    //    the filter naturally moves them to the new freelist.
460    //
461    // In `compact` mode (VACUUM), we *discard* newly_freed instead of
462    // routing it onto the new freelist. The whole point of VACUUM is
463    // to let the file truncate to the new high-water mark, so any page
464    // past it gets dropped at the next checkpoint.
465    if !compact {
466        let used = alloc.used().clone();
467        let mut newly_freed: Vec<u32> = old_live
468            .iter()
469            .copied()
470            .filter(|p| !used.contains(p))
471            .collect();
472        let _ = &old_free_trunks; // silenced — handled by the old_live filter
473        alloc.add_to_freelist(newly_freed.drain(..));
474    }
475
476    // 5. Encode the new freelist into trunk pages. `stage_freelist`
477    //    consumes some of the free pages AS the trunk pages themselves —
478    //    a trunk is just a free page borrowed for metadata. Pages that
479    //    were on the freelist but become trunks no longer need to be
480    //    "extension" pages; the high-water mark from the staging loop
481    //    above is already correct.
482    let new_free_pages = alloc.drain_freelist();
483    let new_freelist_head =
484        crate::sql::pager::freelist::stage_freelist(&mut pager, new_free_pages)?;
485
486    // 6. Pick the format version. v6 is on demand: only bumps when the
487    //    new freelist is non-empty. FTS-bearing files keep their v5
488    //    promotion; v6 is a strict superset (v6 readers handle v4/v5/v6).
489    use crate::sql::pager::header::{FORMAT_VERSION_V5, FORMAT_VERSION_V6};
490    let format_version = if new_freelist_head != 0 {
491        FORMAT_VERSION_V6
492    } else if any_fts {
493        // Preserve a v6 file at v6 (don't downgrade) but otherwise
494        // bump v4 → v5 for FTS like Phase 8c does.
495        std::cmp::max(FORMAT_VERSION_V5, old_header.format_version)
496    } else {
497        // Preserve whatever the file already was.
498        old_header.format_version
499    };
500
501    pager.commit(DbHeader {
502        page_count: alloc.high_water(),
503        schema_root_page: master_root,
504        format_version,
505        freelist_head: new_freelist_head,
506    })?;
507
508    if same_path {
509        db.pager = Some(pager);
510    }
511    Ok(())
512}
513
514/// Build material for a single row in sqlrite_master.
515struct CatalogEntry {
516    kind: String, // "table" or "index"
517    name: String,
518    sql: String,
519    rootpage: u32,
520    last_rowid: i64,
521}
522
523// -------------------------------------------------------------------------
524// sqlrite_master — hardcoded catalog table schema
525
526fn build_empty_master_table() -> Table {
527    // Phase 3e: `type` is the first column, matching SQLite's convention.
528    // It distinguishes `'table'` rows from `'index'` rows.
529    let columns = vec![
530        Column::new("type".into(), "text".into(), false, true, false),
531        Column::new("name".into(), "text".into(), true, true, true),
532        Column::new("sql".into(), "text".into(), false, true, false),
533        Column::new("rootpage".into(), "integer".into(), false, true, false),
534        Column::new("last_rowid".into(), "integer".into(), false, true, false),
535    ];
536    build_empty_table(MASTER_TABLE_NAME, columns, 0)
537}
538
539/// Reads a required Text column from a known-good catalog row.
540fn take_text(table: &Table, col: &str, rowid: i64) -> Result<String> {
541    match table.get_value(col, rowid) {
542        Some(Value::Text(s)) => Ok(s),
543        other => Err(SQLRiteError::Internal(format!(
544            "sqlrite_master column '{col}' at rowid {rowid}: expected Text, got {other:?}"
545        ))),
546    }
547}
548
549/// Reads a required Integer column from a known-good catalog row.
550fn take_integer(table: &Table, col: &str, rowid: i64) -> Result<i64> {
551    match table.get_value(col, rowid) {
552        Some(Value::Integer(v)) => Ok(v),
553        other => Err(SQLRiteError::Internal(format!(
554            "sqlrite_master column '{col}' at rowid {rowid}: expected Integer, got {other:?}"
555        ))),
556    }
557}
558
559// -------------------------------------------------------------------------
560// CREATE-TABLE SQL synthesis and re-parsing
561
562/// Synthesizes a CREATE TABLE SQL string that recreates the table's schema.
563/// Deterministic: same schema → same SQL, so diffing commits stay stable.
564fn table_to_create_sql(table: &Table) -> String {
565    let mut parts = Vec::with_capacity(table.columns.len());
566    for c in &table.columns {
567        // Render the SQL type literally so the round-trip through
568        // CREATE TABLE re-parsing recreates the same schema. Vector
569        // carries its dimension inline.
570        let ty: String = match &c.datatype {
571            DataType::Integer => "INTEGER".to_string(),
572            DataType::Text => "TEXT".to_string(),
573            DataType::Real => "REAL".to_string(),
574            DataType::Bool => "BOOLEAN".to_string(),
575            DataType::Vector(dim) => format!("VECTOR({dim})"),
576            DataType::Json => "JSON".to_string(),
577            DataType::None | DataType::Invalid => "TEXT".to_string(),
578        };
579        let mut piece = format!("{} {}", c.column_name, ty);
580        if c.is_pk {
581            piece.push_str(" PRIMARY KEY");
582        } else {
583            if c.is_unique {
584                piece.push_str(" UNIQUE");
585            }
586            if c.not_null {
587                piece.push_str(" NOT NULL");
588            }
589        }
590        if let Some(default) = &c.default {
591            piece.push_str(" DEFAULT ");
592            piece.push_str(&render_default_literal(default));
593        }
594        parts.push(piece);
595    }
596    format!("CREATE TABLE {} ({});", table.tb_name, parts.join(", "))
597}
598
599/// Renders a DEFAULT value back to SQL-literal form so the synthesized
600/// CREATE TABLE round-trips through `parse_create_sql`. Text values get
601/// single-quoted with single-quote doubling for escaping. Vector defaults
602/// are not currently expressible at CREATE TABLE time, so we render them
603/// as their bracket-array form (matches the INSERT literal grammar).
604fn render_default_literal(value: &Value) -> String {
605    match value {
606        Value::Integer(i) => i.to_string(),
607        Value::Real(f) => f.to_string(),
608        Value::Bool(b) => {
609            if *b {
610                "TRUE".to_string()
611            } else {
612                "FALSE".to_string()
613            }
614        }
615        Value::Text(s) => format!("'{}'", s.replace('\'', "''")),
616        Value::Null => "NULL".to_string(),
617        Value::Vector(_) => value.to_display_string(),
618    }
619}
620
621/// Reverses `table_to_create_sql`: feeds the SQL back through `sqlparser`
622/// and produces our internal column list. Returns `(table_name, columns)`.
623fn parse_create_sql(sql: &str) -> Result<(String, Vec<Column>)> {
624    let dialect = SQLiteDialect {};
625    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
626    let stmt = ast.pop().ok_or_else(|| {
627        SQLRiteError::Internal("sqlrite_master row held an empty SQL string".to_string())
628    })?;
629    let create = CreateQuery::new(&stmt)?;
630    let columns = create
631        .columns
632        .into_iter()
633        .map(|pc| {
634            Column::with_default(
635                pc.name,
636                pc.datatype,
637                pc.is_pk,
638                pc.not_null,
639                pc.is_unique,
640                pc.default,
641            )
642        })
643        .collect();
644    Ok((create.table_name, columns))
645}
646
647// -------------------------------------------------------------------------
648// In-memory table (re)construction
649
650/// Builds an empty in-memory `Table` given the declared columns.
651fn build_empty_table(name: &str, columns: Vec<Column>, last_rowid: i64) -> Table {
652    let rows: Arc<Mutex<HashMap<String, Row>>> = Arc::new(Mutex::new(HashMap::new()));
653    let mut secondary_indexes: Vec<SecondaryIndex> = Vec::new();
654    {
655        let mut map = rows.lock().expect("rows mutex poisoned");
656        for col in &columns {
657            // Mirror the dispatch in `Table::new` so the reconstructed
658            // table has the same shape it'd have if it were built fresh
659            // from SQL. Phase 7a adds the Vector arm — without it,
660            // VECTOR columns silently restore as Row::None and every
661            // restore_row hits a "storage None vs value Some(Vector(...))"
662            // type mismatch.
663            let row = match &col.datatype {
664                DataType::Integer => Row::Integer(BTreeMap::new()),
665                DataType::Text => Row::Text(BTreeMap::new()),
666                DataType::Real => Row::Real(BTreeMap::new()),
667                DataType::Bool => Row::Bool(BTreeMap::new()),
668                DataType::Vector(_dim) => Row::Vector(BTreeMap::new()),
669                // JSON columns reuse Text storage — see Table::new and
670                // Phase 7e's scope-correction note.
671                DataType::Json => Row::Text(BTreeMap::new()),
672                DataType::None | DataType::Invalid => Row::None,
673            };
674            map.insert(col.column_name.clone(), row);
675
676            // Auto-create UNIQUE/PK indexes so the restored table has the
677            // same shape Table::new would have built from fresh SQL.
678            if (col.is_pk || col.is_unique)
679                && matches!(col.datatype, DataType::Integer | DataType::Text)
680            {
681                if let Ok(idx) = SecondaryIndex::new(
682                    SecondaryIndex::auto_name(name, &col.column_name),
683                    name.to_string(),
684                    col.column_name.clone(),
685                    &col.datatype,
686                    true,
687                    IndexOrigin::Auto,
688                ) {
689                    secondary_indexes.push(idx);
690                }
691            }
692        }
693    }
694
695    let primary_key = columns
696        .iter()
697        .find(|c| c.is_pk)
698        .map(|c| c.column_name.clone())
699        .unwrap_or_else(|| "-1".to_string());
700
701    Table {
702        tb_name: name.to_string(),
703        columns,
704        rows,
705        secondary_indexes,
706        // HNSW indexes (Phase 7d.2) are reconstructed on open by re-
707        // executing each `CREATE INDEX … USING hnsw` SQL stored in
708        // `sqlrite_master`. This builder produces the empty shell;
709        // `replay_create_index_for_hnsw` (in this same module) walks
710        // sqlrite_master after every table is loaded and rebuilds the
711        // graph from current row data. Persistence of the graph itself
712        // (avoiding the on-open rebuild cost) is Phase 7d.3.
713        hnsw_indexes: Vec::new(),
714        // FTS indexes (Phase 8b) follow the same pattern — the
715        // CREATE INDEX … USING fts SQL is the source of truth on open
716        // and the in-memory posting list gets rebuilt from current
717        // rows. Cell-encoded persistence of the postings is Phase 8c.
718        fts_indexes: Vec::new(),
719        last_rowid,
720        primary_key,
721    }
722}
723
724// -------------------------------------------------------------------------
725// Leaf-chain read / write
726
727/// Walks a table's B-Tree from `root_page`, following the leftmost-child
728/// chain down to the first leaf, then iterating leaves via their sibling
729/// `next_page` pointers. Every cell is decoded and replayed into `table`.
730///
731/// Open-path note: we eagerly materialize the entire table into `Table`'s
732/// in-memory maps. Phase 5 will introduce a `Cursor` that hits the pager
733/// on demand so queries can stream through the tree without a full upfront
734/// load.
735/// Re-parses `CREATE INDEX` SQL from sqlrite_master and restores the
736/// index on its base table by walking the tree of index cells at
737/// `rootpage`. The base table is expected to already be in `db.tables`.
738fn attach_index(db: &mut Database, pager: &Pager, row: IndexCatalogRow) -> Result<()> {
739    let (table_name, column_name, is_unique) = parse_create_index_sql(&row.sql)?;
740
741    let table = db.get_table_mut(table_name.clone()).map_err(|_| {
742        SQLRiteError::Internal(format!(
743            "index '{}' references unknown table '{table_name}' (sqlrite_master out of sync?)",
744            row.name
745        ))
746    })?;
747    let datatype = table
748        .columns
749        .iter()
750        .find(|c| c.column_name == column_name)
751        .map(|c| clone_datatype(&c.datatype))
752        .ok_or_else(|| {
753            SQLRiteError::Internal(format!(
754                "index '{}' references unknown column '{column_name}' on '{table_name}'",
755                row.name
756            ))
757        })?;
758
759    // An auto-index on this column may already exist (built by
760    // build_empty_table for UNIQUE/PK columns). If the names match, reuse
761    // the slot instead of adding a duplicate entry.
762    let existing_slot = table
763        .secondary_indexes
764        .iter()
765        .position(|i| i.name == row.name);
766    let idx = match existing_slot {
767        Some(i) => {
768            // Drain any entries that may have been populated during table
769            // restore_row calls — we're about to repopulate from the
770            // persisted tree.
771            table.secondary_indexes.remove(i)
772        }
773        None => SecondaryIndex::new(
774            row.name.clone(),
775            table_name.clone(),
776            column_name.clone(),
777            &datatype,
778            is_unique,
779            IndexOrigin::Explicit,
780        )?,
781    };
782    let mut idx = idx;
783    // Wipe any stale entries from the auto path so the load is idempotent.
784    let is_unique_flag = idx.is_unique;
785    let origin = idx.origin;
786    idx = SecondaryIndex::new(
787        idx.name,
788        idx.table_name,
789        idx.column_name,
790        &datatype,
791        is_unique_flag,
792        origin,
793    )?;
794
795    // Populate from the index tree's cells.
796    load_index_rows(pager, &mut idx, row.rootpage)?;
797
798    table.secondary_indexes.push(idx);
799    Ok(())
800}
801
802/// Walks the leaves of an index B-Tree rooted at `root_page` and inserts
803/// every `(value, rowid)` pair into `idx`.
804fn load_index_rows(pager: &Pager, idx: &mut SecondaryIndex, root_page: u32) -> Result<()> {
805    if root_page == 0 {
806        return Ok(());
807    }
808    let first_leaf = find_leftmost_leaf(pager, root_page)?;
809    let mut current = first_leaf;
810    while current != 0 {
811        let page_buf = pager
812            .read_page(current)
813            .ok_or_else(|| SQLRiteError::Internal(format!("missing index leaf page {current}")))?;
814        if page_buf[0] != PageType::TableLeaf as u8 {
815            return Err(SQLRiteError::Internal(format!(
816                "page {current} tagged {} but expected TableLeaf (index)",
817                page_buf[0]
818            )));
819        }
820        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
821        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
822            .try_into()
823            .map_err(|_| SQLRiteError::Internal("index leaf payload size".to_string()))?;
824        let leaf = TablePage::from_bytes(payload);
825
826        for slot in 0..leaf.slot_count() {
827            // Slots on an index page hold KIND_INDEX cells; decode directly.
828            let offset = leaf.slot_offset_raw(slot)?;
829            let (ic, _) = IndexCell::decode(leaf.as_bytes(), offset)?;
830            idx.insert(&ic.value, ic.rowid)?;
831        }
832        current = next_leaf;
833    }
834    Ok(())
835}
836
837/// Minimal recognizer for the synthesized-or-user `CREATE INDEX` SQL we
838/// store in sqlrite_master. Returns `(table_name, column_name, is_unique)`.
839///
840/// Uses sqlparser so user-supplied SQL with extra whitespace, case, etc.
841/// still works; the only shape we accept is single-column indexes.
842fn parse_create_index_sql(sql: &str) -> Result<(String, String, bool)> {
843    use sqlparser::ast::{CreateIndex, Expr, Statement};
844
845    let dialect = SQLiteDialect {};
846    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
847    let Some(Statement::CreateIndex(CreateIndex {
848        table_name,
849        columns,
850        unique,
851        ..
852    })) = ast.pop()
853    else {
854        return Err(SQLRiteError::Internal(format!(
855            "sqlrite_master index row's SQL isn't a CREATE INDEX: {sql}"
856        )));
857    };
858    if columns.len() != 1 {
859        return Err(SQLRiteError::NotImplemented(
860            "multi-column indexes aren't supported yet".to_string(),
861        ));
862    }
863    let col = match &columns[0].column.expr {
864        Expr::Identifier(ident) => ident.value.clone(),
865        Expr::CompoundIdentifier(parts) => {
866            parts.last().map(|p| p.value.clone()).unwrap_or_default()
867        }
868        other => {
869            return Err(SQLRiteError::Internal(format!(
870                "unsupported indexed column expression: {other:?}"
871            )));
872        }
873    };
874    Ok((table_name.to_string(), col, unique))
875}
876
877/// True iff a CREATE INDEX SQL string uses `USING hnsw` (case-insensitive).
878/// Used by the open path to route HNSW indexes to the graph-rebuild path
879/// instead of the standard B-Tree cell-load. Pre-Phase-7d.2 indexes
880/// don't have a USING clause, so they all return false and continue
881/// taking the existing path.
882fn create_index_sql_uses_hnsw(sql: &str) -> bool {
883    use sqlparser::ast::{CreateIndex, IndexType, Statement};
884
885    let dialect = SQLiteDialect {};
886    let Ok(mut ast) = Parser::parse_sql(&dialect, sql) else {
887        return false;
888    };
889    let Some(Statement::CreateIndex(CreateIndex { using, .. })) = ast.pop() else {
890        return false;
891    };
892    matches!(using, Some(IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("hnsw"))
893}
894
895/// Phase 8b — peeks at a CREATE INDEX SQL to detect `USING fts(...)`.
896/// Mirrors [`create_index_sql_uses_hnsw`].
897fn create_index_sql_uses_fts(sql: &str) -> bool {
898    use sqlparser::ast::{CreateIndex, IndexType, Statement};
899
900    let dialect = SQLiteDialect {};
901    let Ok(mut ast) = Parser::parse_sql(&dialect, sql) else {
902        return false;
903    };
904    let Some(Statement::CreateIndex(CreateIndex { using, .. })) = ast.pop() else {
905        return false;
906    };
907    matches!(using, Some(IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("fts"))
908}
909
910/// Phase 8c — loads (or rebuilds) an FTS index on database open. Two
911/// paths mirror [`rebuild_hnsw_index`]:
912///
913///   - **rootpage != 0** (Phase 8c default): the posting list is
914///     persisted as cell-encoded pages. Read every cell directly via
915///     [`load_fts_postings`] and reconstruct the index — no
916///     re-tokenization, exact bit-for-bit reproduction.
917///
918///   - **rootpage == 0** (compatibility): no on-disk postings, e.g.
919///     for files saved by Phase 8b before persistence landed. Replay
920///     the CREATE INDEX SQL through `execute_create_index`, which
921///     walks the table's current rows and tokenizes them fresh.
922fn rebuild_fts_index(db: &mut Database, pager: &Pager, row: &IndexCatalogRow) -> Result<()> {
923    use crate::sql::db::table::FtsIndexEntry;
924    use crate::sql::executor::execute_create_index;
925    use crate::sql::fts::PostingList;
926    use sqlparser::ast::Statement;
927
928    let dialect = SQLiteDialect {};
929    let mut ast = Parser::parse_sql(&dialect, &row.sql).map_err(SQLRiteError::from)?;
930    let Some(stmt @ Statement::CreateIndex(_)) = ast.pop() else {
931        return Err(SQLRiteError::Internal(format!(
932            "sqlrite_master FTS row's SQL isn't a CREATE INDEX: {}",
933            row.sql
934        )));
935    };
936
937    if row.rootpage == 0 {
938        // Compatibility path — no persisted postings; replay rows.
939        execute_create_index(&stmt, db)?;
940        return Ok(());
941    }
942
943    let (doc_lengths, postings) = load_fts_postings(pager, row.rootpage)?;
944    let index = PostingList::from_persisted_postings(doc_lengths, postings);
945    let (tbl_name, col_name) = parse_fts_create_index_sql(&row.sql)?;
946    let table_mut = db.get_table_mut(tbl_name.clone()).map_err(|_| {
947        SQLRiteError::Internal(format!(
948            "FTS index '{}' references unknown table '{tbl_name}'",
949            row.name
950        ))
951    })?;
952    table_mut.fts_indexes.push(FtsIndexEntry {
953        name: row.name.clone(),
954        column_name: col_name,
955        index,
956        needs_rebuild: false,
957    });
958    Ok(())
959}
960
961/// Pulls (table_name, column_name) out of a `CREATE INDEX … USING fts(col)`
962/// SQL string. Same shape as `parse_hnsw_create_index_sql`.
963fn parse_fts_create_index_sql(sql: &str) -> Result<(String, String)> {
964    use sqlparser::ast::{CreateIndex, Expr, Statement};
965
966    let dialect = SQLiteDialect {};
967    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
968    let Some(Statement::CreateIndex(CreateIndex {
969        table_name,
970        columns,
971        ..
972    })) = ast.pop()
973    else {
974        return Err(SQLRiteError::Internal(format!(
975            "sqlrite_master FTS row's SQL isn't a CREATE INDEX: {sql}"
976        )));
977    };
978    if columns.len() != 1 {
979        return Err(SQLRiteError::NotImplemented(
980            "multi-column FTS indexes aren't supported yet".to_string(),
981        ));
982    }
983    let col = match &columns[0].column.expr {
984        Expr::Identifier(ident) => ident.value.clone(),
985        Expr::CompoundIdentifier(parts) => {
986            parts.last().map(|p| p.value.clone()).unwrap_or_default()
987        }
988        other => {
989            return Err(SQLRiteError::Internal(format!(
990                "FTS CREATE INDEX has unexpected column expr: {other:?}"
991            )));
992        }
993    };
994    Ok((table_name.to_string(), col))
995}
996
997/// Loads (or rebuilds) an HNSW index on database open. Two paths:
998///
999///   - **rootpage != 0** (Phase 7d.3 default): the graph is persisted
1000///     as cell-encoded pages. Read every node directly via
1001///     `load_hnsw_nodes` and reconstruct the index — fast, zero
1002///     algorithm runs, exact bit-for-bit reproduction of what was saved.
1003///
1004///   - **rootpage == 0** (compatibility): no on-disk graph, e.g. for
1005///     files saved by Phase 7d.2 before persistence landed. Replay the
1006///     CREATE INDEX SQL through `execute_create_index`, which walks the
1007///     table's current rows and populates a fresh graph. Slower but
1008///     correctness-equivalent on the first save with the new code.
1009fn rebuild_hnsw_index(db: &mut Database, pager: &Pager, row: &IndexCatalogRow) -> Result<()> {
1010    use crate::sql::db::table::HnswIndexEntry;
1011    use crate::sql::executor::execute_create_index;
1012    use crate::sql::hnsw::{DistanceMetric, HnswIndex};
1013    use sqlparser::ast::Statement;
1014
1015    let dialect = SQLiteDialect {};
1016    let mut ast = Parser::parse_sql(&dialect, &row.sql).map_err(SQLRiteError::from)?;
1017    let Some(stmt @ Statement::CreateIndex(_)) = ast.pop() else {
1018        return Err(SQLRiteError::Internal(format!(
1019            "sqlrite_master HNSW row's SQL isn't a CREATE INDEX: {}",
1020            row.sql
1021        )));
1022    };
1023
1024    if row.rootpage == 0 {
1025        // Compatibility path — no persisted graph; walk current rows.
1026        execute_create_index(&stmt, db)?;
1027        return Ok(());
1028    }
1029
1030    // Persistence path — read the cell tree, deserialize.
1031    let nodes = load_hnsw_nodes(pager, row.rootpage)?;
1032    let index = HnswIndex::from_persisted_nodes(DistanceMetric::L2, 0xC0FFEE, nodes);
1033
1034    // Parse the CREATE INDEX to know which table + column to attach to
1035    // — same shape as the row-walk path; we just don't execute it.
1036    let (tbl_name, col_name) = parse_hnsw_create_index_sql(&row.sql)?;
1037    let table_mut = db.get_table_mut(tbl_name.clone()).map_err(|_| {
1038        SQLRiteError::Internal(format!(
1039            "HNSW index '{}' references unknown table '{tbl_name}'",
1040            row.name
1041        ))
1042    })?;
1043    table_mut.hnsw_indexes.push(HnswIndexEntry {
1044        name: row.name.clone(),
1045        column_name: col_name,
1046        index,
1047        needs_rebuild: false,
1048    });
1049    Ok(())
1050}
1051
1052/// Phase 7d.3 — Phase-7d.3-side helper: walk every leaf in the HNSW
1053/// page tree at `root_page` and decode each cell as a node. Returns
1054/// the (node_id, layers) tuples in slot-order (already ascending by
1055/// node_id since they were staged that way). The caller hands them to
1056/// `HnswIndex::from_persisted_nodes`.
1057fn load_hnsw_nodes(pager: &Pager, root_page: u32) -> Result<Vec<(i64, Vec<Vec<i64>>)>> {
1058    use crate::sql::pager::hnsw_cell::HnswNodeCell;
1059
1060    let mut nodes: Vec<(i64, Vec<Vec<i64>>)> = Vec::new();
1061    let first_leaf = find_leftmost_leaf(pager, root_page)?;
1062    let mut current = first_leaf;
1063    while current != 0 {
1064        let page_buf = pager
1065            .read_page(current)
1066            .ok_or_else(|| SQLRiteError::Internal(format!("missing HNSW leaf page {current}")))?;
1067        if page_buf[0] != PageType::TableLeaf as u8 {
1068            return Err(SQLRiteError::Internal(format!(
1069                "page {current} tagged {} but expected TableLeaf (HNSW)",
1070                page_buf[0]
1071            )));
1072        }
1073        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
1074        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
1075            .try_into()
1076            .map_err(|_| SQLRiteError::Internal("HNSW leaf payload size".to_string()))?;
1077        let leaf = TablePage::from_bytes(payload);
1078        for slot in 0..leaf.slot_count() {
1079            let offset = leaf.slot_offset_raw(slot)?;
1080            let (cell, _) = HnswNodeCell::decode(leaf.as_bytes(), offset)?;
1081            nodes.push((cell.node_id, cell.layers));
1082        }
1083        current = next_leaf;
1084    }
1085    Ok(nodes)
1086}
1087
1088/// Pulls (table_name, column_name) out of a `CREATE INDEX … USING hnsw (col)`
1089/// SQL string. Used by the persistence path on open to know where to
1090/// attach the loaded graph. Same shape as `parse_create_index_sql` for
1091/// regular indexes — only the assertion differs (we don't care about
1092/// UNIQUE for HNSW).
1093fn parse_hnsw_create_index_sql(sql: &str) -> Result<(String, String)> {
1094    use sqlparser::ast::{CreateIndex, Expr, Statement};
1095
1096    let dialect = SQLiteDialect {};
1097    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
1098    let Some(Statement::CreateIndex(CreateIndex {
1099        table_name,
1100        columns,
1101        ..
1102    })) = ast.pop()
1103    else {
1104        return Err(SQLRiteError::Internal(format!(
1105            "sqlrite_master HNSW row's SQL isn't a CREATE INDEX: {sql}"
1106        )));
1107    };
1108    if columns.len() != 1 {
1109        return Err(SQLRiteError::NotImplemented(
1110            "multi-column HNSW indexes aren't supported yet".to_string(),
1111        ));
1112    }
1113    let col = match &columns[0].column.expr {
1114        Expr::Identifier(ident) => ident.value.clone(),
1115        Expr::CompoundIdentifier(parts) => {
1116            parts.last().map(|p| p.value.clone()).unwrap_or_default()
1117        }
1118        other => {
1119            return Err(SQLRiteError::Internal(format!(
1120                "unsupported HNSW indexed column expression: {other:?}"
1121            )));
1122        }
1123    };
1124    Ok((table_name.to_string(), col))
1125}
1126
1127/// Phase 7d.3 — rebuilds in-place any HnswIndexEntry whose
1128/// `needs_rebuild` flag is set (DELETE / UPDATE-on-vector marked it).
1129/// Walks the table's current Vec<f32> column storage and runs the
1130/// HNSW algorithm fresh. Called at the top of `save_database` before
1131/// any immutable borrows of `db` start.
1132///
1133/// Cost: O(N · ef_construction · log N) per dirty index. Fine for
1134/// small tables, expensive for ≥100k-row tables — matches the
1135/// trade-off SQLite makes for FTS5: dirtying-and-rebuilding is the
1136/// MVP, more sophisticated incremental delete strategies (soft-delete
1137/// + tombstones, neighbor reconnection) are future polish.
1138fn rebuild_dirty_hnsw_indexes(db: &mut Database) {
1139    use crate::sql::hnsw::{DistanceMetric, HnswIndex};
1140
1141    for table in db.tables.values_mut() {
1142        // Snapshot which (index_name, column) pairs need rebuilding,
1143        // before we go grabbing column data — keeps the borrow
1144        // structure simple.
1145        let dirty: Vec<(String, String)> = table
1146            .hnsw_indexes
1147            .iter()
1148            .filter(|e| e.needs_rebuild)
1149            .map(|e| (e.name.clone(), e.column_name.clone()))
1150            .collect();
1151        if dirty.is_empty() {
1152            continue;
1153        }
1154
1155        for (idx_name, col_name) in dirty {
1156            // Snapshot every (rowid, vec) for this column.
1157            let mut vectors: Vec<(i64, Vec<f32>)> = Vec::new();
1158            {
1159                let row_data = table.rows.lock().expect("rows mutex poisoned");
1160                if let Some(Row::Vector(map)) = row_data.get(&col_name) {
1161                    for (id, v) in map.iter() {
1162                        vectors.push((*id, v.clone()));
1163                    }
1164                }
1165            }
1166            // Pre-build a HashMap for the get_vec closure so we don't
1167            // pay O(N) lookup per insert call.
1168            let snapshot: std::collections::HashMap<i64, Vec<f32>> =
1169                vectors.iter().cloned().collect();
1170
1171            let mut new_idx = HnswIndex::new(DistanceMetric::L2, 0xC0FFEE);
1172            // Sort by id so the rebuild is deterministic across runs.
1173            vectors.sort_by_key(|(id, _)| *id);
1174            for (id, v) in &vectors {
1175                new_idx.insert(*id, v, |q| snapshot.get(&q).cloned().unwrap_or_default());
1176            }
1177
1178            // Replace the entry's index + clear the dirty flag.
1179            if let Some(entry) = table.hnsw_indexes.iter_mut().find(|e| e.name == idx_name) {
1180                entry.index = new_idx;
1181                entry.needs_rebuild = false;
1182            }
1183        }
1184    }
1185}
1186
1187/// Phase 8b — rebuild every FTS index a DELETE / UPDATE-on-text-col
1188/// marked dirty. Mirrors [`rebuild_dirty_hnsw_indexes`]; runs at save
1189/// time under `&mut Database`. Cheap on a clean DB (the `dirty` snapshot
1190/// is empty so the per-table loop short-circuits).
1191fn rebuild_dirty_fts_indexes(db: &mut Database) {
1192    use crate::sql::fts::PostingList;
1193
1194    for table in db.tables.values_mut() {
1195        let dirty: Vec<(String, String)> = table
1196            .fts_indexes
1197            .iter()
1198            .filter(|e| e.needs_rebuild)
1199            .map(|e| (e.name.clone(), e.column_name.clone()))
1200            .collect();
1201        if dirty.is_empty() {
1202            continue;
1203        }
1204
1205        for (idx_name, col_name) in dirty {
1206            // Snapshot every (rowid, text) pair for this column under
1207            // the row mutex, then drop the lock before re-tokenizing.
1208            let mut docs: Vec<(i64, String)> = Vec::new();
1209            {
1210                let row_data = table.rows.lock().expect("rows mutex poisoned");
1211                if let Some(Row::Text(map)) = row_data.get(&col_name) {
1212                    for (id, v) in map.iter() {
1213                        // "Null" sentinel is the parser's
1214                        // null-marker for TEXT cells; skip those —
1215                        // they'd round-trip as the literal string
1216                        // "Null" otherwise. Aligns with insert_row's
1217                        // typed_value gate.
1218                        if v != "Null" {
1219                            docs.push((*id, v.clone()));
1220                        }
1221                    }
1222                }
1223            }
1224
1225            let mut new_idx = PostingList::new();
1226            // Sort by id so the rebuild is deterministic across runs
1227            // (the BTreeMap inside PostingList is order-stable, but
1228            // doc-length aggregation order doesn't matter — sorting
1229            // here is purely for reproducibility on inspection).
1230            docs.sort_by_key(|(id, _)| *id);
1231            for (id, text) in &docs {
1232                new_idx.insert(*id, text);
1233            }
1234
1235            if let Some(entry) = table.fts_indexes.iter_mut().find(|e| e.name == idx_name) {
1236                entry.index = new_idx;
1237                entry.needs_rebuild = false;
1238            }
1239        }
1240    }
1241}
1242
1243/// Cheap clone helper — `DataType` doesn't derive `Clone` elsewhere.
1244fn clone_datatype(dt: &DataType) -> DataType {
1245    match dt {
1246        DataType::Integer => DataType::Integer,
1247        DataType::Text => DataType::Text,
1248        DataType::Real => DataType::Real,
1249        DataType::Bool => DataType::Bool,
1250        DataType::Vector(dim) => DataType::Vector(*dim),
1251        DataType::Json => DataType::Json,
1252        DataType::None => DataType::None,
1253        DataType::Invalid => DataType::Invalid,
1254    }
1255}
1256
1257/// Stages an index's B-Tree at `start_page`. Each leaf cell is a
1258/// `KIND_INDEX` entry carrying `(original_rowid, value)`. Returns
1259/// `(root_page, next_free_page)`.
1260///
1261/// The tree's shape matches a regular table's — leaves chained via
1262/// `next_page`, optional interior layer above. `Cell::peek_rowid` works
1263/// uniformly for index cells (same prefix as local cells), so the
1264/// existing slot directory and binary search carry over.
1265fn stage_index_btree(
1266    pager: &mut Pager,
1267    idx: &SecondaryIndex,
1268    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1269) -> Result<u32> {
1270    // Build the leaves.
1271    let leaves = stage_index_leaves(pager, idx, alloc)?;
1272    if leaves.len() == 1 {
1273        return Ok(leaves[0].0);
1274    }
1275    let mut level: Vec<(u32, i64)> = leaves;
1276    while level.len() > 1 {
1277        level = stage_interior_level(pager, &level, alloc)?;
1278    }
1279    Ok(level[0].0)
1280}
1281
1282/// Packs the index's (value, rowid) entries into a sibling-chained run
1283/// of `TableLeaf` pages. Iteration order matches `SecondaryIndex::iter_entries`
1284/// (ascending value; rowids in insertion order within a value), which is
1285/// also ascending by the "cell rowid" carried in each IndexCell (the
1286/// original row's rowid) — so Cell::peek_rowid + the slot directory's
1287/// rowid ordering stays consistent.
1288fn stage_index_leaves(
1289    pager: &mut Pager,
1290    idx: &SecondaryIndex,
1291    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1292) -> Result<Vec<(u32, i64)>> {
1293    let mut leaves: Vec<(u32, i64)> = Vec::new();
1294    let mut current_leaf = TablePage::empty();
1295    let mut current_leaf_page = alloc.allocate();
1296    let mut current_max_rowid: Option<i64> = None;
1297
1298    // Sort the entries by original rowid so the in-page slot directory,
1299    // which binary-searches by rowid, stays valid. (iter_entries orders by
1300    // value; we reorder here for B-Tree correctness.)
1301    let mut entries: Vec<(Value, i64)> = idx.iter_entries().collect();
1302    entries.sort_by_key(|(_, r)| *r);
1303
1304    for (value, rowid) in entries {
1305        let cell = IndexCell::new(rowid, value);
1306        let entry_bytes = cell.encode()?;
1307
1308        if !current_leaf.would_fit(entry_bytes.len()) {
1309            let next_leaf_page_num = alloc.allocate();
1310            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1311            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1312            current_leaf = TablePage::empty();
1313            current_leaf_page = next_leaf_page_num;
1314
1315            if !current_leaf.would_fit(entry_bytes.len()) {
1316                return Err(SQLRiteError::Internal(format!(
1317                    "index entry of {} bytes exceeds empty-page capacity {}",
1318                    entry_bytes.len(),
1319                    current_leaf.free_space()
1320                )));
1321            }
1322        }
1323        current_leaf.insert_entry(rowid, &entry_bytes)?;
1324        current_max_rowid = Some(rowid);
1325    }
1326
1327    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1328    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1329    Ok(leaves)
1330}
1331
1332/// Phase 7d.3 — stages an HNSW index's page tree at `start_page`.
1333/// Each leaf cell is a `KIND_HNSW` entry carrying one node's
1334/// (node_id, layers). Returns `(root_page, next_free_page)`.
1335///
1336/// Tree shape is identical to `stage_index_btree` — chained leaves +
1337/// optional interior layers. The slot directory binary-searches by
1338/// node_id (which is the cell's "rowid" in `Cell::peek_rowid` terms),
1339/// so reads can locate any node in O(log N) once 7d.4-or-later
1340/// optimizes the load path to lazy-fetch instead of read-all.
1341/// Today, `load_hnsw_nodes` reads the entire tree on open.
1342fn stage_hnsw_btree(
1343    pager: &mut Pager,
1344    idx: &crate::sql::hnsw::HnswIndex,
1345    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1346) -> Result<u32> {
1347    let leaves = stage_hnsw_leaves(pager, idx, alloc)?;
1348    if leaves.len() == 1 {
1349        return Ok(leaves[0].0);
1350    }
1351    let mut level: Vec<(u32, i64)> = leaves;
1352    while level.len() > 1 {
1353        level = stage_interior_level(pager, &level, alloc)?;
1354    }
1355    Ok(level[0].0)
1356}
1357
1358/// Phase 8c — stage one FTS index as a `TableLeaf`-shaped B-Tree.
1359/// Mirrors `stage_hnsw_btree` (sibling-chained leaves, optional interior
1360/// levels). Returns `(root_page, next_free_page)`. Each leaf is filled
1361/// with `KIND_FTS_POSTING` cells: one sidecar cell holding the
1362/// doc-lengths map, then one cell per term in lexicographic order.
1363fn stage_fts_btree(
1364    pager: &mut Pager,
1365    idx: &crate::sql::fts::PostingList,
1366    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1367) -> Result<u32> {
1368    let leaves = stage_fts_leaves(pager, idx, alloc)?;
1369    if leaves.len() == 1 {
1370        return Ok(leaves[0].0);
1371    }
1372    let mut level: Vec<(u32, i64)> = leaves;
1373    while level.len() > 1 {
1374        level = stage_interior_level(pager, &level, alloc)?;
1375    }
1376    Ok(level[0].0)
1377}
1378
1379/// Packs FTS posting cells into a sibling-chained run of `TableLeaf`
1380/// pages. Cell layout: a single doc-lengths sidecar at `cell_id = 1`,
1381/// followed by one cell per term in lexicographic order with
1382/// `cell_id = 2..=N + 1`. Sequential ids keep the slot directory's
1383/// rowid ordering valid (the `cell_id` field is what `peek_rowid`
1384/// returns).
1385fn stage_fts_leaves(
1386    pager: &mut Pager,
1387    idx: &crate::sql::fts::PostingList,
1388    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1389) -> Result<Vec<(u32, i64)>> {
1390    use crate::sql::pager::fts_cell::FtsPostingCell;
1391
1392    let mut leaves: Vec<(u32, i64)> = Vec::new();
1393    let mut current_leaf = TablePage::empty();
1394    let mut current_leaf_page = alloc.allocate();
1395    let mut current_max_rowid: Option<i64> = None;
1396
1397    // Build the cell sequence: sidecar first, then per-term cells. The
1398    // sidecar always exists (even on an empty index) so reload sees a
1399    // canonical "this index was persisted" marker in slot 0.
1400    let mut cell_id: i64 = 1;
1401    let mut cells: Vec<FtsPostingCell> = Vec::new();
1402    cells.push(FtsPostingCell::doc_lengths(
1403        cell_id,
1404        idx.serialize_doc_lengths(),
1405    ));
1406    for (term, entries) in idx.serialize_postings() {
1407        cell_id += 1;
1408        cells.push(FtsPostingCell::posting(cell_id, term, entries));
1409    }
1410
1411    for cell in cells {
1412        let entry_bytes = cell.encode()?;
1413
1414        if !current_leaf.would_fit(entry_bytes.len()) {
1415            let next_leaf_page_num = alloc.allocate();
1416            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1417            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1418            current_leaf = TablePage::empty();
1419            current_leaf_page = next_leaf_page_num;
1420
1421            if !current_leaf.would_fit(entry_bytes.len()) {
1422                // A single posting cell exceeds page capacity. Phase
1423                // 8c MVP doesn't chain via overflow cells (the plan
1424                // notes this as a stretch goal); surface a clear
1425                // error so users know which term tripped it.
1426                return Err(SQLRiteError::Internal(format!(
1427                    "FTS posting cell {} of {} bytes exceeds empty-page capacity {} \
1428                     (term too long or too many postings; overflow chaining is Phase 8.1)",
1429                    cell.cell_id,
1430                    entry_bytes.len(),
1431                    current_leaf.free_space()
1432                )));
1433            }
1434        }
1435        current_leaf.insert_entry(cell.cell_id, &entry_bytes)?;
1436        current_max_rowid = Some(cell.cell_id);
1437    }
1438
1439    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1440    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1441    Ok(leaves)
1442}
1443
1444/// (rowid, value) pairs as decoded from a single FTS cell — value is
1445/// either term frequency (posting cell) or doc length (sidecar cell).
1446type FtsEntries = Vec<(i64, u32)>;
1447/// (term, posting list) pairs as decoded from non-sidecar FTS cells.
1448type FtsPostings = Vec<(String, FtsEntries)>;
1449
1450/// Phase 8c — read every cell of an FTS index from `root_page` back
1451/// into the `(doc_lengths, postings)` shape `PostingList::from_persisted_postings`
1452/// expects. Mirrors `load_hnsw_nodes`: leftmost-leaf descent, walk the
1453/// sibling chain, decode each slot.
1454fn load_fts_postings(pager: &Pager, root_page: u32) -> Result<(FtsEntries, FtsPostings)> {
1455    use crate::sql::pager::fts_cell::FtsPostingCell;
1456
1457    let mut doc_lengths: Vec<(i64, u32)> = Vec::new();
1458    let mut postings: Vec<(String, Vec<(i64, u32)>)> = Vec::new();
1459    let mut saw_sidecar = false;
1460
1461    let first_leaf = find_leftmost_leaf(pager, root_page)?;
1462    let mut current = first_leaf;
1463    while current != 0 {
1464        let page_buf = pager
1465            .read_page(current)
1466            .ok_or_else(|| SQLRiteError::Internal(format!("missing FTS leaf page {current}")))?;
1467        if page_buf[0] != PageType::TableLeaf as u8 {
1468            return Err(SQLRiteError::Internal(format!(
1469                "page {current} tagged {} but expected TableLeaf (FTS)",
1470                page_buf[0]
1471            )));
1472        }
1473        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
1474        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
1475            .try_into()
1476            .map_err(|_| SQLRiteError::Internal("FTS leaf payload size".to_string()))?;
1477        let leaf = TablePage::from_bytes(payload);
1478        for slot in 0..leaf.slot_count() {
1479            let offset = leaf.slot_offset_raw(slot)?;
1480            let (cell, _) = FtsPostingCell::decode(leaf.as_bytes(), offset)?;
1481            if cell.is_doc_lengths() {
1482                if saw_sidecar {
1483                    return Err(SQLRiteError::Internal(
1484                        "FTS index has more than one doc-lengths sidecar cell".to_string(),
1485                    ));
1486                }
1487                saw_sidecar = true;
1488                doc_lengths = cell.entries;
1489            } else {
1490                postings.push((cell.term, cell.entries));
1491            }
1492        }
1493        current = next_leaf;
1494    }
1495
1496    if !saw_sidecar {
1497        return Err(SQLRiteError::Internal(
1498            "FTS index missing doc-lengths sidecar cell — corrupt or truncated tree".to_string(),
1499        ));
1500    }
1501    Ok((doc_lengths, postings))
1502}
1503
1504/// Packs HNSW nodes into a sibling-chained run of `TableLeaf` pages.
1505/// `serialize_nodes` already returns nodes in ascending node_id order,
1506/// so the slot directory's rowid ordering stays valid.
1507fn stage_hnsw_leaves(
1508    pager: &mut Pager,
1509    idx: &crate::sql::hnsw::HnswIndex,
1510    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1511) -> Result<Vec<(u32, i64)>> {
1512    use crate::sql::pager::hnsw_cell::HnswNodeCell;
1513
1514    let mut leaves: Vec<(u32, i64)> = Vec::new();
1515    let mut current_leaf = TablePage::empty();
1516    let mut current_leaf_page = alloc.allocate();
1517    let mut current_max_rowid: Option<i64> = None;
1518
1519    let serialized = idx.serialize_nodes();
1520
1521    // Empty index → emit a single empty leaf page so the rootpage
1522    // pointer in sqlrite_master stays nonzero (== "graph is persisted,
1523    // it just happens to be empty"). load_hnsw_nodes is fine with an
1524    // empty leaf — slot_count() returns 0.
1525    for (node_id, layers) in serialized {
1526        let cell = HnswNodeCell::new(node_id, layers);
1527        let entry_bytes = cell.encode()?;
1528
1529        if !current_leaf.would_fit(entry_bytes.len()) {
1530            let next_leaf_page_num = alloc.allocate();
1531            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1532            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1533            current_leaf = TablePage::empty();
1534            current_leaf_page = next_leaf_page_num;
1535
1536            if !current_leaf.would_fit(entry_bytes.len()) {
1537                return Err(SQLRiteError::Internal(format!(
1538                    "HNSW node {node_id} cell of {} bytes exceeds empty-page capacity {}",
1539                    entry_bytes.len(),
1540                    current_leaf.free_space()
1541                )));
1542            }
1543        }
1544        current_leaf.insert_entry(node_id, &entry_bytes)?;
1545        current_max_rowid = Some(node_id);
1546    }
1547
1548    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1549    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1550    Ok(leaves)
1551}
1552
1553fn load_table_rows(pager: &Pager, table: &mut Table, root_page: u32) -> Result<()> {
1554    let first_leaf = find_leftmost_leaf(pager, root_page)?;
1555    let mut current = first_leaf;
1556    while current != 0 {
1557        let page_buf = pager
1558            .read_page(current)
1559            .ok_or_else(|| SQLRiteError::Internal(format!("missing leaf page {current}")))?;
1560        if page_buf[0] != PageType::TableLeaf as u8 {
1561            return Err(SQLRiteError::Internal(format!(
1562                "page {current} tagged {} but expected TableLeaf",
1563                page_buf[0]
1564            )));
1565        }
1566        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
1567        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
1568            .try_into()
1569            .map_err(|_| SQLRiteError::Internal("leaf payload slice size".to_string()))?;
1570        let leaf = TablePage::from_bytes(payload);
1571
1572        for slot in 0..leaf.slot_count() {
1573            let entry = leaf.entry_at(slot)?;
1574            let cell = match entry {
1575                PagedEntry::Local(c) => c,
1576                PagedEntry::Overflow(r) => {
1577                    let body_bytes =
1578                        read_overflow_chain(pager, r.first_overflow_page, r.total_body_len)?;
1579                    let (c, _) = Cell::decode(&body_bytes, 0)?;
1580                    c
1581                }
1582            };
1583            table.restore_row(cell.rowid, cell.values)?;
1584        }
1585        current = next_leaf;
1586    }
1587    Ok(())
1588}
1589
1590/// Walks every page reachable from `root_page` and returns their page
1591/// numbers. Includes `root_page`, every interior page, every leaf, and
1592/// — when `follow_overflow` is true — every overflow page chained off
1593/// table-leaf cells. Used by `save_database` to seed each table's
1594/// per-table preferred pool and to compute the newly-freed set.
1595///
1596/// `follow_overflow = true` for table B-Trees (cells may carry
1597/// `OverflowRef`s pointing at chained overflow pages); `false` for
1598/// secondary-index, HNSW, and FTS B-Trees, which never overflow in the
1599/// current encoding.
1600fn collect_pages_for_btree(
1601    pager: &Pager,
1602    root_page: u32,
1603    follow_overflow: bool,
1604) -> Result<Vec<u32>> {
1605    if root_page == 0 {
1606        return Ok(Vec::new());
1607    }
1608    let mut pages: Vec<u32> = Vec::new();
1609    let mut stack: Vec<u32> = vec![root_page];
1610
1611    while let Some(p) = stack.pop() {
1612        let buf = pager.read_page(p).ok_or_else(|| {
1613            SQLRiteError::Internal(format!(
1614                "collect_pages: missing page {p} (rooted at {root_page})"
1615            ))
1616        })?;
1617        pages.push(p);
1618        match buf[0] {
1619            t if t == PageType::InteriorNode as u8 => {
1620                let payload: &[u8; PAYLOAD_PER_PAGE] =
1621                    (&buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
1622                        SQLRiteError::Internal("interior payload slice size".to_string())
1623                    })?;
1624                let interior = InteriorPage::from_bytes(payload);
1625                // Push every divider's child + the rightmost child.
1626                for slot in 0..interior.slot_count() {
1627                    let cell = interior.cell_at(slot)?;
1628                    stack.push(cell.child_page);
1629                }
1630                stack.push(interior.rightmost_child());
1631            }
1632            t if t == PageType::TableLeaf as u8 => {
1633                if follow_overflow {
1634                    let payload: &[u8; PAYLOAD_PER_PAGE] =
1635                        (&buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
1636                            SQLRiteError::Internal("leaf payload slice size".to_string())
1637                        })?;
1638                    let leaf = TablePage::from_bytes(payload);
1639                    for slot in 0..leaf.slot_count() {
1640                        match leaf.entry_at(slot)? {
1641                            PagedEntry::Local(_) => {}
1642                            PagedEntry::Overflow(r) => {
1643                                let mut cur = r.first_overflow_page;
1644                                while cur != 0 {
1645                                    pages.push(cur);
1646                                    let ob = pager.read_page(cur).ok_or_else(|| {
1647                                        SQLRiteError::Internal(format!(
1648                                            "collect_pages: missing overflow page {cur}"
1649                                        ))
1650                                    })?;
1651                                    if ob[0] != PageType::Overflow as u8 {
1652                                        return Err(SQLRiteError::Internal(format!(
1653                                            "collect_pages: page {cur} expected Overflow, got tag {}",
1654                                            ob[0]
1655                                        )));
1656                                    }
1657                                    cur = u32::from_le_bytes(ob[1..5].try_into().unwrap());
1658                                }
1659                            }
1660                        }
1661                    }
1662                }
1663            }
1664            other => {
1665                return Err(SQLRiteError::Internal(format!(
1666                    "collect_pages: unexpected page type {other} at page {p}"
1667                )));
1668            }
1669        }
1670    }
1671    Ok(pages)
1672}
1673
1674/// Reads the previously-persisted `sqlrite_master` and returns a map from
1675/// `(kind, name)` to that object's rootpage. Used by `save_database` to
1676/// seed each table/index's per-table preferred pool with the pages it
1677/// occupied last time round.
1678///
1679/// `kind` is `"table"` or `"index"` (the catalog already disambiguates
1680/// the three index families via the SQL string, but for page-collection
1681/// purposes a "table" tree must follow overflow refs while an "index"
1682/// tree never does — that's the only distinction we need here).
1683fn read_old_rootpages(pager: &Pager, schema_root: u32) -> Result<HashMap<(String, String), u32>> {
1684    let mut out: HashMap<(String, String), u32> = HashMap::new();
1685    if schema_root == 0 {
1686        return Ok(out);
1687    }
1688    let mut master = build_empty_master_table();
1689    load_table_rows(pager, &mut master, schema_root)?;
1690    for rowid in master.rowids() {
1691        let kind = take_text(&master, "type", rowid)?;
1692        let name = take_text(&master, "name", rowid)?;
1693        let rootpage = take_integer(&master, "rootpage", rowid)? as u32;
1694        out.insert((kind, name), rootpage);
1695    }
1696    Ok(out)
1697}
1698
1699/// Descends from `root_page` through `InteriorNode` pages, always taking
1700/// the leftmost child, until a `TableLeaf` is reached. Returns that leaf's
1701/// page number. A root that's already a leaf is returned as-is.
1702fn find_leftmost_leaf(pager: &Pager, root_page: u32) -> Result<u32> {
1703    let mut current = root_page;
1704    loop {
1705        let page_buf = pager.read_page(current).ok_or_else(|| {
1706            SQLRiteError::Internal(format!("missing page {current} during tree descent"))
1707        })?;
1708        match page_buf[0] {
1709            t if t == PageType::TableLeaf as u8 => return Ok(current),
1710            t if t == PageType::InteriorNode as u8 => {
1711                let payload: &[u8; PAYLOAD_PER_PAGE] =
1712                    (&page_buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
1713                        SQLRiteError::Internal("interior payload slice size".to_string())
1714                    })?;
1715                let interior = InteriorPage::from_bytes(payload);
1716                current = interior.leftmost_child()?;
1717            }
1718            other => {
1719                return Err(SQLRiteError::Internal(format!(
1720                    "unexpected page type {other} during tree descent at page {current}"
1721                )));
1722            }
1723        }
1724    }
1725}
1726
1727/// Stages a table's B-Tree, drawing every page number from `alloc`.
1728/// Returns the root page (the topmost interior page, or the single leaf
1729/// when the table fits in one page).
1730///
1731/// Builds bottom-up: pack rows into `TableLeaf` pages chained via
1732/// `next_page`, then if more than one leaf, recursively wrap them in
1733/// `InteriorNode` levels until one root remains.
1734///
1735/// Deterministic: same rows + same allocator handouts → byte-identical
1736/// pages at the same numbers, so the diff pager skips unchanged tables.
1737fn stage_table_btree(
1738    pager: &mut Pager,
1739    table: &Table,
1740    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1741) -> Result<u32> {
1742    let leaves = stage_leaves(pager, table, alloc)?;
1743    if leaves.len() == 1 {
1744        return Ok(leaves[0].0);
1745    }
1746    let mut level: Vec<(u32, i64)> = leaves;
1747    while level.len() > 1 {
1748        level = stage_interior_level(pager, &level, alloc)?;
1749    }
1750    Ok(level[0].0)
1751}
1752
1753/// Packs the table's rows into a sibling-linked chain of `TableLeaf` pages.
1754/// Returns each leaf's `(page_number, max_rowid)` for use by the next
1755/// interior level. Allocates leaf and overflow pages from `alloc`.
1756fn stage_leaves(
1757    pager: &mut Pager,
1758    table: &Table,
1759    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1760) -> Result<Vec<(u32, i64)>> {
1761    let mut leaves: Vec<(u32, i64)> = Vec::new();
1762    let mut current_leaf = TablePage::empty();
1763    let mut current_leaf_page = alloc.allocate();
1764    let mut current_max_rowid: Option<i64> = None;
1765
1766    for rowid in table.rowids() {
1767        let entry_bytes = build_row_entry(pager, table, rowid, alloc)?;
1768
1769        if !current_leaf.would_fit(entry_bytes.len()) {
1770            // The new leaf goes at whatever the allocator hands out
1771            // next. Commit the current leaf with that as its sibling
1772            // pointer.
1773            let next_leaf_page_num = alloc.allocate();
1774            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1775            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1776            current_leaf = TablePage::empty();
1777            current_leaf_page = next_leaf_page_num;
1778            // current_max_rowid is reassigned by the insert below; no need
1779            // to zero it out here.
1780
1781            if !current_leaf.would_fit(entry_bytes.len()) {
1782                return Err(SQLRiteError::Internal(format!(
1783                    "entry of {} bytes exceeds empty-page capacity {}",
1784                    entry_bytes.len(),
1785                    current_leaf.free_space()
1786                )));
1787            }
1788        }
1789        current_leaf.insert_entry(rowid, &entry_bytes)?;
1790        current_max_rowid = Some(rowid);
1791    }
1792
1793    // Final leaf: sibling next_page = 0 (end of chain).
1794    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1795    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1796    Ok(leaves)
1797}
1798
1799/// Encodes a single row's on-leaf entry — either the local cell bytes, or
1800/// an `OverflowRef` pointing at a freshly-allocated overflow chain if the
1801/// encoded cell exceeded the inline threshold. Allocates any overflow
1802/// pages from `alloc`.
1803fn build_row_entry(
1804    pager: &mut Pager,
1805    table: &Table,
1806    rowid: i64,
1807    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1808) -> Result<Vec<u8>> {
1809    let values = table.extract_row(rowid);
1810    let local_cell = Cell::new(rowid, values);
1811    let local_bytes = local_cell.encode()?;
1812    if local_bytes.len() > OVERFLOW_THRESHOLD {
1813        let overflow_start = write_overflow_chain(pager, &local_bytes, alloc)?;
1814        Ok(OverflowRef {
1815            rowid,
1816            total_body_len: local_bytes.len() as u64,
1817            first_overflow_page: overflow_start,
1818        }
1819        .encode())
1820    } else {
1821        Ok(local_bytes)
1822    }
1823}
1824
1825/// Builds one level of `InteriorNode` pages above the given children.
1826/// Each interior packs as many dividers as will fit; the last child
1827/// assigned to an interior becomes its `rightmost_child`. Returns the
1828/// emitted interior pages as `(page_number, max_rowid_in_subtree)`.
1829fn stage_interior_level(
1830    pager: &mut Pager,
1831    children: &[(u32, i64)],
1832    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1833) -> Result<Vec<(u32, i64)>> {
1834    let mut next_level: Vec<(u32, i64)> = Vec::new();
1835    let mut idx = 0usize;
1836
1837    while idx < children.len() {
1838        let interior_page_num = alloc.allocate();
1839
1840        // Seed the interior with the first unassigned child as its
1841        // rightmost. As we add more children, the previous rightmost
1842        // graduates to being a divider and the new arrival takes over
1843        // as rightmost.
1844        let (mut rightmost_child_page, mut rightmost_child_max) = children[idx];
1845        idx += 1;
1846        let mut interior = InteriorPage::empty(rightmost_child_page);
1847
1848        while idx < children.len() {
1849            let new_divider_cell = InteriorCell {
1850                divider_rowid: rightmost_child_max,
1851                child_page: rightmost_child_page,
1852            };
1853            let new_divider_bytes = new_divider_cell.encode();
1854            if !interior.would_fit(new_divider_bytes.len()) {
1855                break;
1856            }
1857            interior.insert_divider(rightmost_child_max, rightmost_child_page)?;
1858            let (next_child_page, next_child_max) = children[idx];
1859            interior.set_rightmost_child(next_child_page);
1860            rightmost_child_page = next_child_page;
1861            rightmost_child_max = next_child_max;
1862            idx += 1;
1863        }
1864
1865        emit_interior(pager, interior_page_num, &interior);
1866        next_level.push((interior_page_num, rightmost_child_max));
1867    }
1868
1869    Ok(next_level)
1870}
1871
1872/// Wraps a `TablePage` in the 7-byte page header and hands it to the pager.
1873fn emit_leaf(pager: &mut Pager, page_num: u32, leaf: &TablePage, next_leaf: u32) {
1874    let mut buf = [0u8; PAGE_SIZE];
1875    buf[0] = PageType::TableLeaf as u8;
1876    buf[1..5].copy_from_slice(&next_leaf.to_le_bytes());
1877    // For leaf pages the legacy `payload_len` field isn't used — the slot
1878    // directory self-describes. Zero it by convention.
1879    buf[5..7].copy_from_slice(&0u16.to_le_bytes());
1880    buf[PAGE_HEADER_SIZE..].copy_from_slice(leaf.as_bytes());
1881    pager.stage_page(page_num, buf);
1882}
1883
1884/// Wraps an `InteriorPage` in the 7-byte page header. Interior pages
1885/// don't use `next_page` (there's no sibling chain between interiors);
1886/// `payload_len` is also unused (the slot directory self-describes).
1887fn emit_interior(pager: &mut Pager, page_num: u32, interior: &InteriorPage) {
1888    let mut buf = [0u8; PAGE_SIZE];
1889    buf[0] = PageType::InteriorNode as u8;
1890    buf[1..5].copy_from_slice(&0u32.to_le_bytes());
1891    buf[5..7].copy_from_slice(&0u16.to_le_bytes());
1892    buf[PAGE_HEADER_SIZE..].copy_from_slice(interior.as_bytes());
1893    pager.stage_page(page_num, buf);
1894}
1895
1896#[cfg(test)]
1897mod tests {
1898    use super::*;
1899    use crate::sql::pager::freelist::MIN_PAGES_FOR_AUTO_VACUUM;
1900    use crate::sql::process_command;
1901
1902    fn seed_db() -> Database {
1903        let mut db = Database::new("test".to_string());
1904        process_command(
1905            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, age INTEGER);",
1906            &mut db,
1907        )
1908        .unwrap();
1909        process_command(
1910            "INSERT INTO users (name, age) VALUES ('alice', 30);",
1911            &mut db,
1912        )
1913        .unwrap();
1914        process_command("INSERT INTO users (name, age) VALUES ('bob', 25);", &mut db).unwrap();
1915        process_command(
1916            "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
1917            &mut db,
1918        )
1919        .unwrap();
1920        process_command("INSERT INTO notes (body) VALUES ('hello');", &mut db).unwrap();
1921        db
1922    }
1923
1924    fn tmp_path(name: &str) -> std::path::PathBuf {
1925        let mut p = std::env::temp_dir();
1926        let pid = std::process::id();
1927        let nanos = std::time::SystemTime::now()
1928            .duration_since(std::time::UNIX_EPOCH)
1929            .map(|d| d.as_nanos())
1930            .unwrap_or(0);
1931        p.push(format!("sqlrite-{pid}-{nanos}-{name}.sqlrite"));
1932        p
1933    }
1934
1935    /// Phase 4c: every .sqlrite has a `-wal` sidecar now. Delete both so
1936    /// `/tmp` doesn't accumulate orphan WALs across test runs.
1937    fn cleanup(path: &std::path::Path) {
1938        let _ = std::fs::remove_file(path);
1939        let mut wal = path.as_os_str().to_owned();
1940        wal.push("-wal");
1941        let _ = std::fs::remove_file(std::path::PathBuf::from(wal));
1942    }
1943
1944    #[test]
1945    fn round_trip_preserves_schema_and_data() {
1946        let path = tmp_path("roundtrip");
1947        let mut db = seed_db();
1948        save_database(&mut db, &path).expect("save");
1949
1950        let loaded = open_database(&path, "test".to_string()).expect("open");
1951        assert_eq!(loaded.tables.len(), 2);
1952
1953        let users = loaded.get_table("users".to_string()).expect("users table");
1954        assert_eq!(users.columns.len(), 3);
1955        let rowids = users.rowids();
1956        assert_eq!(rowids.len(), 2);
1957        let names: Vec<String> = rowids
1958            .iter()
1959            .filter_map(|r| match users.get_value("name", *r) {
1960                Some(Value::Text(s)) => Some(s),
1961                _ => None,
1962            })
1963            .collect();
1964        assert!(names.contains(&"alice".to_string()));
1965        assert!(names.contains(&"bob".to_string()));
1966
1967        let notes = loaded.get_table("notes".to_string()).expect("notes table");
1968        assert_eq!(notes.rowids().len(), 1);
1969
1970        cleanup(&path);
1971    }
1972
1973    // -----------------------------------------------------------------
1974    // Phase 7a — VECTOR(N) save / reopen round-trip
1975    // -----------------------------------------------------------------
1976
1977    #[test]
1978    fn round_trip_preserves_vector_column() {
1979        let path = tmp_path("vec_roundtrip");
1980
1981        // Build, populate, save.
1982        {
1983            let mut db = Database::new("test".to_string());
1984            process_command(
1985                "CREATE TABLE docs (id INTEGER PRIMARY KEY, embedding VECTOR(3));",
1986                &mut db,
1987            )
1988            .unwrap();
1989            process_command(
1990                "INSERT INTO docs (embedding) VALUES ([0.1, 0.2, 0.3]);",
1991                &mut db,
1992            )
1993            .unwrap();
1994            process_command(
1995                "INSERT INTO docs (embedding) VALUES ([1.5, -2.0, 3.5]);",
1996                &mut db,
1997            )
1998            .unwrap();
1999            save_database(&mut db, &path).expect("save");
2000        } // db drops → its exclusive lock releases before reopen.
2001
2002        // Reopen and verify schema + data both round-tripped.
2003        let loaded = open_database(&path, "test".to_string()).expect("open");
2004        let docs = loaded.get_table("docs".to_string()).expect("docs table");
2005
2006        // Schema preserved: column is still VECTOR(3).
2007        let embedding_col = docs
2008            .columns
2009            .iter()
2010            .find(|c| c.column_name == "embedding")
2011            .expect("embedding column");
2012        assert!(
2013            matches!(embedding_col.datatype, DataType::Vector(3)),
2014            "expected DataType::Vector(3) after round-trip, got {:?}",
2015            embedding_col.datatype
2016        );
2017
2018        // Data preserved: both vectors still readable bit-for-bit.
2019        let mut rows: Vec<Vec<f32>> = docs
2020            .rowids()
2021            .iter()
2022            .filter_map(|r| match docs.get_value("embedding", *r) {
2023                Some(Value::Vector(v)) => Some(v),
2024                _ => None,
2025            })
2026            .collect();
2027        rows.sort_by(|a, b| a[0].partial_cmp(&b[0]).unwrap());
2028        assert_eq!(rows.len(), 2);
2029        assert_eq!(rows[0], vec![0.1f32, 0.2, 0.3]);
2030        assert_eq!(rows[1], vec![1.5f32, -2.0, 3.5]);
2031
2032        cleanup(&path);
2033    }
2034
2035    #[test]
2036    fn round_trip_preserves_json_column() {
2037        // Phase 7e — JSON columns are stored as Text under the hood with
2038        // INSERT-time validation. Save + reopen should preserve the
2039        // schema (DataType::Json) and the underlying text bytes; a
2040        // post-reopen json_extract should still resolve paths correctly.
2041        let path = tmp_path("json_roundtrip");
2042
2043        {
2044            let mut db = Database::new("test".to_string());
2045            process_command(
2046                "CREATE TABLE docs (id INTEGER PRIMARY KEY, payload JSON);",
2047                &mut db,
2048            )
2049            .unwrap();
2050            process_command(
2051                r#"INSERT INTO docs (payload) VALUES ('{"name": "alice", "tags": ["rust","sql"]}');"#,
2052                &mut db,
2053            )
2054            .unwrap();
2055            save_database(&mut db, &path).expect("save");
2056        }
2057
2058        let mut loaded = open_database(&path, "test".to_string()).expect("open");
2059        let docs = loaded.get_table("docs".to_string()).expect("docs");
2060
2061        // Schema: column declared as JSON, restored with the same type.
2062        let payload_col = docs
2063            .columns
2064            .iter()
2065            .find(|c| c.column_name == "payload")
2066            .unwrap();
2067        assert!(
2068            matches!(payload_col.datatype, DataType::Json),
2069            "expected DataType::Json, got {:?}",
2070            payload_col.datatype
2071        );
2072
2073        // json_extract works against the reopened data — exercises the
2074        // full Text-storage + serde_json::from_str path post-reopen.
2075        let resp = process_command(
2076            r#"SELECT id FROM docs WHERE json_extract(payload, '$.name') = 'alice';"#,
2077            &mut loaded,
2078        )
2079        .expect("select via json_extract after reopen");
2080        assert!(resp.contains("1 row returned"), "got: {resp}");
2081
2082        cleanup(&path);
2083    }
2084
2085    #[test]
2086    fn round_trip_rebuilds_hnsw_index_from_create_sql() {
2087        // Phase 7d.3: HNSW indexes now persist their graph as cell-encoded
2088        // pages. After save+reopen the index entry reattaches with the
2089        // same column + same node count, loaded directly from disk
2090        // instead of re-walking rows.
2091        let path = tmp_path("hnsw_roundtrip");
2092
2093        // Build, populate, index, save.
2094        {
2095            let mut db = Database::new("test".to_string());
2096            process_command(
2097                "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
2098                &mut db,
2099            )
2100            .unwrap();
2101            for v in &[
2102                "[1.0, 0.0]",
2103                "[2.0, 0.0]",
2104                "[0.0, 3.0]",
2105                "[1.0, 4.0]",
2106                "[10.0, 10.0]",
2107            ] {
2108                process_command(&format!("INSERT INTO docs (e) VALUES ({v});"), &mut db).unwrap();
2109            }
2110            process_command("CREATE INDEX ix_e ON docs USING hnsw (e);", &mut db).unwrap();
2111            save_database(&mut db, &path).expect("save");
2112        } // db drops → exclusive lock releases.
2113
2114        // Reopen and verify the index reattached, with the same name +
2115        // column + populated graph.
2116        let mut loaded = open_database(&path, "test".to_string()).expect("open");
2117        {
2118            let table = loaded.get_table("docs".to_string()).expect("docs");
2119            assert_eq!(table.hnsw_indexes.len(), 1, "HNSW index should reattach");
2120            let entry = &table.hnsw_indexes[0];
2121            assert_eq!(entry.name, "ix_e");
2122            assert_eq!(entry.column_name, "e");
2123            assert_eq!(entry.index.len(), 5, "loaded graph should hold all 5 rows");
2124            assert!(
2125                !entry.needs_rebuild,
2126                "fresh load should not be marked dirty"
2127            );
2128        }
2129
2130        // Quick functional check: KNN query through the loaded index
2131        // returns results.
2132        let resp = process_command(
2133            "SELECT id FROM docs ORDER BY vec_distance_l2(e, [1.0, 0.0]) ASC LIMIT 3;",
2134            &mut loaded,
2135        )
2136        .unwrap();
2137        assert!(resp.contains("3 rows returned"), "got: {resp}");
2138
2139        cleanup(&path);
2140    }
2141
2142    #[test]
2143    fn round_trip_rebuilds_fts_index_from_create_sql() {
2144        // Phase 8c: FTS indexes now persist their posting lists as
2145        // cell-encoded pages. After save+reopen the index entry
2146        // reattaches with the same column + same posting count, loaded
2147        // directly from disk (no re-tokenization).
2148        let path = tmp_path("fts_roundtrip");
2149
2150        {
2151            let mut db = Database::new("test".to_string());
2152            process_command(
2153                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2154                &mut db,
2155            )
2156            .unwrap();
2157            for body in &[
2158                "rust embedded database",
2159                "rust web framework",
2160                "go embedded systems",
2161                "python web framework",
2162                "rust rust embedded power",
2163            ] {
2164                process_command(
2165                    &format!("INSERT INTO docs (body) VALUES ('{body}');"),
2166                    &mut db,
2167                )
2168                .unwrap();
2169            }
2170            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2171            save_database(&mut db, &path).expect("save");
2172        } // db drops → exclusive lock releases.
2173
2174        let mut loaded = open_database(&path, "test".to_string()).expect("open");
2175        {
2176            let table = loaded.get_table("docs".to_string()).expect("docs");
2177            assert_eq!(table.fts_indexes.len(), 1, "FTS index should reattach");
2178            let entry = &table.fts_indexes[0];
2179            assert_eq!(entry.name, "ix_body");
2180            assert_eq!(entry.column_name, "body");
2181            assert_eq!(
2182                entry.index.len(),
2183                5,
2184                "rebuilt posting list should hold all 5 rows"
2185            );
2186            assert!(!entry.needs_rebuild);
2187        }
2188
2189        // Functional smoke: an FTS query through the reloaded index
2190        // returns the expected hit count.
2191        let resp = process_command(
2192            "SELECT id FROM docs WHERE fts_match(body, 'rust');",
2193            &mut loaded,
2194        )
2195        .unwrap();
2196        assert!(resp.contains("3 rows returned"), "got: {resp}");
2197
2198        cleanup(&path);
2199    }
2200
2201    #[test]
2202    fn delete_then_save_then_reopen_excludes_deleted_node_from_fts() {
2203        // Phase 8b — DELETE marks the FTS index dirty; save rebuilds it
2204        // from current rows; reopen replays the CREATE INDEX SQL against
2205        // the post-delete row set. The deleted rowid must not surface
2206        // in `fts_match` results post-reopen.
2207        let path = tmp_path("fts_delete_rebuild");
2208        let mut db = Database::new("test".to_string());
2209        process_command(
2210            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2211            &mut db,
2212        )
2213        .unwrap();
2214        for body in &[
2215            "rust embedded",
2216            "rust framework",
2217            "go embedded",
2218            "python web",
2219        ] {
2220            process_command(
2221                &format!("INSERT INTO docs (body) VALUES ('{body}');"),
2222                &mut db,
2223            )
2224            .unwrap();
2225        }
2226        process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2227
2228        // Delete row 1 ('rust embedded'); save (rebuild fires); reopen.
2229        process_command("DELETE FROM docs WHERE id = 1;", &mut db).unwrap();
2230        save_database(&mut db, &path).expect("save");
2231        drop(db);
2232
2233        let mut loaded = open_database(&path, "test".to_string()).expect("open");
2234        let resp = process_command(
2235            "SELECT id FROM docs WHERE fts_match(body, 'rust');",
2236            &mut loaded,
2237        )
2238        .unwrap();
2239        // Pre-delete: 2 rows ('rust embedded', 'rust framework') had
2240        // 'rust'. Post-delete: only id=2 remains.
2241        assert!(resp.contains("1 row returned"), "got: {resp}");
2242
2243        cleanup(&path);
2244    }
2245
2246    #[test]
2247    fn fts_roundtrip_uses_persistence_path_not_replay() {
2248        // Phase 8c — assert the reload didn't go through the
2249        // rootpage=0 replay shortcut. We do this by reading the
2250        // sqlrite_master row for the FTS index and confirming its
2251        // rootpage field is non-zero.
2252        let path = tmp_path("fts_persistence_path");
2253
2254        {
2255            let mut db = Database::new("test".to_string());
2256            process_command(
2257                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2258                &mut db,
2259            )
2260            .unwrap();
2261            process_command(
2262                "INSERT INTO docs (body) VALUES ('rust embedded database');",
2263                &mut db,
2264            )
2265            .unwrap();
2266            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2267            save_database(&mut db, &path).expect("save");
2268        }
2269
2270        // Read raw sqlrite_master to find the FTS index row.
2271        let pager = Pager::open(&path).expect("open pager");
2272        let mut master = build_empty_master_table();
2273        load_table_rows(&pager, &mut master, pager.header().schema_root_page).unwrap();
2274        let mut found_rootpage: Option<u32> = None;
2275        for rowid in master.rowids() {
2276            let name = take_text(&master, "name", rowid).unwrap();
2277            if name == "ix_body" {
2278                let rp = take_integer(&master, "rootpage", rowid).unwrap();
2279                found_rootpage = Some(rp as u32);
2280            }
2281        }
2282        let rootpage = found_rootpage.expect("ix_body row in sqlrite_master");
2283        assert!(
2284            rootpage != 0,
2285            "Phase 8c FTS save should set rootpage != 0; got {rootpage}"
2286        );
2287
2288        cleanup(&path);
2289    }
2290
2291    #[test]
2292    fn save_without_fts_keeps_format_v4() {
2293        // Phase 8c on-demand bump — a database with zero FTS indexes
2294        // continues writing the v4 header. Existing v4 users must not
2295        // see their files silently promoted to v5 by an upgrade.
2296        use crate::sql::pager::header::FORMAT_VERSION_V4;
2297
2298        let path = tmp_path("fts_no_bump");
2299        let mut db = Database::new("test".to_string());
2300        process_command(
2301            "CREATE TABLE t (id INTEGER PRIMARY KEY, n INTEGER);",
2302            &mut db,
2303        )
2304        .unwrap();
2305        process_command("INSERT INTO t (n) VALUES (1);", &mut db).unwrap();
2306        save_database(&mut db, &path).unwrap();
2307        drop(db);
2308
2309        let pager = Pager::open(&path).expect("open");
2310        assert_eq!(
2311            pager.header().format_version,
2312            FORMAT_VERSION_V4,
2313            "no-FTS save should keep v4"
2314        );
2315        cleanup(&path);
2316    }
2317
2318    #[test]
2319    fn save_with_fts_bumps_to_v5() {
2320        // Phase 8c on-demand bump — first FTS-bearing save promotes
2321        // the file to v5. v5 readers handle both v4 and v5; v4
2322        // readers correctly refuse a v5 file.
2323        use crate::sql::pager::header::FORMAT_VERSION_V5;
2324
2325        let path = tmp_path("fts_bump_v5");
2326        let mut db = Database::new("test".to_string());
2327        process_command(
2328            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2329            &mut db,
2330        )
2331        .unwrap();
2332        process_command("INSERT INTO docs (body) VALUES ('hello');", &mut db).unwrap();
2333        process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2334        save_database(&mut db, &path).unwrap();
2335        drop(db);
2336
2337        let pager = Pager::open(&path).expect("open");
2338        assert_eq!(
2339            pager.header().format_version,
2340            FORMAT_VERSION_V5,
2341            "FTS save should promote to v5"
2342        );
2343        cleanup(&path);
2344    }
2345
2346    #[test]
2347    fn fts_persistence_handles_empty_and_zero_token_docs() {
2348        // Phase 8c — sidecar cell carries doc-lengths for every doc
2349        // including any with zero tokens (so total_docs is honest
2350        // post-reopen). Empty index also round-trips: a CREATE INDEX
2351        // on an empty table emits a single empty leaf with just the
2352        // (empty) sidecar.
2353        let path = tmp_path("fts_edges");
2354
2355        {
2356            let mut db = Database::new("test".to_string());
2357            process_command(
2358                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2359                &mut db,
2360            )
2361            .unwrap();
2362            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2363            // Mix: real text, then a row that tokenizes to zero tokens
2364            // (only punctuation), then real again.
2365            process_command("INSERT INTO docs (body) VALUES ('rust embedded');", &mut db).unwrap();
2366            process_command("INSERT INTO docs (body) VALUES ('!!!---???');", &mut db).unwrap();
2367            process_command("INSERT INTO docs (body) VALUES ('go embedded');", &mut db).unwrap();
2368            save_database(&mut db, &path).unwrap();
2369        }
2370
2371        let loaded = open_database(&path, "test".to_string()).expect("open");
2372        let table = loaded.get_table("docs".to_string()).unwrap();
2373        let entry = &table.fts_indexes[0];
2374        // All three rows present — including the zero-token row,
2375        // which is critical for total_docs honesty in BM25.
2376        assert_eq!(entry.index.len(), 3);
2377        // 'embedded' appears in 2 rows after reload.
2378        let res = entry
2379            .index
2380            .query("embedded", &crate::sql::fts::Bm25Params::default());
2381        assert_eq!(res.len(), 2);
2382
2383        cleanup(&path);
2384    }
2385
2386    #[test]
2387    fn fts_persistence_round_trips_large_corpus() {
2388        // Phase 8c — exercise multi-leaf staging. ~500 docs with
2389        // single-token bodies generates enough cells to overflow a
2390        // single 4 KiB leaf (each posting cell averages ~8 bytes).
2391        let path = tmp_path("fts_large_corpus");
2392
2393        let mut expected_terms: std::collections::BTreeSet<String> =
2394            std::collections::BTreeSet::new();
2395        {
2396            let mut db = Database::new("test".to_string());
2397            process_command(
2398                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2399                &mut db,
2400            )
2401            .unwrap();
2402            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2403            // 500 docs, each one a unique term — drives unique-term
2404            // count up so multiple leaves are required.
2405            for i in 0..500 {
2406                let term = format!("term{i:04}");
2407                process_command(
2408                    &format!("INSERT INTO docs (body) VALUES ('{term}');"),
2409                    &mut db,
2410                )
2411                .unwrap();
2412                expected_terms.insert(term);
2413            }
2414            save_database(&mut db, &path).unwrap();
2415        }
2416
2417        let loaded = open_database(&path, "test".to_string()).expect("open");
2418        let table = loaded.get_table("docs".to_string()).unwrap();
2419        let entry = &table.fts_indexes[0];
2420        assert_eq!(entry.index.len(), 500);
2421
2422        // Spot-check a handful of terms come back with their original
2423        // single-row posting list.
2424        for &i in &[0_i64, 137, 248, 391, 499] {
2425            let term = format!("term{i:04}");
2426            let res = entry
2427                .index
2428                .query(&term, &crate::sql::fts::Bm25Params::default());
2429            assert_eq!(res.len(), 1, "term {term} should match exactly 1 row");
2430            // PrimaryKey rowids start at 1; doc i was inserted at
2431            // rowid i+1.
2432            assert_eq!(res[0].0, i + 1);
2433        }
2434
2435        cleanup(&path);
2436    }
2437
2438    #[test]
2439    fn delete_then_save_then_reopen_excludes_deleted_node_from_hnsw() {
2440        // Phase 7d.3 — DELETE marks HNSW dirty; save rebuilds it from
2441        // current rows + serializes; reopen loads the post-delete graph.
2442        // After all that, the deleted rowid must NOT come back from a
2443        // KNN query.
2444        let path = tmp_path("hnsw_delete_rebuild");
2445        let mut db = Database::new("test".to_string());
2446        process_command(
2447            "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
2448            &mut db,
2449        )
2450        .unwrap();
2451        for v in &["[1.0, 0.0]", "[2.0, 0.0]", "[3.0, 0.0]", "[4.0, 0.0]"] {
2452            process_command(&format!("INSERT INTO docs (e) VALUES ({v});"), &mut db).unwrap();
2453        }
2454        process_command("CREATE INDEX ix_e ON docs USING hnsw (e);", &mut db).unwrap();
2455
2456        // Delete row 1 (the closest match to [0.5, 0.0]).
2457        process_command("DELETE FROM docs WHERE id = 1;", &mut db).unwrap();
2458        // Confirm it marked dirty.
2459        let dirty_before_save = db.tables["docs"].hnsw_indexes[0].needs_rebuild;
2460        assert!(dirty_before_save, "DELETE should mark dirty");
2461
2462        save_database(&mut db, &path).expect("save");
2463        // Confirm save cleared the dirty flag.
2464        let dirty_after_save = db.tables["docs"].hnsw_indexes[0].needs_rebuild;
2465        assert!(!dirty_after_save, "save should clear dirty");
2466        drop(db);
2467
2468        // Reopen, query for the closest match. Row 1 is gone; row 2
2469        // (id=2, vector [2.0, 0.0]) should now be the nearest.
2470        let loaded = open_database(&path, "test".to_string()).expect("open");
2471        let docs = loaded.get_table("docs".to_string()).expect("docs");
2472
2473        // Row 1 must not appear in any storage anymore.
2474        assert!(
2475            !docs.rowids().contains(&1),
2476            "deleted row 1 should not be in row storage"
2477        );
2478        assert_eq!(docs.rowids().len(), 3, "should have 3 surviving rows");
2479
2480        // The HNSW index must also have shed the deleted node.
2481        assert_eq!(
2482            docs.hnsw_indexes[0].index.len(),
2483            3,
2484            "HNSW graph should have shed the deleted node"
2485        );
2486
2487        cleanup(&path);
2488    }
2489
2490    #[test]
2491    fn round_trip_survives_writes_after_load() {
2492        let path = tmp_path("after_load");
2493        save_database(&mut seed_db(), &path).unwrap();
2494
2495        {
2496            let mut db = open_database(&path, "test".to_string()).unwrap();
2497            process_command(
2498                "INSERT INTO users (name, age) VALUES ('carol', 40);",
2499                &mut db,
2500            )
2501            .unwrap();
2502            save_database(&mut db, &path).unwrap();
2503        } // db drops → its exclusive lock releases before we reopen below.
2504
2505        let db2 = open_database(&path, "test".to_string()).unwrap();
2506        let users = db2.get_table("users".to_string()).unwrap();
2507        assert_eq!(users.rowids().len(), 3);
2508
2509        cleanup(&path);
2510    }
2511
2512    #[test]
2513    fn open_rejects_garbage_file() {
2514        let path = tmp_path("bad");
2515        std::fs::write(&path, b"not a sqlrite database, just bytes").unwrap();
2516        let result = open_database(&path, "x".to_string());
2517        assert!(result.is_err());
2518        cleanup(&path);
2519    }
2520
2521    #[test]
2522    fn many_small_rows_spread_across_leaves() {
2523        let path = tmp_path("many_rows");
2524        let mut db = Database::new("big".to_string());
2525        process_command(
2526            "CREATE TABLE things (id INTEGER PRIMARY KEY, data TEXT);",
2527            &mut db,
2528        )
2529        .unwrap();
2530        for i in 0..200 {
2531            let body = "x".repeat(200);
2532            let q = format!("INSERT INTO things (data) VALUES ('row-{i}-{body}');");
2533            process_command(&q, &mut db).unwrap();
2534        }
2535        save_database(&mut db, &path).unwrap();
2536        let loaded = open_database(&path, "big".to_string()).unwrap();
2537        let things = loaded.get_table("things".to_string()).unwrap();
2538        assert_eq!(things.rowids().len(), 200);
2539        cleanup(&path);
2540    }
2541
2542    #[test]
2543    fn huge_row_goes_through_overflow() {
2544        let path = tmp_path("overflow_row");
2545        let mut db = Database::new("big".to_string());
2546        process_command(
2547            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2548            &mut db,
2549        )
2550        .unwrap();
2551        let body = "A".repeat(10_000);
2552        process_command(
2553            &format!("INSERT INTO docs (body) VALUES ('{body}');"),
2554            &mut db,
2555        )
2556        .unwrap();
2557        save_database(&mut db, &path).unwrap();
2558
2559        let loaded = open_database(&path, "big".to_string()).unwrap();
2560        let docs = loaded.get_table("docs".to_string()).unwrap();
2561        let rowids = docs.rowids();
2562        assert_eq!(rowids.len(), 1);
2563        let stored = docs.get_value("body", rowids[0]);
2564        match stored {
2565            Some(Value::Text(s)) => assert_eq!(s.len(), 10_000),
2566            other => panic!("expected Text, got {other:?}"),
2567        }
2568        cleanup(&path);
2569    }
2570
2571    #[test]
2572    fn create_sql_synthesis_round_trips() {
2573        // Build a table via CREATE, then verify table_to_create_sql +
2574        // parse_create_sql reproduce an equivalent column list.
2575        let mut db = Database::new("x".to_string());
2576        process_command(
2577            "CREATE TABLE t (id INTEGER PRIMARY KEY, tag TEXT UNIQUE, note TEXT NOT NULL);",
2578            &mut db,
2579        )
2580        .unwrap();
2581        let t = db.get_table("t".to_string()).unwrap();
2582        let sql = table_to_create_sql(t);
2583        let (name, cols) = parse_create_sql(&sql).unwrap();
2584        assert_eq!(name, "t");
2585        assert_eq!(cols.len(), 3);
2586        assert!(cols[0].is_pk);
2587        assert!(cols[1].is_unique);
2588        assert!(cols[2].not_null);
2589    }
2590
2591    #[test]
2592    fn sqlrite_master_is_not_exposed_as_a_user_table() {
2593        // After open, the public db.tables map should not list the master.
2594        let path = tmp_path("no_master");
2595        save_database(&mut seed_db(), &path).unwrap();
2596        let loaded = open_database(&path, "x".to_string()).unwrap();
2597        assert!(!loaded.tables.contains_key(MASTER_TABLE_NAME));
2598        cleanup(&path);
2599    }
2600
2601    #[test]
2602    fn multi_leaf_table_produces_an_interior_root() {
2603        // 200 fat rows force the table into multiple leaves, which means
2604        // save_database must build at least one InteriorNode above them.
2605        // The test verifies the round-trip works and confirms the root is
2606        // indeed an interior page (not a leaf) by reading the page type
2607        // directly out of the open pager.
2608        let path = tmp_path("multi_leaf_interior");
2609        let mut db = Database::new("big".to_string());
2610        process_command(
2611            "CREATE TABLE things (id INTEGER PRIMARY KEY, data TEXT);",
2612            &mut db,
2613        )
2614        .unwrap();
2615        for i in 0..200 {
2616            let body = "x".repeat(200);
2617            let q = format!("INSERT INTO things (data) VALUES ('row-{i}-{body}');");
2618            process_command(&q, &mut db).unwrap();
2619        }
2620        save_database(&mut db, &path).unwrap();
2621
2622        // Confirm the round-trip preserved all 200 rows.
2623        let loaded = open_database(&path, "big".to_string()).unwrap();
2624        let things = loaded.get_table("things".to_string()).unwrap();
2625        assert_eq!(things.rowids().len(), 200);
2626
2627        // Peek at `things`'s root page via the pager attached to the
2628        // loaded DB and check it's an InteriorNode, not a leaf.
2629        let pager = loaded
2630            .pager
2631            .as_ref()
2632            .expect("loaded DB should have a pager");
2633        // sqlrite_master's row for `things` holds its root page. Easiest
2634        // way to find it: walk the leaf chain by using find_leftmost_leaf
2635        // and then hop one level up. Simpler: read the master, scan for
2636        // the "things" row, look up rootpage.
2637        let mut master = build_empty_master_table();
2638        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
2639        let things_root = master
2640            .rowids()
2641            .into_iter()
2642            .find_map(|r| match master.get_value("name", r) {
2643                Some(Value::Text(s)) if s == "things" => match master.get_value("rootpage", r) {
2644                    Some(Value::Integer(p)) => Some(p as u32),
2645                    _ => None,
2646                },
2647                _ => None,
2648            })
2649            .expect("things should appear in sqlrite_master");
2650        let root_buf = pager.read_page(things_root).unwrap();
2651        assert_eq!(
2652            root_buf[0],
2653            PageType::InteriorNode as u8,
2654            "expected a multi-leaf table to have an interior root, got tag {}",
2655            root_buf[0]
2656        );
2657
2658        cleanup(&path);
2659    }
2660
2661    #[test]
2662    fn explicit_index_persists_across_save_and_open() {
2663        let path = tmp_path("idx_persist");
2664        let mut db = Database::new("idx".to_string());
2665        process_command(
2666            "CREATE TABLE users (id INTEGER PRIMARY KEY, tag TEXT);",
2667            &mut db,
2668        )
2669        .unwrap();
2670        for i in 1..=5 {
2671            let tag = if i % 2 == 0 { "odd" } else { "even" };
2672            process_command(
2673                &format!("INSERT INTO users (tag) VALUES ('{tag}');"),
2674                &mut db,
2675            )
2676            .unwrap();
2677        }
2678        process_command("CREATE INDEX users_tag_idx ON users (tag);", &mut db).unwrap();
2679        save_database(&mut db, &path).unwrap();
2680
2681        let loaded = open_database(&path, "idx".to_string()).unwrap();
2682        let users = loaded.get_table("users".to_string()).unwrap();
2683        let idx = users
2684            .index_by_name("users_tag_idx")
2685            .expect("explicit index should survive save/open");
2686        assert_eq!(idx.column_name, "tag");
2687        assert!(!idx.is_unique);
2688        // 5 rows: rowids 2, 4 are "odd" (i % 2 == 0 when i is 2 or 4) — 2 entries;
2689        // rowids 1, 3, 5 are "even" (i % 2 != 0) — 3 entries.
2690        let even_rowids = idx.lookup(&Value::Text("even".into()));
2691        let odd_rowids = idx.lookup(&Value::Text("odd".into()));
2692        assert_eq!(even_rowids.len(), 3);
2693        assert_eq!(odd_rowids.len(), 2);
2694
2695        cleanup(&path);
2696    }
2697
2698    #[test]
2699    fn auto_indexes_for_unique_columns_survive_save_open() {
2700        let path = tmp_path("auto_idx_persist");
2701        let mut db = Database::new("a".to_string());
2702        process_command(
2703            "CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL UNIQUE);",
2704            &mut db,
2705        )
2706        .unwrap();
2707        process_command("INSERT INTO users (email) VALUES ('a@x');", &mut db).unwrap();
2708        process_command("INSERT INTO users (email) VALUES ('b@x');", &mut db).unwrap();
2709        save_database(&mut db, &path).unwrap();
2710
2711        let loaded = open_database(&path, "a".to_string()).unwrap();
2712        let users = loaded.get_table("users".to_string()).unwrap();
2713        // Every UNIQUE column auto-creates an index; the load path populated
2714        // it from the persisted entries.
2715        let auto_name = SecondaryIndex::auto_name("users", "email");
2716        let idx = users
2717            .index_by_name(&auto_name)
2718            .expect("auto index should be restored");
2719        assert!(idx.is_unique);
2720        assert_eq!(idx.lookup(&Value::Text("a@x".into())).len(), 1);
2721        assert_eq!(idx.lookup(&Value::Text("b@x".into())).len(), 1);
2722
2723        cleanup(&path);
2724    }
2725
2726    #[test]
2727    fn deep_tree_round_trips() {
2728        // Force a 3-level tree by bypassing process_command (which prints
2729        // the full table on every INSERT, making large bulk loads O(N^2)
2730        // in I/O). We build the Table directly via restore_row.
2731        use crate::sql::db::table::Column as TableColumn;
2732
2733        let path = tmp_path("deep_tree");
2734        let mut db = Database::new("deep".to_string());
2735        let columns = vec![
2736            TableColumn::new("id".into(), "integer".into(), true, true, true),
2737            TableColumn::new("s".into(), "text".into(), false, true, false),
2738        ];
2739        let mut table = build_empty_table("t", columns, 0);
2740        // ~900-byte rows → ~4 rows per leaf. 6000 rows → ~1500 leaves,
2741        // which with interior fanout ~400 needs 2 interior levels (3-level
2742        // tree total, counting leaves).
2743        for i in 1..=6_000i64 {
2744            let body = "q".repeat(900);
2745            table
2746                .restore_row(
2747                    i,
2748                    vec![
2749                        Some(Value::Integer(i)),
2750                        Some(Value::Text(format!("r-{i}-{body}"))),
2751                    ],
2752                )
2753                .unwrap();
2754        }
2755        db.tables.insert("t".to_string(), table);
2756        save_database(&mut db, &path).unwrap();
2757
2758        let loaded = open_database(&path, "deep".to_string()).unwrap();
2759        let t = loaded.get_table("t".to_string()).unwrap();
2760        assert_eq!(t.rowids().len(), 6_000);
2761
2762        // Confirm the tree actually grew past 2 levels — i.e., the root's
2763        // leftmost child is itself an interior page, not a leaf.
2764        let pager = loaded.pager.as_ref().unwrap();
2765        let mut master = build_empty_master_table();
2766        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
2767        let t_root = master
2768            .rowids()
2769            .into_iter()
2770            .find_map(|r| match master.get_value("name", r) {
2771                Some(Value::Text(s)) if s == "t" => match master.get_value("rootpage", r) {
2772                    Some(Value::Integer(p)) => Some(p as u32),
2773                    _ => None,
2774                },
2775                _ => None,
2776            })
2777            .expect("t in sqlrite_master");
2778        let root_buf = pager.read_page(t_root).unwrap();
2779        assert_eq!(root_buf[0], PageType::InteriorNode as u8);
2780        let root_payload: &[u8; PAYLOAD_PER_PAGE] =
2781            (&root_buf[PAGE_HEADER_SIZE..]).try_into().unwrap();
2782        let root_interior = InteriorPage::from_bytes(root_payload);
2783        let child = root_interior.leftmost_child().unwrap();
2784        let child_buf = pager.read_page(child).unwrap();
2785        assert_eq!(
2786            child_buf[0],
2787            PageType::InteriorNode as u8,
2788            "expected 3-level tree: root's leftmost child should also be InteriorNode",
2789        );
2790
2791        cleanup(&path);
2792    }
2793
2794    #[test]
2795    fn alter_rename_table_survives_save_and_reopen() {
2796        let path = tmp_path("alter_rename_table_roundtrip");
2797        let mut db = seed_db();
2798        save_database(&mut db, &path).expect("save");
2799
2800        process_command("ALTER TABLE users RENAME TO members;", &mut db).expect("rename");
2801        save_database(&mut db, &path).expect("save after rename");
2802
2803        let loaded = open_database(&path, "t".to_string()).expect("reopen");
2804        assert!(!loaded.contains_table("users".to_string()));
2805        assert!(loaded.contains_table("members".to_string()));
2806        let members = loaded.get_table("members".to_string()).unwrap();
2807        assert_eq!(members.rowids().len(), 2, "rows should survive");
2808        // Auto-indexes followed the rename.
2809        assert!(
2810            members
2811                .index_by_name("sqlrite_autoindex_members_id")
2812                .is_some()
2813        );
2814        assert!(
2815            members
2816                .index_by_name("sqlrite_autoindex_members_name")
2817                .is_some()
2818        );
2819
2820        cleanup(&path);
2821    }
2822
2823    #[test]
2824    fn alter_rename_column_survives_save_and_reopen() {
2825        let path = tmp_path("alter_rename_col_roundtrip");
2826        let mut db = seed_db();
2827        save_database(&mut db, &path).expect("save");
2828
2829        process_command(
2830            "ALTER TABLE users RENAME COLUMN name TO full_name;",
2831            &mut db,
2832        )
2833        .expect("rename column");
2834        save_database(&mut db, &path).expect("save after rename");
2835
2836        let loaded = open_database(&path, "t".to_string()).expect("reopen");
2837        let users = loaded.get_table("users".to_string()).unwrap();
2838        assert!(users.contains_column("full_name".to_string()));
2839        assert!(!users.contains_column("name".to_string()));
2840        // Verify a row's value survived the rename round-trip.
2841        let alice_rowid = users
2842            .rowids()
2843            .into_iter()
2844            .find(|r| users.get_value("full_name", *r) == Some(Value::Text("alice".to_string())))
2845            .expect("alice row should be findable under renamed column");
2846        assert_eq!(
2847            users.get_value("full_name", alice_rowid),
2848            Some(Value::Text("alice".to_string()))
2849        );
2850
2851        cleanup(&path);
2852    }
2853
2854    #[test]
2855    fn alter_add_column_with_default_survives_save_and_reopen() {
2856        let path = tmp_path("alter_add_default_roundtrip");
2857        let mut db = seed_db();
2858        save_database(&mut db, &path).expect("save");
2859
2860        process_command(
2861            "ALTER TABLE users ADD COLUMN status TEXT DEFAULT 'active';",
2862            &mut db,
2863        )
2864        .expect("add column");
2865        save_database(&mut db, &path).expect("save after add");
2866
2867        let loaded = open_database(&path, "t".to_string()).expect("reopen");
2868        let users = loaded.get_table("users".to_string()).unwrap();
2869        assert!(users.contains_column("status".to_string()));
2870        for rowid in users.rowids() {
2871            assert_eq!(
2872                users.get_value("status", rowid),
2873                Some(Value::Text("active".to_string())),
2874                "backfilled default should round-trip for rowid {rowid}"
2875            );
2876        }
2877        // The DEFAULT clause itself should still be on the column metadata
2878        // so a subsequent INSERT picks it up.
2879        let status_col = users
2880            .columns
2881            .iter()
2882            .find(|c| c.column_name == "status")
2883            .unwrap();
2884        assert_eq!(status_col.default, Some(Value::Text("active".to_string())));
2885
2886        cleanup(&path);
2887    }
2888
2889    #[test]
2890    fn alter_drop_column_survives_save_and_reopen() {
2891        let path = tmp_path("alter_drop_col_roundtrip");
2892        let mut db = seed_db();
2893        save_database(&mut db, &path).expect("save");
2894
2895        process_command("ALTER TABLE users DROP COLUMN age;", &mut db).expect("drop column");
2896        save_database(&mut db, &path).expect("save after drop");
2897
2898        let loaded = open_database(&path, "t".to_string()).expect("reopen");
2899        let users = loaded.get_table("users".to_string()).unwrap();
2900        assert!(!users.contains_column("age".to_string()));
2901        assert!(users.contains_column("name".to_string()));
2902
2903        cleanup(&path);
2904    }
2905
2906    #[test]
2907    fn drop_table_survives_save_and_reopen() {
2908        let path = tmp_path("drop_table_roundtrip");
2909        let mut db = seed_db();
2910        save_database(&mut db, &path).expect("save");
2911
2912        // Verify both tables landed.
2913        {
2914            let loaded = open_database(&path, "t".to_string()).expect("open");
2915            assert!(loaded.contains_table("users".to_string()));
2916            assert!(loaded.contains_table("notes".to_string()));
2917        }
2918
2919        process_command("DROP TABLE users;", &mut db).expect("drop users");
2920        save_database(&mut db, &path).expect("save after drop");
2921
2922        let loaded = open_database(&path, "t".to_string()).expect("reopen");
2923        assert!(
2924            !loaded.contains_table("users".to_string()),
2925            "dropped table should not resurface on reopen"
2926        );
2927        assert!(
2928            loaded.contains_table("notes".to_string()),
2929            "untouched table should survive"
2930        );
2931
2932        cleanup(&path);
2933    }
2934
2935    #[test]
2936    fn drop_index_survives_save_and_reopen() {
2937        let path = tmp_path("drop_index_roundtrip");
2938        let mut db = Database::new("t".to_string());
2939        process_command(
2940            "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
2941            &mut db,
2942        )
2943        .unwrap();
2944        process_command("CREATE INDEX notes_body_idx ON notes (body);", &mut db).unwrap();
2945        save_database(&mut db, &path).expect("save");
2946
2947        process_command("DROP INDEX notes_body_idx;", &mut db).unwrap();
2948        save_database(&mut db, &path).expect("save after drop");
2949
2950        let loaded = open_database(&path, "t".to_string()).expect("reopen");
2951        let notes = loaded.get_table("notes".to_string()).unwrap();
2952        assert!(
2953            notes.index_by_name("notes_body_idx").is_none(),
2954            "dropped index should not resurface on reopen"
2955        );
2956        // The auto-index for the PK should still be there.
2957        assert!(notes.index_by_name("sqlrite_autoindex_notes_id").is_some());
2958
2959        cleanup(&path);
2960    }
2961
2962    #[test]
2963    fn default_clause_survives_save_and_reopen() {
2964        let path = tmp_path("default_roundtrip");
2965        let mut db = Database::new("t".to_string());
2966
2967        process_command(
2968            "CREATE TABLE users (id INTEGER PRIMARY KEY, status TEXT DEFAULT 'active', score INTEGER DEFAULT 0);",
2969            &mut db,
2970        )
2971        .unwrap();
2972        save_database(&mut db, &path).expect("save");
2973
2974        let mut loaded = open_database(&path, "t".to_string()).expect("open");
2975
2976        // The reloaded column metadata should still carry the DEFAULT.
2977        let users = loaded.get_table("users".to_string()).expect("users table");
2978        let status_col = users
2979            .columns
2980            .iter()
2981            .find(|c| c.column_name == "status")
2982            .expect("status column");
2983        assert_eq!(
2984            status_col.default,
2985            Some(Value::Text("active".to_string())),
2986            "DEFAULT 'active' should round-trip"
2987        );
2988        let score_col = users
2989            .columns
2990            .iter()
2991            .find(|c| c.column_name == "score")
2992            .expect("score column");
2993        assert_eq!(
2994            score_col.default,
2995            Some(Value::Integer(0)),
2996            "DEFAULT 0 should round-trip"
2997        );
2998
2999        // Now exercise the runtime path: an INSERT that omits both DEFAULT
3000        // columns should pick them up from the reloaded schema.
3001        process_command("INSERT INTO users (id) VALUES (1);", &mut loaded).unwrap();
3002        let users = loaded.get_table("users".to_string()).unwrap();
3003        assert_eq!(
3004            users.get_value("status", 1),
3005            Some(Value::Text("active".to_string()))
3006        );
3007        assert_eq!(users.get_value("score", 1), Some(Value::Integer(0)));
3008
3009        cleanup(&path);
3010    }
3011
3012    // ---------------------------------------------------------------------
3013    // SQLR-6 — free-list + VACUUM tests
3014    // ---------------------------------------------------------------------
3015
3016    /// Drop a table; subsequent CREATE TABLE should reuse the freed pages
3017    /// rather than extending the file. The page_count after drop+create
3018    /// should be at most what it was after the original two tables —
3019    /// proving the new table landed on freelist pages.
3020    #[test]
3021    fn drop_table_freelist_persists_pages_for_reuse() {
3022        let path = tmp_path("freelist_reuse");
3023        let mut db = seed_db();
3024        db.source_path = Some(path.clone());
3025        save_database(&mut db, &path).expect("save");
3026        let pages_two_tables = db.pager.as_ref().unwrap().header().page_count;
3027
3028        // Drop one table; its pages go on the freelist.
3029        process_command("DROP TABLE users;", &mut db).expect("drop users");
3030        let pages_after_drop = db.pager.as_ref().unwrap().header().page_count;
3031        assert_eq!(
3032            pages_after_drop, pages_two_tables,
3033            "page_count should not shrink on drop — the freed pages persist on the freelist"
3034        );
3035        let head_after_drop = db.pager.as_ref().unwrap().header().freelist_head;
3036        assert!(
3037            head_after_drop != 0,
3038            "freelist_head must be non-zero after drop"
3039        );
3040
3041        // Re-create a similar-shaped table; should reuse freelist pages.
3042        process_command(
3043            "CREATE TABLE accounts (id INTEGER PRIMARY KEY, label TEXT NOT NULL UNIQUE);",
3044            &mut db,
3045        )
3046        .expect("create accounts");
3047        process_command("INSERT INTO accounts (label) VALUES ('a');", &mut db).unwrap();
3048        process_command("INSERT INTO accounts (label) VALUES ('b');", &mut db).unwrap();
3049        let pages_after_create = db.pager.as_ref().unwrap().header().page_count;
3050        assert!(
3051            pages_after_create <= pages_two_tables + 2,
3052            "creating a similar-sized table after a drop should mostly draw from the \
3053             freelist, not extend the file (got {pages_after_create} > {pages_two_tables} + 2)"
3054        );
3055
3056        cleanup(&path);
3057    }
3058
3059    /// `VACUUM;` after a drop must shrink the file and clear the freelist.
3060    #[test]
3061    fn drop_then_vacuum_shrinks_file() {
3062        let path = tmp_path("vacuum_shrinks");
3063        let mut db = seed_db();
3064        db.source_path = Some(path.clone());
3065        // Add a few more rows to make the dropped table bigger.
3066        for i in 0..20 {
3067            process_command(
3068                &format!("INSERT INTO users (name, age) VALUES ('user{i}', {i});"),
3069                &mut db,
3070            )
3071            .unwrap();
3072        }
3073        save_database(&mut db, &path).expect("save");
3074
3075        process_command("DROP TABLE users;", &mut db).expect("drop");
3076        let size_before_vacuum = std::fs::metadata(&path).unwrap().len();
3077        let pages_before_vacuum = db.pager.as_ref().unwrap().header().page_count;
3078        let head_before = db.pager.as_ref().unwrap().header().freelist_head;
3079        assert!(head_before != 0, "drop should populate the freelist");
3080
3081        // VACUUM (via process_command) checkpoints internally so the
3082        // file actually shrinks on disk before we observe its size.
3083        process_command("VACUUM;", &mut db).expect("vacuum");
3084
3085        let size_after = std::fs::metadata(&path).unwrap().len();
3086        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3087        let head_after = db.pager.as_ref().unwrap().header().freelist_head;
3088        assert!(
3089            pages_after < pages_before_vacuum,
3090            "VACUUM must reduce page_count: was {pages_before_vacuum}, now {pages_after}"
3091        );
3092        assert_eq!(head_after, 0, "VACUUM must clear the freelist");
3093        assert!(
3094            size_after < size_before_vacuum,
3095            "VACUUM must shrink the file on disk: was {size_before_vacuum} bytes, now {size_after}"
3096        );
3097
3098        cleanup(&path);
3099    }
3100
3101    /// VACUUM on a non-empty multi-table DB must not lose any rows.
3102    #[test]
3103    fn vacuum_round_trips_data() {
3104        let path = tmp_path("vacuum_round_trip");
3105        let mut db = seed_db();
3106        db.source_path = Some(path.clone());
3107        save_database(&mut db, &path).expect("save");
3108        process_command("VACUUM;", &mut db).expect("vacuum");
3109
3110        // Re-open from disk to make sure the on-disk catalog round-trips.
3111        drop(db);
3112        let loaded = open_database(&path, "t".to_string()).expect("reopen after vacuum");
3113        assert!(loaded.contains_table("users".to_string()));
3114        assert!(loaded.contains_table("notes".to_string()));
3115        let users = loaded.get_table("users".to_string()).unwrap();
3116        // seed_db inserts two users.
3117        assert_eq!(users.rowids().len(), 2);
3118
3119        cleanup(&path);
3120    }
3121
3122    /// Format version is bumped to v6 only after a save that creates a
3123    /// non-empty freelist. VACUUM clears the freelist but doesn't
3124    /// downgrade — v6 is a strict superset, so once at v6 we stay.
3125    #[test]
3126    fn freelist_format_version_promotion() {
3127        use crate::sql::pager::header::{FORMAT_VERSION_BASELINE, FORMAT_VERSION_V6};
3128        let path = tmp_path("v6_promotion");
3129        let mut db = seed_db();
3130        db.source_path = Some(path.clone());
3131        save_database(&mut db, &path).expect("save");
3132        let v_after_save = db.pager.as_ref().unwrap().header().format_version;
3133        assert_eq!(
3134            v_after_save, FORMAT_VERSION_BASELINE,
3135            "fresh DB without drops should stay at the baseline version"
3136        );
3137
3138        process_command("DROP TABLE users;", &mut db).expect("drop");
3139        let v_after_drop = db.pager.as_ref().unwrap().header().format_version;
3140        assert_eq!(
3141            v_after_drop, FORMAT_VERSION_V6,
3142            "first save with a non-empty freelist must promote to V6"
3143        );
3144
3145        process_command("VACUUM;", &mut db).expect("vacuum");
3146        let v_after_vacuum = db.pager.as_ref().unwrap().header().format_version;
3147        assert_eq!(
3148            v_after_vacuum, FORMAT_VERSION_V6,
3149            "VACUUM must not downgrade — V6 is a strict superset"
3150        );
3151
3152        cleanup(&path);
3153    }
3154
3155    /// Freelist persists across reopen: drop, save, close, reopen,
3156    /// confirm the next CREATE TABLE re-uses pages from the persisted
3157    /// freelist (rather than extending the file).
3158    #[test]
3159    fn freelist_round_trip_through_reopen() {
3160        let path = tmp_path("freelist_reopen");
3161        let pages_two_tables;
3162        {
3163            let mut db = seed_db();
3164            db.source_path = Some(path.clone());
3165            save_database(&mut db, &path).expect("save");
3166            pages_two_tables = db.pager.as_ref().unwrap().header().page_count;
3167            process_command("DROP TABLE users;", &mut db).expect("drop");
3168            let head = db.pager.as_ref().unwrap().header().freelist_head;
3169            assert!(head != 0, "drop must populate the freelist");
3170        }
3171
3172        // Reopen from disk — the freelist must come back.
3173        let mut db = open_database(&path, "t".to_string()).expect("reopen");
3174        assert!(
3175            db.pager.as_ref().unwrap().header().freelist_head != 0,
3176            "freelist_head must survive close/reopen"
3177        );
3178
3179        process_command(
3180            "CREATE TABLE accounts (id INTEGER PRIMARY KEY, label TEXT NOT NULL UNIQUE);",
3181            &mut db,
3182        )
3183        .expect("create accounts");
3184        process_command("INSERT INTO accounts (label) VALUES ('reopened');", &mut db).unwrap();
3185        let pages_after_create = db.pager.as_ref().unwrap().header().page_count;
3186        assert!(
3187            pages_after_create <= pages_two_tables + 2,
3188            "post-reopen create should reuse freelist (got {pages_after_create} > \
3189             {pages_two_tables} + 2 — file extended instead of reusing)"
3190        );
3191
3192        cleanup(&path);
3193    }
3194
3195    /// VACUUM inside an explicit transaction must error before touching the
3196    /// disk. `BEGIN; VACUUM;` is the documented rejection path.
3197    #[test]
3198    fn vacuum_inside_transaction_is_rejected() {
3199        let path = tmp_path("vacuum_txn");
3200        let mut db = seed_db();
3201        db.source_path = Some(path.clone());
3202        save_database(&mut db, &path).expect("save");
3203
3204        process_command("BEGIN;", &mut db).expect("begin");
3205        let err = process_command("VACUUM;", &mut db).unwrap_err();
3206        assert!(
3207            format!("{err}").contains("VACUUM cannot run inside a transaction"),
3208            "expected in-transaction rejection, got: {err}"
3209        );
3210        // Roll back to leave the DB in a clean state.
3211        process_command("ROLLBACK;", &mut db).unwrap();
3212        cleanup(&path);
3213    }
3214
3215    /// VACUUM on an in-memory database is a documented no-op.
3216    #[test]
3217    fn vacuum_on_in_memory_database_is_noop() {
3218        let mut db = Database::new("mem".to_string());
3219        process_command("CREATE TABLE t (id INTEGER PRIMARY KEY);", &mut db).unwrap();
3220        let out = process_command("VACUUM;", &mut db).expect("vacuum no-op");
3221        assert!(
3222            out.to_lowercase().contains("no-op") || out.to_lowercase().contains("in-memory"),
3223            "expected no-op message for in-memory VACUUM, got: {out}"
3224        );
3225    }
3226
3227    /// Untouched tables shouldn't write any pages on the save that
3228    /// follows a DROP of an unrelated table. Confirms the per-table
3229    /// preferred pool keeps page numbers stable so the diff pager skips
3230    /// every byte-identical leaf.
3231    #[test]
3232    fn unchanged_table_pages_skip_diff_after_unrelated_drop() {
3233        // Need three tables so dropping one in the middle still leaves
3234        // an "unrelated" alphabetical neighbour. Layout pre-drop (sorted):
3235        //   accounts, notes, users
3236        // Drop `notes`. `accounts` and `users` should keep their pages.
3237        let path = tmp_path("diff_after_drop");
3238        let mut db = Database::new("t".to_string());
3239        db.source_path = Some(path.clone());
3240        process_command(
3241            "CREATE TABLE accounts (id INTEGER PRIMARY KEY, label TEXT);",
3242            &mut db,
3243        )
3244        .unwrap();
3245        process_command(
3246            "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
3247            &mut db,
3248        )
3249        .unwrap();
3250        process_command(
3251            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);",
3252            &mut db,
3253        )
3254        .unwrap();
3255        for i in 0..5 {
3256            process_command(
3257                &format!("INSERT INTO accounts (label) VALUES ('a{i}');"),
3258                &mut db,
3259            )
3260            .unwrap();
3261            process_command(
3262                &format!("INSERT INTO notes (body) VALUES ('n{i}');"),
3263                &mut db,
3264            )
3265            .unwrap();
3266            process_command(
3267                &format!("INSERT INTO users (name) VALUES ('u{i}');"),
3268                &mut db,
3269            )
3270            .unwrap();
3271        }
3272        save_database(&mut db, &path).expect("baseline save");
3273
3274        // Capture page bytes for `accounts` and `users` so we can
3275        // verify they don't change.
3276        let pager = db.pager.as_ref().unwrap();
3277        let acc_root = read_old_rootpages(pager, pager.header().schema_root_page)
3278            .unwrap()
3279            .get(&("table".to_string(), "accounts".to_string()))
3280            .copied()
3281            .unwrap();
3282        let users_root = read_old_rootpages(pager, pager.header().schema_root_page)
3283            .unwrap()
3284            .get(&("table".to_string(), "users".to_string()))
3285            .copied()
3286            .unwrap();
3287        let acc_bytes_before: Vec<u8> = pager.read_page(acc_root).unwrap().to_vec();
3288        let users_bytes_before: Vec<u8> = pager.read_page(users_root).unwrap().to_vec();
3289
3290        // Drop the middle table.
3291        process_command("DROP TABLE notes;", &mut db).expect("drop notes");
3292
3293        let pager = db.pager.as_ref().unwrap();
3294        // `accounts` and `users` should still live at the same pages
3295        // with byte-identical content.
3296        let acc_after = pager.read_page(acc_root).unwrap();
3297        let users_after = pager.read_page(users_root).unwrap();
3298        assert_eq!(
3299            &acc_after[..],
3300            &acc_bytes_before[..],
3301            "accounts root page must not be rewritten when an unrelated table is dropped"
3302        );
3303        assert_eq!(
3304            &users_after[..],
3305            &users_bytes_before[..],
3306            "users root page must not be rewritten when an unrelated table is dropped"
3307        );
3308
3309        cleanup(&path);
3310    }
3311
3312    // ---- SQLR-10: auto-VACUUM trigger after page-releasing DDL ----
3313
3314    /// Builds a file-backed DB with one small "keep" table and one
3315    /// large "bloat" table, sized so the post-drop freelist will
3316    /// comfortably cross the default 25% threshold and the
3317    /// `MIN_PAGES_FOR_AUTO_VACUUM` floor (16 pages). Used by the
3318    /// auto-VACUUM happy-path tests.
3319    fn auto_vacuum_setup(path: &std::path::Path) -> Database {
3320        let mut db = Database::new("av".to_string());
3321        db.source_path = Some(path.to_path_buf());
3322        process_command(
3323            "CREATE TABLE keep (id INTEGER PRIMARY KEY, n INTEGER);",
3324            &mut db,
3325        )
3326        .unwrap();
3327        process_command("INSERT INTO keep (n) VALUES (1);", &mut db).unwrap();
3328        process_command(
3329            "CREATE TABLE bloat (id INTEGER PRIMARY KEY, payload TEXT);",
3330            &mut db,
3331        )
3332        .unwrap();
3333        // Wrap the bulk insert in a transaction so we pay one save at
3334        // COMMIT instead of 5000 round-trips through auto-save.
3335        process_command("BEGIN;", &mut db).unwrap();
3336        for i in 0..5000 {
3337            process_command(
3338                &format!("INSERT INTO bloat (payload) VALUES ('p-{i:08}');"),
3339                &mut db,
3340            )
3341            .unwrap();
3342        }
3343        process_command("COMMIT;", &mut db).unwrap();
3344        db
3345    }
3346
3347    /// Default threshold (0.25) is engaged for fresh `Database`s and
3348    /// fires when a `DROP TABLE` orphans enough pages — file shrinks
3349    /// without anyone calling `VACUUM;`.
3350    #[test]
3351    fn auto_vacuum_default_threshold_triggers_on_drop_table() {
3352        let path = tmp_path("av_default_drop_table");
3353        let mut db = auto_vacuum_setup(&path);
3354        // Sanity: setup respects the shipped default.
3355        assert_eq!(db.auto_vacuum_threshold(), Some(0.25));
3356
3357        // Checkpoint before measuring `size_before` so the bloat actually
3358        // lives in the main file and not just the WAL — otherwise
3359        // `size_before` is the bare 2-page header and any post-vacuum
3360        // checkpoint will look like the file *grew*.
3361        if let Some(p) = db.pager.as_mut() {
3362            let _ = p.checkpoint();
3363        }
3364        let pages_before = db.pager.as_ref().unwrap().header().page_count;
3365        let size_before = std::fs::metadata(&path).unwrap().len();
3366        assert!(
3367            pages_before >= MIN_PAGES_FOR_AUTO_VACUUM,
3368            "setup should produce >= MIN_PAGES_FOR_AUTO_VACUUM ({MIN_PAGES_FOR_AUTO_VACUUM}) \
3369             pages so the floor doesn't suppress the trigger; got {pages_before}"
3370        );
3371
3372        // Drop the bloat table — freelist should pass 25% of page_count
3373        // and the auto-VACUUM hook should compact in place. Note: no
3374        // explicit `VACUUM;` statement is issued.
3375        process_command("DROP TABLE bloat;", &mut db).expect("drop");
3376
3377        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3378        let head_after = db.pager.as_ref().unwrap().header().freelist_head;
3379        // Second checkpoint so the post-vacuum file shrinks on disk
3380        // (auto-VACUUM stages the compact through WAL just like manual
3381        // VACUUM does).
3382        if let Some(p) = db.pager.as_mut() {
3383            let _ = p.checkpoint();
3384        }
3385        let size_after = std::fs::metadata(&path).unwrap().len();
3386
3387        assert!(
3388            pages_after < pages_before,
3389            "auto-VACUUM must reduce page_count: was {pages_before}, now {pages_after}"
3390        );
3391        assert_eq!(head_after, 0, "auto-VACUUM must clear the freelist");
3392        assert!(
3393            size_after < size_before,
3394            "auto-VACUUM must shrink the file on disk: was {size_before}, now {size_after}"
3395        );
3396
3397        cleanup(&path);
3398    }
3399
3400    /// Setting the threshold to `None` disables the trigger entirely:
3401    /// the same workload that shrinks under the default leaves the file
3402    /// at its high-water mark.
3403    #[test]
3404    fn auto_vacuum_disabled_keeps_file_at_hwm() {
3405        let path = tmp_path("av_disabled");
3406        let mut db = auto_vacuum_setup(&path);
3407        db.set_auto_vacuum_threshold(None).expect("disable");
3408        assert_eq!(db.auto_vacuum_threshold(), None);
3409
3410        let pages_before = db.pager.as_ref().unwrap().header().page_count;
3411
3412        process_command("DROP TABLE bloat;", &mut db).expect("drop");
3413
3414        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3415        let head_after = db.pager.as_ref().unwrap().header().freelist_head;
3416        assert_eq!(
3417            pages_after, pages_before,
3418            "with auto-VACUUM disabled, drop must keep page_count at the HWM"
3419        );
3420        assert!(
3421            head_after != 0,
3422            "drop must still populate the freelist (manual VACUUM would be needed to reclaim)"
3423        );
3424
3425        cleanup(&path);
3426    }
3427
3428    /// `DROP INDEX` is the second of three page-releasing DDL paths
3429    /// covered by SQLR-10. We bloat the freelist via a separate
3430    /// `DROP TABLE` first (with auto-VACUUM disabled so it doesn't
3431    /// compact early), then re-arm the trigger and drop a small index
3432    /// — the cumulative freelist crosses 25% on the index drop and
3433    /// auto-VACUUM fires.
3434    ///
3435    /// The detour around bloat is necessary because building a
3436    /// secondary index on a 5000-row column would need multi-level
3437    /// interior nodes, and the cell-decoder's interior-page support
3438    /// is a separate work item from SQLR-10.
3439    #[test]
3440    fn auto_vacuum_triggers_on_drop_index() {
3441        let path = tmp_path("av_drop_index");
3442        let mut db = auto_vacuum_setup(&path);
3443
3444        // Phase 1: drop the bloat table with auto-VACUUM disabled so
3445        // its pages land on the freelist without being reclaimed.
3446        db.set_auto_vacuum_threshold(None).expect("disable");
3447        process_command("DROP TABLE bloat;", &mut db).expect("drop bloat");
3448        let pages_after_bloat_drop = db.pager.as_ref().unwrap().header().page_count;
3449        let head_after_bloat_drop = db.pager.as_ref().unwrap().header().freelist_head;
3450        assert!(
3451            head_after_bloat_drop != 0,
3452            "bloat drop must populate the freelist (else later index drop won't trip the threshold)"
3453        );
3454
3455        // Phase 2: a small index on the surviving `keep` table. The
3456        // index reuses one page from the freelist (which is fine —
3457        // freelist still holds plenty more).
3458        process_command("CREATE INDEX idx_keep_n ON keep (n);", &mut db).expect("create idx");
3459
3460        // Phase 3: re-arm the trigger and drop the index. The freelist
3461        // is already heavily populated from phase 1; this drop just
3462        // adds the index page on top, keeping the ratio well above
3463        // 25%, so auto-VACUUM should fire.
3464        db.set_auto_vacuum_threshold(Some(0.25)).expect("re-arm");
3465        process_command("DROP INDEX idx_keep_n;", &mut db).expect("drop index");
3466
3467        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3468        let head_after = db.pager.as_ref().unwrap().header().freelist_head;
3469        assert!(
3470            pages_after < pages_after_bloat_drop,
3471            "DROP INDEX should fire auto-VACUUM and reduce page_count: \
3472             was {pages_after_bloat_drop}, now {pages_after}"
3473        );
3474        assert_eq!(
3475            head_after, 0,
3476            "auto-VACUUM after DROP INDEX must clear the freelist"
3477        );
3478
3479        cleanup(&path);
3480    }
3481
3482    /// `ALTER TABLE … DROP COLUMN` releases pages too — the third path
3483    /// the SQLR-10 trigger covers.
3484    #[test]
3485    fn auto_vacuum_triggers_on_alter_drop_column() {
3486        let path = tmp_path("av_alter_drop_col");
3487        let mut db = auto_vacuum_setup(&path);
3488        let pages_before = db.pager.as_ref().unwrap().header().page_count;
3489
3490        // Drop the wide `payload` column — this rewrites every row in
3491        // `bloat` without the column, so the old leaf pages get freed.
3492        process_command("ALTER TABLE bloat DROP COLUMN payload;", &mut db).expect("alter drop");
3493
3494        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3495        assert!(
3496            pages_after < pages_before,
3497            "ALTER TABLE DROP COLUMN should fire auto-VACUUM and reduce page_count: \
3498             was {pages_before}, now {pages_after}"
3499        );
3500        assert_eq!(db.pager.as_ref().unwrap().header().freelist_head, 0);
3501
3502        cleanup(&path);
3503    }
3504
3505    /// A high threshold (0.99) suppresses the trigger when the freelist
3506    /// ratio is well below it — the file stays at HWM.
3507    #[test]
3508    fn auto_vacuum_skips_below_threshold() {
3509        let path = tmp_path("av_below_threshold");
3510        let mut db = auto_vacuum_setup(&path);
3511        db.set_auto_vacuum_threshold(Some(0.99)).expect("set");
3512
3513        let pages_before = db.pager.as_ref().unwrap().header().page_count;
3514
3515        process_command("DROP TABLE bloat;", &mut db).expect("drop");
3516
3517        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3518        assert_eq!(
3519            pages_after, pages_before,
3520            "freelist ratio after a single drop is far below 0.99 — \
3521             page_count must stay at the HWM"
3522        );
3523        assert!(
3524            db.pager.as_ref().unwrap().header().freelist_head != 0,
3525            "drop must still populate the freelist"
3526        );
3527
3528        cleanup(&path);
3529    }
3530
3531    /// Inside an explicit transaction, the page-releasing DDL doesn't
3532    /// flush to disk yet — the freelist isn't accurate, so the trigger
3533    /// must skip. The compact would also publish in-flight work out of
3534    /// band, which is exactly what the manual `VACUUM;` rejection
3535    /// inside a txn already prevents.
3536    #[test]
3537    fn auto_vacuum_skips_inside_transaction() {
3538        let path = tmp_path("av_in_txn");
3539        let mut db = auto_vacuum_setup(&path);
3540        let pages_before = db.pager.as_ref().unwrap().header().page_count;
3541
3542        process_command("BEGIN;", &mut db).expect("begin");
3543        process_command("DROP TABLE bloat;", &mut db).expect("drop in txn");
3544        // Mid-transaction: no save has occurred, so the on-disk
3545        // freelist_head must be unchanged and page_count must not have
3546        // shifted from a sneaky compact.
3547        let pages_mid = db.pager.as_ref().unwrap().header().page_count;
3548        assert_eq!(
3549            pages_mid, pages_before,
3550            "auto-VACUUM must not fire mid-transaction"
3551        );
3552
3553        process_command("ROLLBACK;", &mut db).expect("rollback");
3554        cleanup(&path);
3555    }
3556
3557    /// Tiny databases (under `MIN_PAGES_FOR_AUTO_VACUUM`) skip the
3558    /// trigger even if the ratio would otherwise qualify — the cost of
3559    /// rewriting a 64 KiB file isn't worth the few bytes reclaimed.
3560    #[test]
3561    fn auto_vacuum_skips_under_min_pages_floor() {
3562        let path = tmp_path("av_under_floor");
3563        let mut db = seed_db(); // small: just users + notes, ~5 pages
3564        db.source_path = Some(path.clone());
3565        save_database(&mut db, &path).expect("save");
3566        // Confirm we're below the floor so the test is meaningful.
3567        let pages_before = db.pager.as_ref().unwrap().header().page_count;
3568        assert!(
3569            pages_before < MIN_PAGES_FOR_AUTO_VACUUM,
3570            "test setup is too large: floor would not apply (got {pages_before} pages, \
3571             floor is {MIN_PAGES_FOR_AUTO_VACUUM})"
3572        );
3573
3574        process_command("DROP TABLE users;", &mut db).expect("drop");
3575
3576        let pages_after = db.pager.as_ref().unwrap().header().page_count;
3577        assert_eq!(
3578            pages_after, pages_before,
3579            "below MIN_PAGES_FOR_AUTO_VACUUM, drop must not trigger compaction"
3580        );
3581        assert!(
3582            db.pager.as_ref().unwrap().header().freelist_head != 0,
3583            "drop must still populate the freelist normally"
3584        );
3585
3586        cleanup(&path);
3587    }
3588
3589    /// Setter rejects NaN, infinities, and values outside `0.0..=1.0`
3590    /// rather than silently saturating.
3591    #[test]
3592    fn set_auto_vacuum_threshold_rejects_out_of_range() {
3593        let mut db = Database::new("t".to_string());
3594        for bad in [-0.01_f32, 1.01, f32::NAN, f32::INFINITY, f32::NEG_INFINITY] {
3595            let err = db.set_auto_vacuum_threshold(Some(bad)).unwrap_err();
3596            assert!(
3597                format!("{err}").contains("auto_vacuum_threshold"),
3598                "expected a typed range error for {bad}, got: {err}"
3599            );
3600        }
3601        // The default survives the rejected sets unchanged.
3602        assert_eq!(db.auto_vacuum_threshold(), Some(0.25));
3603        // And valid values land.
3604        db.set_auto_vacuum_threshold(Some(0.0)).unwrap();
3605        assert_eq!(db.auto_vacuum_threshold(), Some(0.0));
3606        db.set_auto_vacuum_threshold(Some(1.0)).unwrap();
3607        assert_eq!(db.auto_vacuum_threshold(), Some(1.0));
3608        db.set_auto_vacuum_threshold(None).unwrap();
3609        assert_eq!(db.auto_vacuum_threshold(), None);
3610    }
3611
3612    /// VACUUM modifiers (FULL, REINDEX, table targets, …) are rejected
3613    /// with NotImplemented — only bare `VACUUM;` is supported.
3614    #[test]
3615    fn vacuum_modifiers_are_rejected() {
3616        let path = tmp_path("vacuum_modifiers");
3617        let mut db = seed_db();
3618        db.source_path = Some(path.clone());
3619        save_database(&mut db, &path).expect("save");
3620        for stmt in ["VACUUM FULL;", "VACUUM users;"] {
3621            let err = process_command(stmt, &mut db).unwrap_err();
3622            assert!(
3623                format!("{err}").contains("VACUUM modifiers"),
3624                "expected modifier rejection for `{stmt}`, got: {err}"
3625            );
3626        }
3627        cleanup(&path);
3628    }
3629}