Skip to main content

sqlrite/sql/pager/
mod.rs

1//! On-disk persistence for a `Database`, using fixed-size paged files.
2//!
3//! The file is a sequence of 4 KiB pages. Page 0 holds the header
4//! (magic, version, page count, schema-root pointer). Every other page carries
5//! a small per-page header (type tag + next-page pointer + payload length)
6//! followed by a payload of up to 4089 bytes.
7//!
8//! **Storage strategy (format version 2, Phase 3c.5).**
9//!
10//! - Each `Table`'s rows live as **cells** in a chain of `TableLeaf` pages.
11//!   Cell layout and slot directory are in `cell.rs` / `table_page.rs`;
12//!   cells that exceed the inline threshold spill into an overflow chain
13//!   via `overflow.rs`.
14//! - The schema catalog is itself a regular table named `sqlrite_master`,
15//!   with one row per user table:
16//!       `(name TEXT PRIMARY KEY, sql TEXT NOT NULL,
17//!         rootpage INTEGER NOT NULL, last_rowid INTEGER NOT NULL)`
18//!   This is the SQLite-style approach: the schema of `sqlrite_master`
19//!   itself is hardcoded into the engine so the open path can bootstrap.
20//! - Page 0's `schema_root_page` field points at the first leaf of
21//!   `sqlrite_master`.
22//!
23//! **Format version.** Version 2 is not compatible with files produced by
24//! earlier commits. Opening a v1 file returns a clean error — users on
25//! old files have to regenerate them from CREATE/INSERT, as there's no
26//! production data to migrate yet.
27
28// Data-layer modules. Not every helper in these modules is used by save/open
29// yet — some exist for tests, some for future maintenance operations.
30// Module-level #[allow(dead_code)] keeps the build quiet without dotting
31// the modules with per-item attributes.
32#[allow(dead_code)]
33pub mod cell;
34pub mod file;
35#[allow(dead_code)]
36pub mod fts_cell;
37pub mod header;
38#[allow(dead_code)]
39pub mod hnsw_cell;
40#[allow(dead_code)]
41pub mod index_cell;
42#[allow(dead_code)]
43pub mod interior_page;
44pub mod overflow;
45pub mod page;
46pub mod pager;
47#[allow(dead_code)]
48pub mod table_page;
49#[allow(dead_code)]
50pub mod varint;
51#[allow(dead_code)]
52pub mod wal;
53
54use std::collections::{BTreeMap, HashMap};
55use std::path::Path;
56use std::sync::{Arc, Mutex};
57
58use sqlparser::dialect::SQLiteDialect;
59use sqlparser::parser::Parser;
60
61use crate::error::{Result, SQLRiteError};
62use crate::sql::db::database::Database;
63use crate::sql::db::secondary_index::{IndexOrigin, SecondaryIndex};
64use crate::sql::db::table::{Column, DataType, Row, Table, Value};
65use crate::sql::pager::cell::Cell;
66use crate::sql::pager::header::DbHeader;
67use crate::sql::pager::index_cell::IndexCell;
68use crate::sql::pager::interior_page::{InteriorCell, InteriorPage};
69use crate::sql::pager::overflow::{
70    OVERFLOW_THRESHOLD, OverflowRef, PagedEntry, read_overflow_chain, write_overflow_chain,
71};
72use crate::sql::pager::page::{PAGE_HEADER_SIZE, PAGE_SIZE, PAYLOAD_PER_PAGE, PageType};
73use crate::sql::pager::pager::Pager;
74use crate::sql::pager::table_page::TablePage;
75use crate::sql::parser::create::CreateQuery;
76
77// Re-export so callers can spell `sql::pager::AccessMode` without
78// reaching into the `pager::pager::pager` submodule path.
79pub use crate::sql::pager::pager::AccessMode;
80
81/// Name of the internal catalog table. Reserved — user CREATEs of this
82/// name must be rejected upstream.
83pub const MASTER_TABLE_NAME: &str = "sqlrite_master";
84
85/// Opens a database file in read-write mode. Shorthand for
86/// [`open_database_with_mode`] with [`AccessMode::ReadWrite`].
87pub fn open_database(path: &Path, db_name: String) -> Result<Database> {
88    open_database_with_mode(path, db_name, AccessMode::ReadWrite)
89}
90
91/// Opens a database file in read-only mode. Acquires a shared OS-level
92/// advisory lock, so other read-only openers coexist but any writer is
93/// excluded. Attempts to mutate the returned `Database` (e.g. an
94/// `INSERT`, or a `save_database` call against it) bottom out in a
95/// `cannot commit: database is opened read-only` error from the Pager.
96pub fn open_database_read_only(path: &Path, db_name: String) -> Result<Database> {
97    open_database_with_mode(path, db_name, AccessMode::ReadOnly)
98}
99
100/// Opens a database file and reconstructs the in-memory `Database`,
101/// leaving the long-lived `Pager` attached for subsequent auto-save
102/// (read-write) or consistent-snapshot reads (read-only).
103pub fn open_database_with_mode(path: &Path, db_name: String, mode: AccessMode) -> Result<Database> {
104    let pager = Pager::open_with_mode(path, mode)?;
105
106    // 1. Load sqlrite_master from the tree at header.schema_root_page.
107    let mut master = build_empty_master_table();
108    load_table_rows(&pager, &mut master, pager.header().schema_root_page)?;
109
110    // 2. Two passes over master rows: first build every user table, then
111    //    attach secondary indexes. Indexes need their base table to exist
112    //    before we can populate them. Auto-indexes are created at table
113    //    build time so we only have to load explicit indexes from disk
114    //    (but we also reload the auto-index CONTENT because Table::new
115    //    built it empty).
116    let mut db = Database::new(db_name);
117    let mut index_rows: Vec<IndexCatalogRow> = Vec::new();
118
119    for rowid in master.rowids() {
120        let ty = take_text(&master, "type", rowid)?;
121        let name = take_text(&master, "name", rowid)?;
122        let sql = take_text(&master, "sql", rowid)?;
123        let rootpage = take_integer(&master, "rootpage", rowid)? as u32;
124        let last_rowid = take_integer(&master, "last_rowid", rowid)?;
125
126        match ty.as_str() {
127            "table" => {
128                let (parsed_name, columns) = parse_create_sql(&sql)?;
129                if parsed_name != name {
130                    return Err(SQLRiteError::Internal(format!(
131                        "sqlrite_master row '{name}' carries SQL for '{parsed_name}' — corrupt catalog?"
132                    )));
133                }
134                let mut table = build_empty_table(&name, columns, last_rowid);
135                if rootpage != 0 {
136                    load_table_rows(&pager, &mut table, rootpage)?;
137                }
138                if last_rowid > table.last_rowid {
139                    table.last_rowid = last_rowid;
140                }
141                db.tables.insert(name, table);
142            }
143            "index" => {
144                index_rows.push(IndexCatalogRow {
145                    name,
146                    sql,
147                    rootpage,
148                });
149            }
150            other => {
151                return Err(SQLRiteError::Internal(format!(
152                    "sqlrite_master row '{name}' has unknown type '{other}'"
153                )));
154            }
155        }
156    }
157
158    // Second pass: attach each index to its table. HNSW indexes
159    // (Phase 7d.2) take a different code path because their persisted
160    // form is just the CREATE INDEX SQL — the graph itself isn't
161    // persisted yet (Phase 7d.3). Detect HNSW via the SQL's USING clause
162    // and route to a graph-rebuild instead of the B-Tree-cell load.
163    //
164    // Phase 8b — same shape for FTS indexes. The posting lists aren't
165    // persisted yet (Phase 8c), so we replay the CREATE INDEX SQL on
166    // open and let `execute_create_index` walk current rows.
167    for row in index_rows {
168        if create_index_sql_uses_hnsw(&row.sql) {
169            rebuild_hnsw_index(&mut db, &pager, &row)?;
170        } else if create_index_sql_uses_fts(&row.sql) {
171            rebuild_fts_index(&mut db, &pager, &row)?;
172        } else {
173            attach_index(&mut db, &pager, row)?;
174        }
175    }
176
177    db.source_path = Some(path.to_path_buf());
178    db.pager = Some(pager);
179    Ok(db)
180}
181
182/// Catalog row for a secondary index — deferred until after every table is
183/// loaded so the index's base table exists by the time we populate it.
184struct IndexCatalogRow {
185    name: String,
186    sql: String,
187    rootpage: u32,
188}
189
190/// Persists `db` to disk. Same diff-commit behavior as before: only pages
191/// whose bytes actually changed get written.
192pub fn save_database(db: &mut Database, path: &Path) -> Result<()> {
193    // Phase 7d.3 — rebuild any HNSW index that DELETE / UPDATE-on-vector
194    // marked dirty. Done up front under the &mut Database borrow we
195    // already hold, before the immutable iteration loops below need
196    // their own borrow.
197    rebuild_dirty_hnsw_indexes(db);
198    // Phase 8b — same drill for FTS indexes flagged by DELETE / UPDATE.
199    rebuild_dirty_fts_indexes(db);
200
201    let same_path = db.source_path.as_deref() == Some(path);
202    let mut pager = if same_path {
203        match db.pager.take() {
204            Some(p) => p,
205            None if path.exists() => Pager::open(path)?,
206            None => Pager::create(path)?,
207        }
208    } else if path.exists() {
209        Pager::open(path)?
210    } else {
211        Pager::create(path)?
212    };
213
214    pager.clear_staged();
215
216    // Page 0 is the header; payload pages start at 1.
217    let mut next_free_page: u32 = 1;
218
219    // 1. Stage each user table's B-Tree, collecting master-row info.
220    //    `kind` is "table" or "index" — master has one row per each.
221    let mut master_rows: Vec<CatalogEntry> = Vec::new();
222
223    let mut table_names: Vec<&String> = db.tables.keys().collect();
224    table_names.sort();
225    for name in table_names {
226        if name == MASTER_TABLE_NAME {
227            return Err(SQLRiteError::Internal(format!(
228                "user table cannot be named '{MASTER_TABLE_NAME}' (reserved)"
229            )));
230        }
231        let table = &db.tables[name];
232        let (rootpage, new_next) = stage_table_btree(&mut pager, table, next_free_page)?;
233        next_free_page = new_next;
234        master_rows.push(CatalogEntry {
235            kind: "table".into(),
236            name: name.clone(),
237            sql: table_to_create_sql(table),
238            rootpage,
239            last_rowid: table.last_rowid,
240        });
241    }
242
243    // 2. Stage each secondary index's B-Tree. Indexes persist in a
244    //    deterministic order: sorted by (owning_table, index_name).
245    let mut index_entries: Vec<(&Table, &SecondaryIndex)> = Vec::new();
246    for table in db.tables.values() {
247        for idx in &table.secondary_indexes {
248            index_entries.push((table, idx));
249        }
250    }
251    index_entries
252        .sort_by(|(ta, ia), (tb, ib)| ta.tb_name.cmp(&tb.tb_name).then(ia.name.cmp(&ib.name)));
253    for (_table, idx) in index_entries {
254        let (rootpage, new_next) = stage_index_btree(&mut pager, idx, next_free_page)?;
255        next_free_page = new_next;
256        master_rows.push(CatalogEntry {
257            kind: "index".into(),
258            name: idx.name.clone(),
259            sql: idx.synthesized_sql(),
260            rootpage,
261            last_rowid: 0,
262        });
263    }
264
265    // 2b. Phase 7d.3: persist HNSW indexes as their own cell-encoded
266    //     page trees, with the rootpage recorded in sqlrite_master.
267    //     Reopen loads the graph back from cells (fast, exact match)
268    //     instead of rebuilding from rows.
269    //
270    //     Dirty indexes (set by DELETE / UPDATE-on-vector-col) are
271    //     rebuilt from current rows BEFORE staging, so the on-disk
272    //     graph reflects the current row set.
273    let mut hnsw_entries: Vec<(&Table, &crate::sql::db::table::HnswIndexEntry)> = Vec::new();
274    for table in db.tables.values() {
275        for entry in &table.hnsw_indexes {
276            hnsw_entries.push((table, entry));
277        }
278    }
279    hnsw_entries
280        .sort_by(|(ta, ea), (tb, eb)| ta.tb_name.cmp(&tb.tb_name).then(ea.name.cmp(&eb.name)));
281    for (table, entry) in hnsw_entries {
282        let (rootpage, new_next) = stage_hnsw_btree(&mut pager, &entry.index, next_free_page)?;
283        next_free_page = new_next;
284        master_rows.push(CatalogEntry {
285            kind: "index".into(),
286            name: entry.name.clone(),
287            sql: format!(
288                "CREATE INDEX {} ON {} USING hnsw ({})",
289                entry.name, table.tb_name, entry.column_name
290            ),
291            rootpage,
292            last_rowid: 0,
293        });
294    }
295
296    // 2c. Phase 8c — persist FTS posting lists as their own
297    //     cell-encoded page trees, with the rootpage recorded in
298    //     sqlrite_master. Reopen loads the postings back from cells
299    //     (fast, exact match) instead of re-tokenizing rows.
300    //
301    //     Dirty indexes (set by DELETE / UPDATE-on-text-col) are
302    //     rebuilt from current rows BEFORE staging by
303    //     `rebuild_dirty_fts_indexes`, so the on-disk tree reflects
304    //     the current row set.
305    let mut fts_entries: Vec<(&Table, &crate::sql::db::table::FtsIndexEntry)> = Vec::new();
306    for table in db.tables.values() {
307        for entry in &table.fts_indexes {
308            fts_entries.push((table, entry));
309        }
310    }
311    fts_entries
312        .sort_by(|(ta, ea), (tb, eb)| ta.tb_name.cmp(&tb.tb_name).then(ea.name.cmp(&eb.name)));
313    let any_fts = !fts_entries.is_empty();
314    for (table, entry) in fts_entries {
315        let (rootpage, new_next) = stage_fts_btree(&mut pager, &entry.index, next_free_page)?;
316        next_free_page = new_next;
317        master_rows.push(CatalogEntry {
318            kind: "index".into(),
319            name: entry.name.clone(),
320            sql: format!(
321                "CREATE INDEX {} ON {} USING fts ({})",
322                entry.name, table.tb_name, entry.column_name
323            ),
324            rootpage,
325            last_rowid: 0,
326        });
327    }
328
329    // 3. Build an in-memory sqlrite_master with one row per table or index,
330    //    then stage it via the same tree-build path.
331    let mut master = build_empty_master_table();
332    for (i, entry) in master_rows.into_iter().enumerate() {
333        let rowid = (i as i64) + 1;
334        master.restore_row(
335            rowid,
336            vec![
337                Some(Value::Text(entry.kind)),
338                Some(Value::Text(entry.name)),
339                Some(Value::Text(entry.sql)),
340                Some(Value::Integer(entry.rootpage as i64)),
341                Some(Value::Integer(entry.last_rowid)),
342            ],
343        )?;
344    }
345    let (master_root, master_next) = stage_table_btree(&mut pager, &master, next_free_page)?;
346    next_free_page = master_next;
347
348    // Phase 8c — on-demand v4→v5 file-format bump per Q10. If any FTS
349    // index attached to the database, write v5; otherwise preserve the
350    // pre-existing version (v4 for files born before this build, or
351    // a previously-promoted v5 file). Reads accept both.
352    let format_version = if any_fts {
353        crate::sql::pager::header::FORMAT_VERSION_V5
354    } else {
355        pager.header().format_version
356    };
357
358    pager.commit(DbHeader {
359        page_count: next_free_page,
360        schema_root_page: master_root,
361        format_version,
362    })?;
363
364    if same_path {
365        db.pager = Some(pager);
366    }
367    Ok(())
368}
369
370/// Build material for a single row in sqlrite_master.
371struct CatalogEntry {
372    kind: String, // "table" or "index"
373    name: String,
374    sql: String,
375    rootpage: u32,
376    last_rowid: i64,
377}
378
379// -------------------------------------------------------------------------
380// sqlrite_master — hardcoded catalog table schema
381
382fn build_empty_master_table() -> Table {
383    // Phase 3e: `type` is the first column, matching SQLite's convention.
384    // It distinguishes `'table'` rows from `'index'` rows.
385    let columns = vec![
386        Column::new("type".into(), "text".into(), false, true, false),
387        Column::new("name".into(), "text".into(), true, true, true),
388        Column::new("sql".into(), "text".into(), false, true, false),
389        Column::new("rootpage".into(), "integer".into(), false, true, false),
390        Column::new("last_rowid".into(), "integer".into(), false, true, false),
391    ];
392    build_empty_table(MASTER_TABLE_NAME, columns, 0)
393}
394
395/// Reads a required Text column from a known-good catalog row.
396fn take_text(table: &Table, col: &str, rowid: i64) -> Result<String> {
397    match table.get_value(col, rowid) {
398        Some(Value::Text(s)) => Ok(s),
399        other => Err(SQLRiteError::Internal(format!(
400            "sqlrite_master column '{col}' at rowid {rowid}: expected Text, got {other:?}"
401        ))),
402    }
403}
404
405/// Reads a required Integer column from a known-good catalog row.
406fn take_integer(table: &Table, col: &str, rowid: i64) -> Result<i64> {
407    match table.get_value(col, rowid) {
408        Some(Value::Integer(v)) => Ok(v),
409        other => Err(SQLRiteError::Internal(format!(
410            "sqlrite_master column '{col}' at rowid {rowid}: expected Integer, got {other:?}"
411        ))),
412    }
413}
414
415// -------------------------------------------------------------------------
416// CREATE-TABLE SQL synthesis and re-parsing
417
418/// Synthesizes a CREATE TABLE SQL string that recreates the table's schema.
419/// Deterministic: same schema → same SQL, so diffing commits stay stable.
420fn table_to_create_sql(table: &Table) -> String {
421    let mut parts = Vec::with_capacity(table.columns.len());
422    for c in &table.columns {
423        // Render the SQL type literally so the round-trip through
424        // CREATE TABLE re-parsing recreates the same schema. Vector
425        // carries its dimension inline.
426        let ty: String = match &c.datatype {
427            DataType::Integer => "INTEGER".to_string(),
428            DataType::Text => "TEXT".to_string(),
429            DataType::Real => "REAL".to_string(),
430            DataType::Bool => "BOOLEAN".to_string(),
431            DataType::Vector(dim) => format!("VECTOR({dim})"),
432            DataType::Json => "JSON".to_string(),
433            DataType::None | DataType::Invalid => "TEXT".to_string(),
434        };
435        let mut piece = format!("{} {}", c.column_name, ty);
436        if c.is_pk {
437            piece.push_str(" PRIMARY KEY");
438        } else {
439            if c.is_unique {
440                piece.push_str(" UNIQUE");
441            }
442            if c.not_null {
443                piece.push_str(" NOT NULL");
444            }
445        }
446        if let Some(default) = &c.default {
447            piece.push_str(" DEFAULT ");
448            piece.push_str(&render_default_literal(default));
449        }
450        parts.push(piece);
451    }
452    format!("CREATE TABLE {} ({});", table.tb_name, parts.join(", "))
453}
454
455/// Renders a DEFAULT value back to SQL-literal form so the synthesized
456/// CREATE TABLE round-trips through `parse_create_sql`. Text values get
457/// single-quoted with single-quote doubling for escaping. Vector defaults
458/// are not currently expressible at CREATE TABLE time, so we render them
459/// as their bracket-array form (matches the INSERT literal grammar).
460fn render_default_literal(value: &Value) -> String {
461    match value {
462        Value::Integer(i) => i.to_string(),
463        Value::Real(f) => f.to_string(),
464        Value::Bool(b) => {
465            if *b {
466                "TRUE".to_string()
467            } else {
468                "FALSE".to_string()
469            }
470        }
471        Value::Text(s) => format!("'{}'", s.replace('\'', "''")),
472        Value::Null => "NULL".to_string(),
473        Value::Vector(_) => value.to_display_string(),
474    }
475}
476
477/// Reverses `table_to_create_sql`: feeds the SQL back through `sqlparser`
478/// and produces our internal column list. Returns `(table_name, columns)`.
479fn parse_create_sql(sql: &str) -> Result<(String, Vec<Column>)> {
480    let dialect = SQLiteDialect {};
481    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
482    let stmt = ast.pop().ok_or_else(|| {
483        SQLRiteError::Internal("sqlrite_master row held an empty SQL string".to_string())
484    })?;
485    let create = CreateQuery::new(&stmt)?;
486    let columns = create
487        .columns
488        .into_iter()
489        .map(|pc| {
490            Column::with_default(
491                pc.name,
492                pc.datatype,
493                pc.is_pk,
494                pc.not_null,
495                pc.is_unique,
496                pc.default,
497            )
498        })
499        .collect();
500    Ok((create.table_name, columns))
501}
502
503// -------------------------------------------------------------------------
504// In-memory table (re)construction
505
506/// Builds an empty in-memory `Table` given the declared columns.
507fn build_empty_table(name: &str, columns: Vec<Column>, last_rowid: i64) -> Table {
508    let rows: Arc<Mutex<HashMap<String, Row>>> = Arc::new(Mutex::new(HashMap::new()));
509    let mut secondary_indexes: Vec<SecondaryIndex> = Vec::new();
510    {
511        let mut map = rows.lock().expect("rows mutex poisoned");
512        for col in &columns {
513            // Mirror the dispatch in `Table::new` so the reconstructed
514            // table has the same shape it'd have if it were built fresh
515            // from SQL. Phase 7a adds the Vector arm — without it,
516            // VECTOR columns silently restore as Row::None and every
517            // restore_row hits a "storage None vs value Some(Vector(...))"
518            // type mismatch.
519            let row = match &col.datatype {
520                DataType::Integer => Row::Integer(BTreeMap::new()),
521                DataType::Text => Row::Text(BTreeMap::new()),
522                DataType::Real => Row::Real(BTreeMap::new()),
523                DataType::Bool => Row::Bool(BTreeMap::new()),
524                DataType::Vector(_dim) => Row::Vector(BTreeMap::new()),
525                // JSON columns reuse Text storage — see Table::new and
526                // Phase 7e's scope-correction note.
527                DataType::Json => Row::Text(BTreeMap::new()),
528                DataType::None | DataType::Invalid => Row::None,
529            };
530            map.insert(col.column_name.clone(), row);
531
532            // Auto-create UNIQUE/PK indexes so the restored table has the
533            // same shape Table::new would have built from fresh SQL.
534            if (col.is_pk || col.is_unique)
535                && matches!(col.datatype, DataType::Integer | DataType::Text)
536            {
537                if let Ok(idx) = SecondaryIndex::new(
538                    SecondaryIndex::auto_name(name, &col.column_name),
539                    name.to_string(),
540                    col.column_name.clone(),
541                    &col.datatype,
542                    true,
543                    IndexOrigin::Auto,
544                ) {
545                    secondary_indexes.push(idx);
546                }
547            }
548        }
549    }
550
551    let primary_key = columns
552        .iter()
553        .find(|c| c.is_pk)
554        .map(|c| c.column_name.clone())
555        .unwrap_or_else(|| "-1".to_string());
556
557    Table {
558        tb_name: name.to_string(),
559        columns,
560        rows,
561        secondary_indexes,
562        // HNSW indexes (Phase 7d.2) are reconstructed on open by re-
563        // executing each `CREATE INDEX … USING hnsw` SQL stored in
564        // `sqlrite_master`. This builder produces the empty shell;
565        // `replay_create_index_for_hnsw` (in this same module) walks
566        // sqlrite_master after every table is loaded and rebuilds the
567        // graph from current row data. Persistence of the graph itself
568        // (avoiding the on-open rebuild cost) is Phase 7d.3.
569        hnsw_indexes: Vec::new(),
570        // FTS indexes (Phase 8b) follow the same pattern — the
571        // CREATE INDEX … USING fts SQL is the source of truth on open
572        // and the in-memory posting list gets rebuilt from current
573        // rows. Cell-encoded persistence of the postings is Phase 8c.
574        fts_indexes: Vec::new(),
575        last_rowid,
576        primary_key,
577    }
578}
579
580// -------------------------------------------------------------------------
581// Leaf-chain read / write
582
583/// Walks a table's B-Tree from `root_page`, following the leftmost-child
584/// chain down to the first leaf, then iterating leaves via their sibling
585/// `next_page` pointers. Every cell is decoded and replayed into `table`.
586///
587/// Open-path note: we eagerly materialize the entire table into `Table`'s
588/// in-memory maps. Phase 5 will introduce a `Cursor` that hits the pager
589/// on demand so queries can stream through the tree without a full upfront
590/// load.
591/// Re-parses `CREATE INDEX` SQL from sqlrite_master and restores the
592/// index on its base table by walking the tree of index cells at
593/// `rootpage`. The base table is expected to already be in `db.tables`.
594fn attach_index(db: &mut Database, pager: &Pager, row: IndexCatalogRow) -> Result<()> {
595    let (table_name, column_name, is_unique) = parse_create_index_sql(&row.sql)?;
596
597    let table = db.get_table_mut(table_name.clone()).map_err(|_| {
598        SQLRiteError::Internal(format!(
599            "index '{}' references unknown table '{table_name}' (sqlrite_master out of sync?)",
600            row.name
601        ))
602    })?;
603    let datatype = table
604        .columns
605        .iter()
606        .find(|c| c.column_name == column_name)
607        .map(|c| clone_datatype(&c.datatype))
608        .ok_or_else(|| {
609            SQLRiteError::Internal(format!(
610                "index '{}' references unknown column '{column_name}' on '{table_name}'",
611                row.name
612            ))
613        })?;
614
615    // An auto-index on this column may already exist (built by
616    // build_empty_table for UNIQUE/PK columns). If the names match, reuse
617    // the slot instead of adding a duplicate entry.
618    let existing_slot = table
619        .secondary_indexes
620        .iter()
621        .position(|i| i.name == row.name);
622    let idx = match existing_slot {
623        Some(i) => {
624            // Drain any entries that may have been populated during table
625            // restore_row calls — we're about to repopulate from the
626            // persisted tree.
627            table.secondary_indexes.remove(i)
628        }
629        None => SecondaryIndex::new(
630            row.name.clone(),
631            table_name.clone(),
632            column_name.clone(),
633            &datatype,
634            is_unique,
635            IndexOrigin::Explicit,
636        )?,
637    };
638    let mut idx = idx;
639    // Wipe any stale entries from the auto path so the load is idempotent.
640    let is_unique_flag = idx.is_unique;
641    let origin = idx.origin;
642    idx = SecondaryIndex::new(
643        idx.name,
644        idx.table_name,
645        idx.column_name,
646        &datatype,
647        is_unique_flag,
648        origin,
649    )?;
650
651    // Populate from the index tree's cells.
652    load_index_rows(pager, &mut idx, row.rootpage)?;
653
654    table.secondary_indexes.push(idx);
655    Ok(())
656}
657
658/// Walks the leaves of an index B-Tree rooted at `root_page` and inserts
659/// every `(value, rowid)` pair into `idx`.
660fn load_index_rows(pager: &Pager, idx: &mut SecondaryIndex, root_page: u32) -> Result<()> {
661    if root_page == 0 {
662        return Ok(());
663    }
664    let first_leaf = find_leftmost_leaf(pager, root_page)?;
665    let mut current = first_leaf;
666    while current != 0 {
667        let page_buf = pager
668            .read_page(current)
669            .ok_or_else(|| SQLRiteError::Internal(format!("missing index leaf page {current}")))?;
670        if page_buf[0] != PageType::TableLeaf as u8 {
671            return Err(SQLRiteError::Internal(format!(
672                "page {current} tagged {} but expected TableLeaf (index)",
673                page_buf[0]
674            )));
675        }
676        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
677        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
678            .try_into()
679            .map_err(|_| SQLRiteError::Internal("index leaf payload size".to_string()))?;
680        let leaf = TablePage::from_bytes(payload);
681
682        for slot in 0..leaf.slot_count() {
683            // Slots on an index page hold KIND_INDEX cells; decode directly.
684            let offset = leaf.slot_offset_raw(slot)?;
685            let (ic, _) = IndexCell::decode(leaf.as_bytes(), offset)?;
686            idx.insert(&ic.value, ic.rowid)?;
687        }
688        current = next_leaf;
689    }
690    Ok(())
691}
692
693/// Minimal recognizer for the synthesized-or-user `CREATE INDEX` SQL we
694/// store in sqlrite_master. Returns `(table_name, column_name, is_unique)`.
695///
696/// Uses sqlparser so user-supplied SQL with extra whitespace, case, etc.
697/// still works; the only shape we accept is single-column indexes.
698fn parse_create_index_sql(sql: &str) -> Result<(String, String, bool)> {
699    use sqlparser::ast::{CreateIndex, Expr, Statement};
700
701    let dialect = SQLiteDialect {};
702    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
703    let Some(Statement::CreateIndex(CreateIndex {
704        table_name,
705        columns,
706        unique,
707        ..
708    })) = ast.pop()
709    else {
710        return Err(SQLRiteError::Internal(format!(
711            "sqlrite_master index row's SQL isn't a CREATE INDEX: {sql}"
712        )));
713    };
714    if columns.len() != 1 {
715        return Err(SQLRiteError::NotImplemented(
716            "multi-column indexes aren't supported yet".to_string(),
717        ));
718    }
719    let col = match &columns[0].column.expr {
720        Expr::Identifier(ident) => ident.value.clone(),
721        Expr::CompoundIdentifier(parts) => {
722            parts.last().map(|p| p.value.clone()).unwrap_or_default()
723        }
724        other => {
725            return Err(SQLRiteError::Internal(format!(
726                "unsupported indexed column expression: {other:?}"
727            )));
728        }
729    };
730    Ok((table_name.to_string(), col, unique))
731}
732
733/// True iff a CREATE INDEX SQL string uses `USING hnsw` (case-insensitive).
734/// Used by the open path to route HNSW indexes to the graph-rebuild path
735/// instead of the standard B-Tree cell-load. Pre-Phase-7d.2 indexes
736/// don't have a USING clause, so they all return false and continue
737/// taking the existing path.
738fn create_index_sql_uses_hnsw(sql: &str) -> bool {
739    use sqlparser::ast::{CreateIndex, IndexType, Statement};
740
741    let dialect = SQLiteDialect {};
742    let Ok(mut ast) = Parser::parse_sql(&dialect, sql) else {
743        return false;
744    };
745    let Some(Statement::CreateIndex(CreateIndex { using, .. })) = ast.pop() else {
746        return false;
747    };
748    matches!(using, Some(IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("hnsw"))
749}
750
751/// Phase 8b — peeks at a CREATE INDEX SQL to detect `USING fts(...)`.
752/// Mirrors [`create_index_sql_uses_hnsw`].
753fn create_index_sql_uses_fts(sql: &str) -> bool {
754    use sqlparser::ast::{CreateIndex, IndexType, Statement};
755
756    let dialect = SQLiteDialect {};
757    let Ok(mut ast) = Parser::parse_sql(&dialect, sql) else {
758        return false;
759    };
760    let Some(Statement::CreateIndex(CreateIndex { using, .. })) = ast.pop() else {
761        return false;
762    };
763    matches!(using, Some(IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("fts"))
764}
765
766/// Phase 8c — loads (or rebuilds) an FTS index on database open. Two
767/// paths mirror [`rebuild_hnsw_index`]:
768///
769///   - **rootpage != 0** (Phase 8c default): the posting list is
770///     persisted as cell-encoded pages. Read every cell directly via
771///     [`load_fts_postings`] and reconstruct the index — no
772///     re-tokenization, exact bit-for-bit reproduction.
773///
774///   - **rootpage == 0** (compatibility): no on-disk postings, e.g.
775///     for files saved by Phase 8b before persistence landed. Replay
776///     the CREATE INDEX SQL through `execute_create_index`, which
777///     walks the table's current rows and tokenizes them fresh.
778fn rebuild_fts_index(db: &mut Database, pager: &Pager, row: &IndexCatalogRow) -> Result<()> {
779    use crate::sql::db::table::FtsIndexEntry;
780    use crate::sql::executor::execute_create_index;
781    use crate::sql::fts::PostingList;
782    use sqlparser::ast::Statement;
783
784    let dialect = SQLiteDialect {};
785    let mut ast = Parser::parse_sql(&dialect, &row.sql).map_err(SQLRiteError::from)?;
786    let Some(stmt @ Statement::CreateIndex(_)) = ast.pop() else {
787        return Err(SQLRiteError::Internal(format!(
788            "sqlrite_master FTS row's SQL isn't a CREATE INDEX: {}",
789            row.sql
790        )));
791    };
792
793    if row.rootpage == 0 {
794        // Compatibility path — no persisted postings; replay rows.
795        execute_create_index(&stmt, db)?;
796        return Ok(());
797    }
798
799    let (doc_lengths, postings) = load_fts_postings(pager, row.rootpage)?;
800    let index = PostingList::from_persisted_postings(doc_lengths, postings);
801    let (tbl_name, col_name) = parse_fts_create_index_sql(&row.sql)?;
802    let table_mut = db.get_table_mut(tbl_name.clone()).map_err(|_| {
803        SQLRiteError::Internal(format!(
804            "FTS index '{}' references unknown table '{tbl_name}'",
805            row.name
806        ))
807    })?;
808    table_mut.fts_indexes.push(FtsIndexEntry {
809        name: row.name.clone(),
810        column_name: col_name,
811        index,
812        needs_rebuild: false,
813    });
814    Ok(())
815}
816
817/// Pulls (table_name, column_name) out of a `CREATE INDEX … USING fts(col)`
818/// SQL string. Same shape as `parse_hnsw_create_index_sql`.
819fn parse_fts_create_index_sql(sql: &str) -> Result<(String, String)> {
820    use sqlparser::ast::{CreateIndex, Expr, Statement};
821
822    let dialect = SQLiteDialect {};
823    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
824    let Some(Statement::CreateIndex(CreateIndex {
825        table_name,
826        columns,
827        ..
828    })) = ast.pop()
829    else {
830        return Err(SQLRiteError::Internal(format!(
831            "sqlrite_master FTS row's SQL isn't a CREATE INDEX: {sql}"
832        )));
833    };
834    if columns.len() != 1 {
835        return Err(SQLRiteError::NotImplemented(
836            "multi-column FTS indexes aren't supported yet".to_string(),
837        ));
838    }
839    let col = match &columns[0].column.expr {
840        Expr::Identifier(ident) => ident.value.clone(),
841        Expr::CompoundIdentifier(parts) => {
842            parts.last().map(|p| p.value.clone()).unwrap_or_default()
843        }
844        other => {
845            return Err(SQLRiteError::Internal(format!(
846                "FTS CREATE INDEX has unexpected column expr: {other:?}"
847            )));
848        }
849    };
850    Ok((table_name.to_string(), col))
851}
852
853/// Loads (or rebuilds) an HNSW index on database open. Two paths:
854///
855///   - **rootpage != 0** (Phase 7d.3 default): the graph is persisted
856///     as cell-encoded pages. Read every node directly via
857///     `load_hnsw_nodes` and reconstruct the index — fast, zero
858///     algorithm runs, exact bit-for-bit reproduction of what was saved.
859///
860///   - **rootpage == 0** (compatibility): no on-disk graph, e.g. for
861///     files saved by Phase 7d.2 before persistence landed. Replay the
862///     CREATE INDEX SQL through `execute_create_index`, which walks the
863///     table's current rows and populates a fresh graph. Slower but
864///     correctness-equivalent on the first save with the new code.
865fn rebuild_hnsw_index(db: &mut Database, pager: &Pager, row: &IndexCatalogRow) -> Result<()> {
866    use crate::sql::db::table::HnswIndexEntry;
867    use crate::sql::executor::execute_create_index;
868    use crate::sql::hnsw::{DistanceMetric, HnswIndex};
869    use sqlparser::ast::Statement;
870
871    let dialect = SQLiteDialect {};
872    let mut ast = Parser::parse_sql(&dialect, &row.sql).map_err(SQLRiteError::from)?;
873    let Some(stmt @ Statement::CreateIndex(_)) = ast.pop() else {
874        return Err(SQLRiteError::Internal(format!(
875            "sqlrite_master HNSW row's SQL isn't a CREATE INDEX: {}",
876            row.sql
877        )));
878    };
879
880    if row.rootpage == 0 {
881        // Compatibility path — no persisted graph; walk current rows.
882        execute_create_index(&stmt, db)?;
883        return Ok(());
884    }
885
886    // Persistence path — read the cell tree, deserialize.
887    let nodes = load_hnsw_nodes(pager, row.rootpage)?;
888    let index = HnswIndex::from_persisted_nodes(DistanceMetric::L2, 0xC0FFEE, nodes);
889
890    // Parse the CREATE INDEX to know which table + column to attach to
891    // — same shape as the row-walk path; we just don't execute it.
892    let (tbl_name, col_name) = parse_hnsw_create_index_sql(&row.sql)?;
893    let table_mut = db.get_table_mut(tbl_name.clone()).map_err(|_| {
894        SQLRiteError::Internal(format!(
895            "HNSW index '{}' references unknown table '{tbl_name}'",
896            row.name
897        ))
898    })?;
899    table_mut.hnsw_indexes.push(HnswIndexEntry {
900        name: row.name.clone(),
901        column_name: col_name,
902        index,
903        needs_rebuild: false,
904    });
905    Ok(())
906}
907
908/// Phase 7d.3 — Phase-7d.3-side helper: walk every leaf in the HNSW
909/// page tree at `root_page` and decode each cell as a node. Returns
910/// the (node_id, layers) tuples in slot-order (already ascending by
911/// node_id since they were staged that way). The caller hands them to
912/// `HnswIndex::from_persisted_nodes`.
913fn load_hnsw_nodes(pager: &Pager, root_page: u32) -> Result<Vec<(i64, Vec<Vec<i64>>)>> {
914    use crate::sql::pager::hnsw_cell::HnswNodeCell;
915
916    let mut nodes: Vec<(i64, Vec<Vec<i64>>)> = Vec::new();
917    let first_leaf = find_leftmost_leaf(pager, root_page)?;
918    let mut current = first_leaf;
919    while current != 0 {
920        let page_buf = pager
921            .read_page(current)
922            .ok_or_else(|| SQLRiteError::Internal(format!("missing HNSW leaf page {current}")))?;
923        if page_buf[0] != PageType::TableLeaf as u8 {
924            return Err(SQLRiteError::Internal(format!(
925                "page {current} tagged {} but expected TableLeaf (HNSW)",
926                page_buf[0]
927            )));
928        }
929        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
930        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
931            .try_into()
932            .map_err(|_| SQLRiteError::Internal("HNSW leaf payload size".to_string()))?;
933        let leaf = TablePage::from_bytes(payload);
934        for slot in 0..leaf.slot_count() {
935            let offset = leaf.slot_offset_raw(slot)?;
936            let (cell, _) = HnswNodeCell::decode(leaf.as_bytes(), offset)?;
937            nodes.push((cell.node_id, cell.layers));
938        }
939        current = next_leaf;
940    }
941    Ok(nodes)
942}
943
944/// Pulls (table_name, column_name) out of a `CREATE INDEX … USING hnsw (col)`
945/// SQL string. Used by the persistence path on open to know where to
946/// attach the loaded graph. Same shape as `parse_create_index_sql` for
947/// regular indexes — only the assertion differs (we don't care about
948/// UNIQUE for HNSW).
949fn parse_hnsw_create_index_sql(sql: &str) -> Result<(String, String)> {
950    use sqlparser::ast::{CreateIndex, Expr, Statement};
951
952    let dialect = SQLiteDialect {};
953    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
954    let Some(Statement::CreateIndex(CreateIndex {
955        table_name,
956        columns,
957        ..
958    })) = ast.pop()
959    else {
960        return Err(SQLRiteError::Internal(format!(
961            "sqlrite_master HNSW row's SQL isn't a CREATE INDEX: {sql}"
962        )));
963    };
964    if columns.len() != 1 {
965        return Err(SQLRiteError::NotImplemented(
966            "multi-column HNSW indexes aren't supported yet".to_string(),
967        ));
968    }
969    let col = match &columns[0].column.expr {
970        Expr::Identifier(ident) => ident.value.clone(),
971        Expr::CompoundIdentifier(parts) => {
972            parts.last().map(|p| p.value.clone()).unwrap_or_default()
973        }
974        other => {
975            return Err(SQLRiteError::Internal(format!(
976                "unsupported HNSW indexed column expression: {other:?}"
977            )));
978        }
979    };
980    Ok((table_name.to_string(), col))
981}
982
983/// Phase 7d.3 — rebuilds in-place any HnswIndexEntry whose
984/// `needs_rebuild` flag is set (DELETE / UPDATE-on-vector marked it).
985/// Walks the table's current Vec<f32> column storage and runs the
986/// HNSW algorithm fresh. Called at the top of `save_database` before
987/// any immutable borrows of `db` start.
988///
989/// Cost: O(N · ef_construction · log N) per dirty index. Fine for
990/// small tables, expensive for ≥100k-row tables — matches the
991/// trade-off SQLite makes for FTS5: dirtying-and-rebuilding is the
992/// MVP, more sophisticated incremental delete strategies (soft-delete
993/// + tombstones, neighbor reconnection) are future polish.
994fn rebuild_dirty_hnsw_indexes(db: &mut Database) {
995    use crate::sql::hnsw::{DistanceMetric, HnswIndex};
996
997    for table in db.tables.values_mut() {
998        // Snapshot which (index_name, column) pairs need rebuilding,
999        // before we go grabbing column data — keeps the borrow
1000        // structure simple.
1001        let dirty: Vec<(String, String)> = table
1002            .hnsw_indexes
1003            .iter()
1004            .filter(|e| e.needs_rebuild)
1005            .map(|e| (e.name.clone(), e.column_name.clone()))
1006            .collect();
1007        if dirty.is_empty() {
1008            continue;
1009        }
1010
1011        for (idx_name, col_name) in dirty {
1012            // Snapshot every (rowid, vec) for this column.
1013            let mut vectors: Vec<(i64, Vec<f32>)> = Vec::new();
1014            {
1015                let row_data = table.rows.lock().expect("rows mutex poisoned");
1016                if let Some(Row::Vector(map)) = row_data.get(&col_name) {
1017                    for (id, v) in map.iter() {
1018                        vectors.push((*id, v.clone()));
1019                    }
1020                }
1021            }
1022            // Pre-build a HashMap for the get_vec closure so we don't
1023            // pay O(N) lookup per insert call.
1024            let snapshot: std::collections::HashMap<i64, Vec<f32>> =
1025                vectors.iter().cloned().collect();
1026
1027            let mut new_idx = HnswIndex::new(DistanceMetric::L2, 0xC0FFEE);
1028            // Sort by id so the rebuild is deterministic across runs.
1029            vectors.sort_by_key(|(id, _)| *id);
1030            for (id, v) in &vectors {
1031                new_idx.insert(*id, v, |q| snapshot.get(&q).cloned().unwrap_or_default());
1032            }
1033
1034            // Replace the entry's index + clear the dirty flag.
1035            if let Some(entry) = table.hnsw_indexes.iter_mut().find(|e| e.name == idx_name) {
1036                entry.index = new_idx;
1037                entry.needs_rebuild = false;
1038            }
1039        }
1040    }
1041}
1042
1043/// Phase 8b — rebuild every FTS index a DELETE / UPDATE-on-text-col
1044/// marked dirty. Mirrors [`rebuild_dirty_hnsw_indexes`]; runs at save
1045/// time under `&mut Database`. Cheap on a clean DB (the `dirty` snapshot
1046/// is empty so the per-table loop short-circuits).
1047fn rebuild_dirty_fts_indexes(db: &mut Database) {
1048    use crate::sql::fts::PostingList;
1049
1050    for table in db.tables.values_mut() {
1051        let dirty: Vec<(String, String)> = table
1052            .fts_indexes
1053            .iter()
1054            .filter(|e| e.needs_rebuild)
1055            .map(|e| (e.name.clone(), e.column_name.clone()))
1056            .collect();
1057        if dirty.is_empty() {
1058            continue;
1059        }
1060
1061        for (idx_name, col_name) in dirty {
1062            // Snapshot every (rowid, text) pair for this column under
1063            // the row mutex, then drop the lock before re-tokenizing.
1064            let mut docs: Vec<(i64, String)> = Vec::new();
1065            {
1066                let row_data = table.rows.lock().expect("rows mutex poisoned");
1067                if let Some(Row::Text(map)) = row_data.get(&col_name) {
1068                    for (id, v) in map.iter() {
1069                        // "Null" sentinel is the parser's
1070                        // null-marker for TEXT cells; skip those —
1071                        // they'd round-trip as the literal string
1072                        // "Null" otherwise. Aligns with insert_row's
1073                        // typed_value gate.
1074                        if v != "Null" {
1075                            docs.push((*id, v.clone()));
1076                        }
1077                    }
1078                }
1079            }
1080
1081            let mut new_idx = PostingList::new();
1082            // Sort by id so the rebuild is deterministic across runs
1083            // (the BTreeMap inside PostingList is order-stable, but
1084            // doc-length aggregation order doesn't matter — sorting
1085            // here is purely for reproducibility on inspection).
1086            docs.sort_by_key(|(id, _)| *id);
1087            for (id, text) in &docs {
1088                new_idx.insert(*id, text);
1089            }
1090
1091            if let Some(entry) = table.fts_indexes.iter_mut().find(|e| e.name == idx_name) {
1092                entry.index = new_idx;
1093                entry.needs_rebuild = false;
1094            }
1095        }
1096    }
1097}
1098
1099/// Cheap clone helper — `DataType` doesn't derive `Clone` elsewhere.
1100fn clone_datatype(dt: &DataType) -> DataType {
1101    match dt {
1102        DataType::Integer => DataType::Integer,
1103        DataType::Text => DataType::Text,
1104        DataType::Real => DataType::Real,
1105        DataType::Bool => DataType::Bool,
1106        DataType::Vector(dim) => DataType::Vector(*dim),
1107        DataType::Json => DataType::Json,
1108        DataType::None => DataType::None,
1109        DataType::Invalid => DataType::Invalid,
1110    }
1111}
1112
1113/// Stages an index's B-Tree at `start_page`. Each leaf cell is a
1114/// `KIND_INDEX` entry carrying `(original_rowid, value)`. Returns
1115/// `(root_page, next_free_page)`.
1116///
1117/// The tree's shape matches a regular table's — leaves chained via
1118/// `next_page`, optional interior layer above. `Cell::peek_rowid` works
1119/// uniformly for index cells (same prefix as local cells), so the
1120/// existing slot directory and binary search carry over.
1121fn stage_index_btree(
1122    pager: &mut Pager,
1123    idx: &SecondaryIndex,
1124    start_page: u32,
1125) -> Result<(u32, u32)> {
1126    // Build the leaves.
1127    let (leaves, mut next_free_page) = stage_index_leaves(pager, idx, start_page)?;
1128    if leaves.len() == 1 {
1129        return Ok((leaves[0].0, next_free_page));
1130    }
1131    let mut level: Vec<(u32, i64)> = leaves;
1132    while level.len() > 1 {
1133        let (next_level, new_next_free) = stage_interior_level(pager, &level, next_free_page)?;
1134        next_free_page = new_next_free;
1135        level = next_level;
1136    }
1137    Ok((level[0].0, next_free_page))
1138}
1139
1140/// Packs the index's (value, rowid) entries into a sibling-chained run
1141/// of `TableLeaf` pages. Iteration order matches `SecondaryIndex::iter_entries`
1142/// (ascending value; rowids in insertion order within a value), which is
1143/// also ascending by the "cell rowid" carried in each IndexCell (the
1144/// original row's rowid) — so Cell::peek_rowid + the slot directory's
1145/// rowid ordering stays consistent.
1146fn stage_index_leaves(
1147    pager: &mut Pager,
1148    idx: &SecondaryIndex,
1149    start_page: u32,
1150) -> Result<(Vec<(u32, i64)>, u32)> {
1151    let mut leaves: Vec<(u32, i64)> = Vec::new();
1152    let mut current_leaf = TablePage::empty();
1153    let mut current_leaf_page = start_page;
1154    let mut current_max_rowid: Option<i64> = None;
1155    let mut next_free_page = start_page + 1;
1156
1157    // Sort the entries by original rowid so the in-page slot directory,
1158    // which binary-searches by rowid, stays valid. (iter_entries orders by
1159    // value; we reorder here for B-Tree correctness.)
1160    let mut entries: Vec<(Value, i64)> = idx.iter_entries().collect();
1161    entries.sort_by_key(|(_, r)| *r);
1162
1163    for (value, rowid) in entries {
1164        let cell = IndexCell::new(rowid, value);
1165        let entry_bytes = cell.encode()?;
1166
1167        if !current_leaf.would_fit(entry_bytes.len()) {
1168            let next_leaf_page_num = next_free_page;
1169            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1170            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1171            current_leaf = TablePage::empty();
1172            current_leaf_page = next_leaf_page_num;
1173            next_free_page += 1;
1174
1175            if !current_leaf.would_fit(entry_bytes.len()) {
1176                return Err(SQLRiteError::Internal(format!(
1177                    "index entry of {} bytes exceeds empty-page capacity {}",
1178                    entry_bytes.len(),
1179                    current_leaf.free_space()
1180                )));
1181            }
1182        }
1183        current_leaf.insert_entry(rowid, &entry_bytes)?;
1184        current_max_rowid = Some(rowid);
1185    }
1186
1187    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1188    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1189    Ok((leaves, next_free_page))
1190}
1191
1192/// Phase 7d.3 — stages an HNSW index's page tree at `start_page`.
1193/// Each leaf cell is a `KIND_HNSW` entry carrying one node's
1194/// (node_id, layers). Returns `(root_page, next_free_page)`.
1195///
1196/// Tree shape is identical to `stage_index_btree` — chained leaves +
1197/// optional interior layers. The slot directory binary-searches by
1198/// node_id (which is the cell's "rowid" in `Cell::peek_rowid` terms),
1199/// so reads can locate any node in O(log N) once 7d.4-or-later
1200/// optimizes the load path to lazy-fetch instead of read-all.
1201/// Today, `load_hnsw_nodes` reads the entire tree on open.
1202fn stage_hnsw_btree(
1203    pager: &mut Pager,
1204    idx: &crate::sql::hnsw::HnswIndex,
1205    start_page: u32,
1206) -> Result<(u32, u32)> {
1207    let (leaves, mut next_free_page) = stage_hnsw_leaves(pager, idx, start_page)?;
1208    if leaves.len() == 1 {
1209        return Ok((leaves[0].0, next_free_page));
1210    }
1211    let mut level: Vec<(u32, i64)> = leaves;
1212    while level.len() > 1 {
1213        let (next_level, new_next_free) = stage_interior_level(pager, &level, next_free_page)?;
1214        next_free_page = new_next_free;
1215        level = next_level;
1216    }
1217    Ok((level[0].0, next_free_page))
1218}
1219
1220/// Phase 8c — stage one FTS index as a `TableLeaf`-shaped B-Tree.
1221/// Mirrors `stage_hnsw_btree` (sibling-chained leaves, optional interior
1222/// levels). Returns `(root_page, next_free_page)`. Each leaf is filled
1223/// with `KIND_FTS_POSTING` cells: one sidecar cell holding the
1224/// doc-lengths map, then one cell per term in lexicographic order.
1225fn stage_fts_btree(
1226    pager: &mut Pager,
1227    idx: &crate::sql::fts::PostingList,
1228    start_page: u32,
1229) -> Result<(u32, u32)> {
1230    let (leaves, mut next_free_page) = stage_fts_leaves(pager, idx, start_page)?;
1231    if leaves.len() == 1 {
1232        return Ok((leaves[0].0, next_free_page));
1233    }
1234    let mut level: Vec<(u32, i64)> = leaves;
1235    while level.len() > 1 {
1236        let (next_level, new_next_free) = stage_interior_level(pager, &level, next_free_page)?;
1237        next_free_page = new_next_free;
1238        level = next_level;
1239    }
1240    Ok((level[0].0, next_free_page))
1241}
1242
1243/// Packs FTS posting cells into a sibling-chained run of `TableLeaf`
1244/// pages. Cell layout: a single doc-lengths sidecar at `cell_id = 1`,
1245/// followed by one cell per term in lexicographic order with
1246/// `cell_id = 2..=N + 1`. Sequential ids keep the slot directory's
1247/// rowid ordering valid (the `cell_id` field is what `peek_rowid`
1248/// returns).
1249fn stage_fts_leaves(
1250    pager: &mut Pager,
1251    idx: &crate::sql::fts::PostingList,
1252    start_page: u32,
1253) -> Result<(Vec<(u32, i64)>, u32)> {
1254    use crate::sql::pager::fts_cell::FtsPostingCell;
1255
1256    let mut leaves: Vec<(u32, i64)> = Vec::new();
1257    let mut current_leaf = TablePage::empty();
1258    let mut current_leaf_page = start_page;
1259    let mut current_max_rowid: Option<i64> = None;
1260    let mut next_free_page = start_page + 1;
1261
1262    // Build the cell sequence: sidecar first, then per-term cells. The
1263    // sidecar always exists (even on an empty index) so reload sees a
1264    // canonical "this index was persisted" marker in slot 0.
1265    let mut cell_id: i64 = 1;
1266    let mut cells: Vec<FtsPostingCell> = Vec::new();
1267    cells.push(FtsPostingCell::doc_lengths(
1268        cell_id,
1269        idx.serialize_doc_lengths(),
1270    ));
1271    for (term, entries) in idx.serialize_postings() {
1272        cell_id += 1;
1273        cells.push(FtsPostingCell::posting(cell_id, term, entries));
1274    }
1275
1276    for cell in cells {
1277        let entry_bytes = cell.encode()?;
1278
1279        if !current_leaf.would_fit(entry_bytes.len()) {
1280            let next_leaf_page_num = next_free_page;
1281            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1282            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1283            current_leaf = TablePage::empty();
1284            current_leaf_page = next_leaf_page_num;
1285            next_free_page += 1;
1286
1287            if !current_leaf.would_fit(entry_bytes.len()) {
1288                // A single posting cell exceeds page capacity. Phase
1289                // 8c MVP doesn't chain via overflow cells (the plan
1290                // notes this as a stretch goal); surface a clear
1291                // error so users know which term tripped it.
1292                return Err(SQLRiteError::Internal(format!(
1293                    "FTS posting cell {} of {} bytes exceeds empty-page capacity {} \
1294                     (term too long or too many postings; overflow chaining is Phase 8.1)",
1295                    cell.cell_id,
1296                    entry_bytes.len(),
1297                    current_leaf.free_space()
1298                )));
1299            }
1300        }
1301        current_leaf.insert_entry(cell.cell_id, &entry_bytes)?;
1302        current_max_rowid = Some(cell.cell_id);
1303    }
1304
1305    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1306    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1307    Ok((leaves, next_free_page))
1308}
1309
1310/// (rowid, value) pairs as decoded from a single FTS cell — value is
1311/// either term frequency (posting cell) or doc length (sidecar cell).
1312type FtsEntries = Vec<(i64, u32)>;
1313/// (term, posting list) pairs as decoded from non-sidecar FTS cells.
1314type FtsPostings = Vec<(String, FtsEntries)>;
1315
1316/// Phase 8c — read every cell of an FTS index from `root_page` back
1317/// into the `(doc_lengths, postings)` shape `PostingList::from_persisted_postings`
1318/// expects. Mirrors `load_hnsw_nodes`: leftmost-leaf descent, walk the
1319/// sibling chain, decode each slot.
1320fn load_fts_postings(pager: &Pager, root_page: u32) -> Result<(FtsEntries, FtsPostings)> {
1321    use crate::sql::pager::fts_cell::FtsPostingCell;
1322
1323    let mut doc_lengths: Vec<(i64, u32)> = Vec::new();
1324    let mut postings: Vec<(String, Vec<(i64, u32)>)> = Vec::new();
1325    let mut saw_sidecar = false;
1326
1327    let first_leaf = find_leftmost_leaf(pager, root_page)?;
1328    let mut current = first_leaf;
1329    while current != 0 {
1330        let page_buf = pager
1331            .read_page(current)
1332            .ok_or_else(|| SQLRiteError::Internal(format!("missing FTS leaf page {current}")))?;
1333        if page_buf[0] != PageType::TableLeaf as u8 {
1334            return Err(SQLRiteError::Internal(format!(
1335                "page {current} tagged {} but expected TableLeaf (FTS)",
1336                page_buf[0]
1337            )));
1338        }
1339        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
1340        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
1341            .try_into()
1342            .map_err(|_| SQLRiteError::Internal("FTS leaf payload size".to_string()))?;
1343        let leaf = TablePage::from_bytes(payload);
1344        for slot in 0..leaf.slot_count() {
1345            let offset = leaf.slot_offset_raw(slot)?;
1346            let (cell, _) = FtsPostingCell::decode(leaf.as_bytes(), offset)?;
1347            if cell.is_doc_lengths() {
1348                if saw_sidecar {
1349                    return Err(SQLRiteError::Internal(
1350                        "FTS index has more than one doc-lengths sidecar cell".to_string(),
1351                    ));
1352                }
1353                saw_sidecar = true;
1354                doc_lengths = cell.entries;
1355            } else {
1356                postings.push((cell.term, cell.entries));
1357            }
1358        }
1359        current = next_leaf;
1360    }
1361
1362    if !saw_sidecar {
1363        return Err(SQLRiteError::Internal(
1364            "FTS index missing doc-lengths sidecar cell — corrupt or truncated tree".to_string(),
1365        ));
1366    }
1367    Ok((doc_lengths, postings))
1368}
1369
1370/// Packs HNSW nodes into a sibling-chained run of `TableLeaf` pages.
1371/// `serialize_nodes` already returns nodes in ascending node_id order,
1372/// so the slot directory's rowid ordering stays valid.
1373fn stage_hnsw_leaves(
1374    pager: &mut Pager,
1375    idx: &crate::sql::hnsw::HnswIndex,
1376    start_page: u32,
1377) -> Result<(Vec<(u32, i64)>, u32)> {
1378    use crate::sql::pager::hnsw_cell::HnswNodeCell;
1379
1380    let mut leaves: Vec<(u32, i64)> = Vec::new();
1381    let mut current_leaf = TablePage::empty();
1382    let mut current_leaf_page = start_page;
1383    let mut current_max_rowid: Option<i64> = None;
1384    let mut next_free_page = start_page + 1;
1385
1386    let serialized = idx.serialize_nodes();
1387
1388    // Empty index → emit a single empty leaf page so the rootpage
1389    // pointer in sqlrite_master stays nonzero (== "graph is persisted,
1390    // it just happens to be empty"). load_hnsw_nodes is fine with an
1391    // empty leaf — slot_count() returns 0.
1392    for (node_id, layers) in serialized {
1393        let cell = HnswNodeCell::new(node_id, layers);
1394        let entry_bytes = cell.encode()?;
1395
1396        if !current_leaf.would_fit(entry_bytes.len()) {
1397            let next_leaf_page_num = next_free_page;
1398            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1399            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1400            current_leaf = TablePage::empty();
1401            current_leaf_page = next_leaf_page_num;
1402            next_free_page += 1;
1403
1404            if !current_leaf.would_fit(entry_bytes.len()) {
1405                return Err(SQLRiteError::Internal(format!(
1406                    "HNSW node {node_id} cell of {} bytes exceeds empty-page capacity {}",
1407                    entry_bytes.len(),
1408                    current_leaf.free_space()
1409                )));
1410            }
1411        }
1412        current_leaf.insert_entry(node_id, &entry_bytes)?;
1413        current_max_rowid = Some(node_id);
1414    }
1415
1416    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1417    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1418    Ok((leaves, next_free_page))
1419}
1420
1421fn load_table_rows(pager: &Pager, table: &mut Table, root_page: u32) -> Result<()> {
1422    let first_leaf = find_leftmost_leaf(pager, root_page)?;
1423    let mut current = first_leaf;
1424    while current != 0 {
1425        let page_buf = pager
1426            .read_page(current)
1427            .ok_or_else(|| SQLRiteError::Internal(format!("missing leaf page {current}")))?;
1428        if page_buf[0] != PageType::TableLeaf as u8 {
1429            return Err(SQLRiteError::Internal(format!(
1430                "page {current} tagged {} but expected TableLeaf",
1431                page_buf[0]
1432            )));
1433        }
1434        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
1435        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
1436            .try_into()
1437            .map_err(|_| SQLRiteError::Internal("leaf payload slice size".to_string()))?;
1438        let leaf = TablePage::from_bytes(payload);
1439
1440        for slot in 0..leaf.slot_count() {
1441            let entry = leaf.entry_at(slot)?;
1442            let cell = match entry {
1443                PagedEntry::Local(c) => c,
1444                PagedEntry::Overflow(r) => {
1445                    let body_bytes =
1446                        read_overflow_chain(pager, r.first_overflow_page, r.total_body_len)?;
1447                    let (c, _) = Cell::decode(&body_bytes, 0)?;
1448                    c
1449                }
1450            };
1451            table.restore_row(cell.rowid, cell.values)?;
1452        }
1453        current = next_leaf;
1454    }
1455    Ok(())
1456}
1457
1458/// Descends from `root_page` through `InteriorNode` pages, always taking
1459/// the leftmost child, until a `TableLeaf` is reached. Returns that leaf's
1460/// page number. A root that's already a leaf is returned as-is.
1461fn find_leftmost_leaf(pager: &Pager, root_page: u32) -> Result<u32> {
1462    let mut current = root_page;
1463    loop {
1464        let page_buf = pager.read_page(current).ok_or_else(|| {
1465            SQLRiteError::Internal(format!("missing page {current} during tree descent"))
1466        })?;
1467        match page_buf[0] {
1468            t if t == PageType::TableLeaf as u8 => return Ok(current),
1469            t if t == PageType::InteriorNode as u8 => {
1470                let payload: &[u8; PAYLOAD_PER_PAGE] =
1471                    (&page_buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
1472                        SQLRiteError::Internal("interior payload slice size".to_string())
1473                    })?;
1474                let interior = InteriorPage::from_bytes(payload);
1475                current = interior.leftmost_child()?;
1476            }
1477            other => {
1478                return Err(SQLRiteError::Internal(format!(
1479                    "unexpected page type {other} during tree descent at page {current}"
1480                )));
1481            }
1482        }
1483    }
1484}
1485
1486/// Stages a table's B-Tree starting at `start_page`. Returns
1487/// `(root_page, next_free_page)`. Builds bottom-up:
1488///
1489/// 1. Pack all row cells into `TableLeaf` pages, chaining them via each
1490///    leaf's `next_page` sibling pointer (for fast sequential scans).
1491/// 2. If the table fits in a single leaf, that leaf is the root.
1492/// 3. Otherwise, group leaves into `InteriorNode` pages; recurse up the
1493///    tree until one root remains.
1494///
1495/// Deterministic: same in-memory rows → same pages at same offsets, so
1496/// the Pager's diff commit still skips unchanged tables.
1497fn stage_table_btree(pager: &mut Pager, table: &Table, start_page: u32) -> Result<(u32, u32)> {
1498    let (leaves, mut next_free_page) = stage_leaves(pager, table, start_page)?;
1499    if leaves.len() == 1 {
1500        return Ok((leaves[0].0, next_free_page));
1501    }
1502    let mut level: Vec<(u32, i64)> = leaves;
1503    while level.len() > 1 {
1504        let (next_level, new_next_free) = stage_interior_level(pager, &level, next_free_page)?;
1505        next_free_page = new_next_free;
1506        level = next_level;
1507    }
1508    Ok((level[0].0, next_free_page))
1509}
1510
1511/// Packs the table's rows into a sibling-linked chain of `TableLeaf` pages.
1512/// Returns each leaf's `(page_number, max_rowid)` (used by the next level
1513/// up to build divider cells) and the first free page after the chain
1514/// including any overflow pages allocated for oversized cells.
1515fn stage_leaves(
1516    pager: &mut Pager,
1517    table: &Table,
1518    start_page: u32,
1519) -> Result<(Vec<(u32, i64)>, u32)> {
1520    let mut leaves: Vec<(u32, i64)> = Vec::new();
1521    let mut current_leaf = TablePage::empty();
1522    let mut current_leaf_page = start_page;
1523    let mut current_max_rowid: Option<i64> = None;
1524    let mut next_free_page = start_page + 1;
1525
1526    for rowid in table.rowids() {
1527        let entry_bytes = build_row_entry(pager, table, rowid, &mut next_free_page)?;
1528
1529        if !current_leaf.would_fit(entry_bytes.len()) {
1530            // Commit the current leaf. Its sibling next_page is the page
1531            // number where the new leaf will go — which is next_free_page
1532            // right now (no overflow pages have been allocated between
1533            // this decision and the new leaf's allocation below).
1534            let next_leaf_page_num = next_free_page;
1535            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1536            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1537            current_leaf = TablePage::empty();
1538            current_leaf_page = next_leaf_page_num;
1539            next_free_page += 1;
1540            // current_max_rowid is reassigned by the insert below; no need
1541            // to zero it out here.
1542
1543            if !current_leaf.would_fit(entry_bytes.len()) {
1544                return Err(SQLRiteError::Internal(format!(
1545                    "entry of {} bytes exceeds empty-page capacity {}",
1546                    entry_bytes.len(),
1547                    current_leaf.free_space()
1548                )));
1549            }
1550        }
1551        current_leaf.insert_entry(rowid, &entry_bytes)?;
1552        current_max_rowid = Some(rowid);
1553    }
1554
1555    // Final leaf: sibling next_page = 0 (end of chain).
1556    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1557    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1558    Ok((leaves, next_free_page))
1559}
1560
1561/// Encodes a single row's on-leaf entry — either the local cell bytes, or
1562/// an `OverflowRef` pointing at a freshly-allocated overflow chain if the
1563/// encoded cell exceeded the inline threshold. Advances `next_free_page`
1564/// past any overflow pages used.
1565fn build_row_entry(
1566    pager: &mut Pager,
1567    table: &Table,
1568    rowid: i64,
1569    next_free_page: &mut u32,
1570) -> Result<Vec<u8>> {
1571    let values = table.extract_row(rowid);
1572    let local_cell = Cell::new(rowid, values);
1573    let local_bytes = local_cell.encode()?;
1574    if local_bytes.len() > OVERFLOW_THRESHOLD {
1575        let overflow_start = *next_free_page;
1576        *next_free_page = write_overflow_chain(pager, &local_bytes, overflow_start)?;
1577        Ok(OverflowRef {
1578            rowid,
1579            total_body_len: local_bytes.len() as u64,
1580            first_overflow_page: overflow_start,
1581        }
1582        .encode())
1583    } else {
1584        Ok(local_bytes)
1585    }
1586}
1587
1588/// Builds one level of `InteriorNode` pages above the given children.
1589/// Each interior packs as many dividers as will fit; the last child
1590/// assigned to an interior becomes its `rightmost_child`. Returns the
1591/// emitted interior pages as `(page_number, max_rowid_in_subtree)` so the
1592/// next level can build on top of them.
1593fn stage_interior_level(
1594    pager: &mut Pager,
1595    children: &[(u32, i64)],
1596    start_page: u32,
1597) -> Result<(Vec<(u32, i64)>, u32)> {
1598    let mut next_level: Vec<(u32, i64)> = Vec::new();
1599    let mut next_free_page = start_page;
1600    let mut idx = 0usize;
1601
1602    while idx < children.len() {
1603        let interior_page_num = next_free_page;
1604        next_free_page += 1;
1605
1606        // Seed the interior with the first unassigned child as its
1607        // rightmost. As we add more children, the previous rightmost
1608        // graduates to being a divider and the new arrival takes over
1609        // as rightmost.
1610        let (mut rightmost_child_page, mut rightmost_child_max) = children[idx];
1611        idx += 1;
1612        let mut interior = InteriorPage::empty(rightmost_child_page);
1613
1614        while idx < children.len() {
1615            let new_divider_cell = InteriorCell {
1616                divider_rowid: rightmost_child_max,
1617                child_page: rightmost_child_page,
1618            };
1619            let new_divider_bytes = new_divider_cell.encode();
1620            if !interior.would_fit(new_divider_bytes.len()) {
1621                break;
1622            }
1623            interior.insert_divider(rightmost_child_max, rightmost_child_page)?;
1624            let (next_child_page, next_child_max) = children[idx];
1625            interior.set_rightmost_child(next_child_page);
1626            rightmost_child_page = next_child_page;
1627            rightmost_child_max = next_child_max;
1628            idx += 1;
1629        }
1630
1631        emit_interior(pager, interior_page_num, &interior);
1632        next_level.push((interior_page_num, rightmost_child_max));
1633    }
1634
1635    Ok((next_level, next_free_page))
1636}
1637
1638/// Wraps a `TablePage` in the 7-byte page header and hands it to the pager.
1639fn emit_leaf(pager: &mut Pager, page_num: u32, leaf: &TablePage, next_leaf: u32) {
1640    let mut buf = [0u8; PAGE_SIZE];
1641    buf[0] = PageType::TableLeaf as u8;
1642    buf[1..5].copy_from_slice(&next_leaf.to_le_bytes());
1643    // For leaf pages the legacy `payload_len` field isn't used — the slot
1644    // directory self-describes. Zero it by convention.
1645    buf[5..7].copy_from_slice(&0u16.to_le_bytes());
1646    buf[PAGE_HEADER_SIZE..].copy_from_slice(leaf.as_bytes());
1647    pager.stage_page(page_num, buf);
1648}
1649
1650/// Wraps an `InteriorPage` in the 7-byte page header. Interior pages
1651/// don't use `next_page` (there's no sibling chain between interiors);
1652/// `payload_len` is also unused (the slot directory self-describes).
1653fn emit_interior(pager: &mut Pager, page_num: u32, interior: &InteriorPage) {
1654    let mut buf = [0u8; PAGE_SIZE];
1655    buf[0] = PageType::InteriorNode as u8;
1656    buf[1..5].copy_from_slice(&0u32.to_le_bytes());
1657    buf[5..7].copy_from_slice(&0u16.to_le_bytes());
1658    buf[PAGE_HEADER_SIZE..].copy_from_slice(interior.as_bytes());
1659    pager.stage_page(page_num, buf);
1660}
1661
1662#[cfg(test)]
1663mod tests {
1664    use super::*;
1665    use crate::sql::process_command;
1666
1667    fn seed_db() -> Database {
1668        let mut db = Database::new("test".to_string());
1669        process_command(
1670            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, age INTEGER);",
1671            &mut db,
1672        )
1673        .unwrap();
1674        process_command(
1675            "INSERT INTO users (name, age) VALUES ('alice', 30);",
1676            &mut db,
1677        )
1678        .unwrap();
1679        process_command("INSERT INTO users (name, age) VALUES ('bob', 25);", &mut db).unwrap();
1680        process_command(
1681            "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
1682            &mut db,
1683        )
1684        .unwrap();
1685        process_command("INSERT INTO notes (body) VALUES ('hello');", &mut db).unwrap();
1686        db
1687    }
1688
1689    fn tmp_path(name: &str) -> std::path::PathBuf {
1690        let mut p = std::env::temp_dir();
1691        let pid = std::process::id();
1692        let nanos = std::time::SystemTime::now()
1693            .duration_since(std::time::UNIX_EPOCH)
1694            .map(|d| d.as_nanos())
1695            .unwrap_or(0);
1696        p.push(format!("sqlrite-{pid}-{nanos}-{name}.sqlrite"));
1697        p
1698    }
1699
1700    /// Phase 4c: every .sqlrite has a `-wal` sidecar now. Delete both so
1701    /// `/tmp` doesn't accumulate orphan WALs across test runs.
1702    fn cleanup(path: &std::path::Path) {
1703        let _ = std::fs::remove_file(path);
1704        let mut wal = path.as_os_str().to_owned();
1705        wal.push("-wal");
1706        let _ = std::fs::remove_file(std::path::PathBuf::from(wal));
1707    }
1708
1709    #[test]
1710    fn round_trip_preserves_schema_and_data() {
1711        let path = tmp_path("roundtrip");
1712        let mut db = seed_db();
1713        save_database(&mut db, &path).expect("save");
1714
1715        let loaded = open_database(&path, "test".to_string()).expect("open");
1716        assert_eq!(loaded.tables.len(), 2);
1717
1718        let users = loaded.get_table("users".to_string()).expect("users table");
1719        assert_eq!(users.columns.len(), 3);
1720        let rowids = users.rowids();
1721        assert_eq!(rowids.len(), 2);
1722        let names: Vec<String> = rowids
1723            .iter()
1724            .filter_map(|r| match users.get_value("name", *r) {
1725                Some(Value::Text(s)) => Some(s),
1726                _ => None,
1727            })
1728            .collect();
1729        assert!(names.contains(&"alice".to_string()));
1730        assert!(names.contains(&"bob".to_string()));
1731
1732        let notes = loaded.get_table("notes".to_string()).expect("notes table");
1733        assert_eq!(notes.rowids().len(), 1);
1734
1735        cleanup(&path);
1736    }
1737
1738    // -----------------------------------------------------------------
1739    // Phase 7a — VECTOR(N) save / reopen round-trip
1740    // -----------------------------------------------------------------
1741
1742    #[test]
1743    fn round_trip_preserves_vector_column() {
1744        let path = tmp_path("vec_roundtrip");
1745
1746        // Build, populate, save.
1747        {
1748            let mut db = Database::new("test".to_string());
1749            process_command(
1750                "CREATE TABLE docs (id INTEGER PRIMARY KEY, embedding VECTOR(3));",
1751                &mut db,
1752            )
1753            .unwrap();
1754            process_command(
1755                "INSERT INTO docs (embedding) VALUES ([0.1, 0.2, 0.3]);",
1756                &mut db,
1757            )
1758            .unwrap();
1759            process_command(
1760                "INSERT INTO docs (embedding) VALUES ([1.5, -2.0, 3.5]);",
1761                &mut db,
1762            )
1763            .unwrap();
1764            save_database(&mut db, &path).expect("save");
1765        } // db drops → its exclusive lock releases before reopen.
1766
1767        // Reopen and verify schema + data both round-tripped.
1768        let loaded = open_database(&path, "test".to_string()).expect("open");
1769        let docs = loaded.get_table("docs".to_string()).expect("docs table");
1770
1771        // Schema preserved: column is still VECTOR(3).
1772        let embedding_col = docs
1773            .columns
1774            .iter()
1775            .find(|c| c.column_name == "embedding")
1776            .expect("embedding column");
1777        assert!(
1778            matches!(embedding_col.datatype, DataType::Vector(3)),
1779            "expected DataType::Vector(3) after round-trip, got {:?}",
1780            embedding_col.datatype
1781        );
1782
1783        // Data preserved: both vectors still readable bit-for-bit.
1784        let mut rows: Vec<Vec<f32>> = docs
1785            .rowids()
1786            .iter()
1787            .filter_map(|r| match docs.get_value("embedding", *r) {
1788                Some(Value::Vector(v)) => Some(v),
1789                _ => None,
1790            })
1791            .collect();
1792        rows.sort_by(|a, b| a[0].partial_cmp(&b[0]).unwrap());
1793        assert_eq!(rows.len(), 2);
1794        assert_eq!(rows[0], vec![0.1f32, 0.2, 0.3]);
1795        assert_eq!(rows[1], vec![1.5f32, -2.0, 3.5]);
1796
1797        cleanup(&path);
1798    }
1799
1800    #[test]
1801    fn round_trip_preserves_json_column() {
1802        // Phase 7e — JSON columns are stored as Text under the hood with
1803        // INSERT-time validation. Save + reopen should preserve the
1804        // schema (DataType::Json) and the underlying text bytes; a
1805        // post-reopen json_extract should still resolve paths correctly.
1806        let path = tmp_path("json_roundtrip");
1807
1808        {
1809            let mut db = Database::new("test".to_string());
1810            process_command(
1811                "CREATE TABLE docs (id INTEGER PRIMARY KEY, payload JSON);",
1812                &mut db,
1813            )
1814            .unwrap();
1815            process_command(
1816                r#"INSERT INTO docs (payload) VALUES ('{"name": "alice", "tags": ["rust","sql"]}');"#,
1817                &mut db,
1818            )
1819            .unwrap();
1820            save_database(&mut db, &path).expect("save");
1821        }
1822
1823        let mut loaded = open_database(&path, "test".to_string()).expect("open");
1824        let docs = loaded.get_table("docs".to_string()).expect("docs");
1825
1826        // Schema: column declared as JSON, restored with the same type.
1827        let payload_col = docs
1828            .columns
1829            .iter()
1830            .find(|c| c.column_name == "payload")
1831            .unwrap();
1832        assert!(
1833            matches!(payload_col.datatype, DataType::Json),
1834            "expected DataType::Json, got {:?}",
1835            payload_col.datatype
1836        );
1837
1838        // json_extract works against the reopened data — exercises the
1839        // full Text-storage + serde_json::from_str path post-reopen.
1840        let resp = process_command(
1841            r#"SELECT id FROM docs WHERE json_extract(payload, '$.name') = 'alice';"#,
1842            &mut loaded,
1843        )
1844        .expect("select via json_extract after reopen");
1845        assert!(resp.contains("1 row returned"), "got: {resp}");
1846
1847        cleanup(&path);
1848    }
1849
1850    #[test]
1851    fn round_trip_rebuilds_hnsw_index_from_create_sql() {
1852        // Phase 7d.3: HNSW indexes now persist their graph as cell-encoded
1853        // pages. After save+reopen the index entry reattaches with the
1854        // same column + same node count, loaded directly from disk
1855        // instead of re-walking rows.
1856        let path = tmp_path("hnsw_roundtrip");
1857
1858        // Build, populate, index, save.
1859        {
1860            let mut db = Database::new("test".to_string());
1861            process_command(
1862                "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
1863                &mut db,
1864            )
1865            .unwrap();
1866            for v in &[
1867                "[1.0, 0.0]",
1868                "[2.0, 0.0]",
1869                "[0.0, 3.0]",
1870                "[1.0, 4.0]",
1871                "[10.0, 10.0]",
1872            ] {
1873                process_command(&format!("INSERT INTO docs (e) VALUES ({v});"), &mut db).unwrap();
1874            }
1875            process_command("CREATE INDEX ix_e ON docs USING hnsw (e);", &mut db).unwrap();
1876            save_database(&mut db, &path).expect("save");
1877        } // db drops → exclusive lock releases.
1878
1879        // Reopen and verify the index reattached, with the same name +
1880        // column + populated graph.
1881        let mut loaded = open_database(&path, "test".to_string()).expect("open");
1882        {
1883            let table = loaded.get_table("docs".to_string()).expect("docs");
1884            assert_eq!(table.hnsw_indexes.len(), 1, "HNSW index should reattach");
1885            let entry = &table.hnsw_indexes[0];
1886            assert_eq!(entry.name, "ix_e");
1887            assert_eq!(entry.column_name, "e");
1888            assert_eq!(entry.index.len(), 5, "loaded graph should hold all 5 rows");
1889            assert!(
1890                !entry.needs_rebuild,
1891                "fresh load should not be marked dirty"
1892            );
1893        }
1894
1895        // Quick functional check: KNN query through the loaded index
1896        // returns results.
1897        let resp = process_command(
1898            "SELECT id FROM docs ORDER BY vec_distance_l2(e, [1.0, 0.0]) ASC LIMIT 3;",
1899            &mut loaded,
1900        )
1901        .unwrap();
1902        assert!(resp.contains("3 rows returned"), "got: {resp}");
1903
1904        cleanup(&path);
1905    }
1906
1907    #[test]
1908    fn round_trip_rebuilds_fts_index_from_create_sql() {
1909        // Phase 8c: FTS indexes now persist their posting lists as
1910        // cell-encoded pages. After save+reopen the index entry
1911        // reattaches with the same column + same posting count, loaded
1912        // directly from disk (no re-tokenization).
1913        let path = tmp_path("fts_roundtrip");
1914
1915        {
1916            let mut db = Database::new("test".to_string());
1917            process_command(
1918                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
1919                &mut db,
1920            )
1921            .unwrap();
1922            for body in &[
1923                "rust embedded database",
1924                "rust web framework",
1925                "go embedded systems",
1926                "python web framework",
1927                "rust rust embedded power",
1928            ] {
1929                process_command(
1930                    &format!("INSERT INTO docs (body) VALUES ('{body}');"),
1931                    &mut db,
1932                )
1933                .unwrap();
1934            }
1935            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
1936            save_database(&mut db, &path).expect("save");
1937        } // db drops → exclusive lock releases.
1938
1939        let mut loaded = open_database(&path, "test".to_string()).expect("open");
1940        {
1941            let table = loaded.get_table("docs".to_string()).expect("docs");
1942            assert_eq!(table.fts_indexes.len(), 1, "FTS index should reattach");
1943            let entry = &table.fts_indexes[0];
1944            assert_eq!(entry.name, "ix_body");
1945            assert_eq!(entry.column_name, "body");
1946            assert_eq!(
1947                entry.index.len(),
1948                5,
1949                "rebuilt posting list should hold all 5 rows"
1950            );
1951            assert!(!entry.needs_rebuild);
1952        }
1953
1954        // Functional smoke: an FTS query through the reloaded index
1955        // returns the expected hit count.
1956        let resp = process_command(
1957            "SELECT id FROM docs WHERE fts_match(body, 'rust');",
1958            &mut loaded,
1959        )
1960        .unwrap();
1961        assert!(resp.contains("3 rows returned"), "got: {resp}");
1962
1963        cleanup(&path);
1964    }
1965
1966    #[test]
1967    fn delete_then_save_then_reopen_excludes_deleted_node_from_fts() {
1968        // Phase 8b — DELETE marks the FTS index dirty; save rebuilds it
1969        // from current rows; reopen replays the CREATE INDEX SQL against
1970        // the post-delete row set. The deleted rowid must not surface
1971        // in `fts_match` results post-reopen.
1972        let path = tmp_path("fts_delete_rebuild");
1973        let mut db = Database::new("test".to_string());
1974        process_command(
1975            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
1976            &mut db,
1977        )
1978        .unwrap();
1979        for body in &[
1980            "rust embedded",
1981            "rust framework",
1982            "go embedded",
1983            "python web",
1984        ] {
1985            process_command(
1986                &format!("INSERT INTO docs (body) VALUES ('{body}');"),
1987                &mut db,
1988            )
1989            .unwrap();
1990        }
1991        process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
1992
1993        // Delete row 1 ('rust embedded'); save (rebuild fires); reopen.
1994        process_command("DELETE FROM docs WHERE id = 1;", &mut db).unwrap();
1995        save_database(&mut db, &path).expect("save");
1996        drop(db);
1997
1998        let mut loaded = open_database(&path, "test".to_string()).expect("open");
1999        let resp = process_command(
2000            "SELECT id FROM docs WHERE fts_match(body, 'rust');",
2001            &mut loaded,
2002        )
2003        .unwrap();
2004        // Pre-delete: 2 rows ('rust embedded', 'rust framework') had
2005        // 'rust'. Post-delete: only id=2 remains.
2006        assert!(resp.contains("1 row returned"), "got: {resp}");
2007
2008        cleanup(&path);
2009    }
2010
2011    #[test]
2012    fn fts_roundtrip_uses_persistence_path_not_replay() {
2013        // Phase 8c — assert the reload didn't go through the
2014        // rootpage=0 replay shortcut. We do this by reading the
2015        // sqlrite_master row for the FTS index and confirming its
2016        // rootpage field is non-zero.
2017        let path = tmp_path("fts_persistence_path");
2018
2019        {
2020            let mut db = Database::new("test".to_string());
2021            process_command(
2022                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2023                &mut db,
2024            )
2025            .unwrap();
2026            process_command(
2027                "INSERT INTO docs (body) VALUES ('rust embedded database');",
2028                &mut db,
2029            )
2030            .unwrap();
2031            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2032            save_database(&mut db, &path).expect("save");
2033        }
2034
2035        // Read raw sqlrite_master to find the FTS index row.
2036        let pager = Pager::open(&path).expect("open pager");
2037        let mut master = build_empty_master_table();
2038        load_table_rows(&pager, &mut master, pager.header().schema_root_page).unwrap();
2039        let mut found_rootpage: Option<u32> = None;
2040        for rowid in master.rowids() {
2041            let name = take_text(&master, "name", rowid).unwrap();
2042            if name == "ix_body" {
2043                let rp = take_integer(&master, "rootpage", rowid).unwrap();
2044                found_rootpage = Some(rp as u32);
2045            }
2046        }
2047        let rootpage = found_rootpage.expect("ix_body row in sqlrite_master");
2048        assert!(
2049            rootpage != 0,
2050            "Phase 8c FTS save should set rootpage != 0; got {rootpage}"
2051        );
2052
2053        cleanup(&path);
2054    }
2055
2056    #[test]
2057    fn save_without_fts_keeps_format_v4() {
2058        // Phase 8c on-demand bump — a database with zero FTS indexes
2059        // continues writing the v4 header. Existing v4 users must not
2060        // see their files silently promoted to v5 by an upgrade.
2061        use crate::sql::pager::header::FORMAT_VERSION_V4;
2062
2063        let path = tmp_path("fts_no_bump");
2064        let mut db = Database::new("test".to_string());
2065        process_command(
2066            "CREATE TABLE t (id INTEGER PRIMARY KEY, n INTEGER);",
2067            &mut db,
2068        )
2069        .unwrap();
2070        process_command("INSERT INTO t (n) VALUES (1);", &mut db).unwrap();
2071        save_database(&mut db, &path).unwrap();
2072        drop(db);
2073
2074        let pager = Pager::open(&path).expect("open");
2075        assert_eq!(
2076            pager.header().format_version,
2077            FORMAT_VERSION_V4,
2078            "no-FTS save should keep v4"
2079        );
2080        cleanup(&path);
2081    }
2082
2083    #[test]
2084    fn save_with_fts_bumps_to_v5() {
2085        // Phase 8c on-demand bump — first FTS-bearing save promotes
2086        // the file to v5. v5 readers handle both v4 and v5; v4
2087        // readers correctly refuse a v5 file.
2088        use crate::sql::pager::header::FORMAT_VERSION_V5;
2089
2090        let path = tmp_path("fts_bump_v5");
2091        let mut db = Database::new("test".to_string());
2092        process_command(
2093            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2094            &mut db,
2095        )
2096        .unwrap();
2097        process_command("INSERT INTO docs (body) VALUES ('hello');", &mut db).unwrap();
2098        process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2099        save_database(&mut db, &path).unwrap();
2100        drop(db);
2101
2102        let pager = Pager::open(&path).expect("open");
2103        assert_eq!(
2104            pager.header().format_version,
2105            FORMAT_VERSION_V5,
2106            "FTS save should promote to v5"
2107        );
2108        cleanup(&path);
2109    }
2110
2111    #[test]
2112    fn fts_persistence_handles_empty_and_zero_token_docs() {
2113        // Phase 8c — sidecar cell carries doc-lengths for every doc
2114        // including any with zero tokens (so total_docs is honest
2115        // post-reopen). Empty index also round-trips: a CREATE INDEX
2116        // on an empty table emits a single empty leaf with just the
2117        // (empty) sidecar.
2118        let path = tmp_path("fts_edges");
2119
2120        {
2121            let mut db = Database::new("test".to_string());
2122            process_command(
2123                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2124                &mut db,
2125            )
2126            .unwrap();
2127            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2128            // Mix: real text, then a row that tokenizes to zero tokens
2129            // (only punctuation), then real again.
2130            process_command("INSERT INTO docs (body) VALUES ('rust embedded');", &mut db).unwrap();
2131            process_command("INSERT INTO docs (body) VALUES ('!!!---???');", &mut db).unwrap();
2132            process_command("INSERT INTO docs (body) VALUES ('go embedded');", &mut db).unwrap();
2133            save_database(&mut db, &path).unwrap();
2134        }
2135
2136        let loaded = open_database(&path, "test".to_string()).expect("open");
2137        let table = loaded.get_table("docs".to_string()).unwrap();
2138        let entry = &table.fts_indexes[0];
2139        // All three rows present — including the zero-token row,
2140        // which is critical for total_docs honesty in BM25.
2141        assert_eq!(entry.index.len(), 3);
2142        // 'embedded' appears in 2 rows after reload.
2143        let res = entry
2144            .index
2145            .query("embedded", &crate::sql::fts::Bm25Params::default());
2146        assert_eq!(res.len(), 2);
2147
2148        cleanup(&path);
2149    }
2150
2151    #[test]
2152    fn fts_persistence_round_trips_large_corpus() {
2153        // Phase 8c — exercise multi-leaf staging. ~500 docs with
2154        // single-token bodies generates enough cells to overflow a
2155        // single 4 KiB leaf (each posting cell averages ~8 bytes).
2156        let path = tmp_path("fts_large_corpus");
2157
2158        let mut expected_terms: std::collections::BTreeSet<String> =
2159            std::collections::BTreeSet::new();
2160        {
2161            let mut db = Database::new("test".to_string());
2162            process_command(
2163                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2164                &mut db,
2165            )
2166            .unwrap();
2167            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2168            // 500 docs, each one a unique term — drives unique-term
2169            // count up so multiple leaves are required.
2170            for i in 0..500 {
2171                let term = format!("term{i:04}");
2172                process_command(
2173                    &format!("INSERT INTO docs (body) VALUES ('{term}');"),
2174                    &mut db,
2175                )
2176                .unwrap();
2177                expected_terms.insert(term);
2178            }
2179            save_database(&mut db, &path).unwrap();
2180        }
2181
2182        let loaded = open_database(&path, "test".to_string()).expect("open");
2183        let table = loaded.get_table("docs".to_string()).unwrap();
2184        let entry = &table.fts_indexes[0];
2185        assert_eq!(entry.index.len(), 500);
2186
2187        // Spot-check a handful of terms come back with their original
2188        // single-row posting list.
2189        for &i in &[0_i64, 137, 248, 391, 499] {
2190            let term = format!("term{i:04}");
2191            let res = entry
2192                .index
2193                .query(&term, &crate::sql::fts::Bm25Params::default());
2194            assert_eq!(res.len(), 1, "term {term} should match exactly 1 row");
2195            // PrimaryKey rowids start at 1; doc i was inserted at
2196            // rowid i+1.
2197            assert_eq!(res[0].0, i + 1);
2198        }
2199
2200        cleanup(&path);
2201    }
2202
2203    #[test]
2204    fn delete_then_save_then_reopen_excludes_deleted_node_from_hnsw() {
2205        // Phase 7d.3 — DELETE marks HNSW dirty; save rebuilds it from
2206        // current rows + serializes; reopen loads the post-delete graph.
2207        // After all that, the deleted rowid must NOT come back from a
2208        // KNN query.
2209        let path = tmp_path("hnsw_delete_rebuild");
2210        let mut db = Database::new("test".to_string());
2211        process_command(
2212            "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
2213            &mut db,
2214        )
2215        .unwrap();
2216        for v in &["[1.0, 0.0]", "[2.0, 0.0]", "[3.0, 0.0]", "[4.0, 0.0]"] {
2217            process_command(&format!("INSERT INTO docs (e) VALUES ({v});"), &mut db).unwrap();
2218        }
2219        process_command("CREATE INDEX ix_e ON docs USING hnsw (e);", &mut db).unwrap();
2220
2221        // Delete row 1 (the closest match to [0.5, 0.0]).
2222        process_command("DELETE FROM docs WHERE id = 1;", &mut db).unwrap();
2223        // Confirm it marked dirty.
2224        let dirty_before_save = db.tables["docs"].hnsw_indexes[0].needs_rebuild;
2225        assert!(dirty_before_save, "DELETE should mark dirty");
2226
2227        save_database(&mut db, &path).expect("save");
2228        // Confirm save cleared the dirty flag.
2229        let dirty_after_save = db.tables["docs"].hnsw_indexes[0].needs_rebuild;
2230        assert!(!dirty_after_save, "save should clear dirty");
2231        drop(db);
2232
2233        // Reopen, query for the closest match. Row 1 is gone; row 2
2234        // (id=2, vector [2.0, 0.0]) should now be the nearest.
2235        let loaded = open_database(&path, "test".to_string()).expect("open");
2236        let docs = loaded.get_table("docs".to_string()).expect("docs");
2237
2238        // Row 1 must not appear in any storage anymore.
2239        assert!(
2240            !docs.rowids().contains(&1),
2241            "deleted row 1 should not be in row storage"
2242        );
2243        assert_eq!(docs.rowids().len(), 3, "should have 3 surviving rows");
2244
2245        // The HNSW index must also have shed the deleted node.
2246        assert_eq!(
2247            docs.hnsw_indexes[0].index.len(),
2248            3,
2249            "HNSW graph should have shed the deleted node"
2250        );
2251
2252        cleanup(&path);
2253    }
2254
2255    #[test]
2256    fn round_trip_survives_writes_after_load() {
2257        let path = tmp_path("after_load");
2258        save_database(&mut seed_db(), &path).unwrap();
2259
2260        {
2261            let mut db = open_database(&path, "test".to_string()).unwrap();
2262            process_command(
2263                "INSERT INTO users (name, age) VALUES ('carol', 40);",
2264                &mut db,
2265            )
2266            .unwrap();
2267            save_database(&mut db, &path).unwrap();
2268        } // db drops → its exclusive lock releases before we reopen below.
2269
2270        let db2 = open_database(&path, "test".to_string()).unwrap();
2271        let users = db2.get_table("users".to_string()).unwrap();
2272        assert_eq!(users.rowids().len(), 3);
2273
2274        cleanup(&path);
2275    }
2276
2277    #[test]
2278    fn open_rejects_garbage_file() {
2279        let path = tmp_path("bad");
2280        std::fs::write(&path, b"not a sqlrite database, just bytes").unwrap();
2281        let result = open_database(&path, "x".to_string());
2282        assert!(result.is_err());
2283        cleanup(&path);
2284    }
2285
2286    #[test]
2287    fn many_small_rows_spread_across_leaves() {
2288        let path = tmp_path("many_rows");
2289        let mut db = Database::new("big".to_string());
2290        process_command(
2291            "CREATE TABLE things (id INTEGER PRIMARY KEY, data TEXT);",
2292            &mut db,
2293        )
2294        .unwrap();
2295        for i in 0..200 {
2296            let body = "x".repeat(200);
2297            let q = format!("INSERT INTO things (data) VALUES ('row-{i}-{body}');");
2298            process_command(&q, &mut db).unwrap();
2299        }
2300        save_database(&mut db, &path).unwrap();
2301        let loaded = open_database(&path, "big".to_string()).unwrap();
2302        let things = loaded.get_table("things".to_string()).unwrap();
2303        assert_eq!(things.rowids().len(), 200);
2304        cleanup(&path);
2305    }
2306
2307    #[test]
2308    fn huge_row_goes_through_overflow() {
2309        let path = tmp_path("overflow_row");
2310        let mut db = Database::new("big".to_string());
2311        process_command(
2312            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2313            &mut db,
2314        )
2315        .unwrap();
2316        let body = "A".repeat(10_000);
2317        process_command(
2318            &format!("INSERT INTO docs (body) VALUES ('{body}');"),
2319            &mut db,
2320        )
2321        .unwrap();
2322        save_database(&mut db, &path).unwrap();
2323
2324        let loaded = open_database(&path, "big".to_string()).unwrap();
2325        let docs = loaded.get_table("docs".to_string()).unwrap();
2326        let rowids = docs.rowids();
2327        assert_eq!(rowids.len(), 1);
2328        let stored = docs.get_value("body", rowids[0]);
2329        match stored {
2330            Some(Value::Text(s)) => assert_eq!(s.len(), 10_000),
2331            other => panic!("expected Text, got {other:?}"),
2332        }
2333        cleanup(&path);
2334    }
2335
2336    #[test]
2337    fn create_sql_synthesis_round_trips() {
2338        // Build a table via CREATE, then verify table_to_create_sql +
2339        // parse_create_sql reproduce an equivalent column list.
2340        let mut db = Database::new("x".to_string());
2341        process_command(
2342            "CREATE TABLE t (id INTEGER PRIMARY KEY, tag TEXT UNIQUE, note TEXT NOT NULL);",
2343            &mut db,
2344        )
2345        .unwrap();
2346        let t = db.get_table("t".to_string()).unwrap();
2347        let sql = table_to_create_sql(t);
2348        let (name, cols) = parse_create_sql(&sql).unwrap();
2349        assert_eq!(name, "t");
2350        assert_eq!(cols.len(), 3);
2351        assert!(cols[0].is_pk);
2352        assert!(cols[1].is_unique);
2353        assert!(cols[2].not_null);
2354    }
2355
2356    #[test]
2357    fn sqlrite_master_is_not_exposed_as_a_user_table() {
2358        // After open, the public db.tables map should not list the master.
2359        let path = tmp_path("no_master");
2360        save_database(&mut seed_db(), &path).unwrap();
2361        let loaded = open_database(&path, "x".to_string()).unwrap();
2362        assert!(!loaded.tables.contains_key(MASTER_TABLE_NAME));
2363        cleanup(&path);
2364    }
2365
2366    #[test]
2367    fn multi_leaf_table_produces_an_interior_root() {
2368        // 200 fat rows force the table into multiple leaves, which means
2369        // save_database must build at least one InteriorNode above them.
2370        // The test verifies the round-trip works and confirms the root is
2371        // indeed an interior page (not a leaf) by reading the page type
2372        // directly out of the open pager.
2373        let path = tmp_path("multi_leaf_interior");
2374        let mut db = Database::new("big".to_string());
2375        process_command(
2376            "CREATE TABLE things (id INTEGER PRIMARY KEY, data TEXT);",
2377            &mut db,
2378        )
2379        .unwrap();
2380        for i in 0..200 {
2381            let body = "x".repeat(200);
2382            let q = format!("INSERT INTO things (data) VALUES ('row-{i}-{body}');");
2383            process_command(&q, &mut db).unwrap();
2384        }
2385        save_database(&mut db, &path).unwrap();
2386
2387        // Confirm the round-trip preserved all 200 rows.
2388        let loaded = open_database(&path, "big".to_string()).unwrap();
2389        let things = loaded.get_table("things".to_string()).unwrap();
2390        assert_eq!(things.rowids().len(), 200);
2391
2392        // Peek at `things`'s root page via the pager attached to the
2393        // loaded DB and check it's an InteriorNode, not a leaf.
2394        let pager = loaded
2395            .pager
2396            .as_ref()
2397            .expect("loaded DB should have a pager");
2398        // sqlrite_master's row for `things` holds its root page. Easiest
2399        // way to find it: walk the leaf chain by using find_leftmost_leaf
2400        // and then hop one level up. Simpler: read the master, scan for
2401        // the "things" row, look up rootpage.
2402        let mut master = build_empty_master_table();
2403        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
2404        let things_root = master
2405            .rowids()
2406            .into_iter()
2407            .find_map(|r| match master.get_value("name", r) {
2408                Some(Value::Text(s)) if s == "things" => match master.get_value("rootpage", r) {
2409                    Some(Value::Integer(p)) => Some(p as u32),
2410                    _ => None,
2411                },
2412                _ => None,
2413            })
2414            .expect("things should appear in sqlrite_master");
2415        let root_buf = pager.read_page(things_root).unwrap();
2416        assert_eq!(
2417            root_buf[0],
2418            PageType::InteriorNode as u8,
2419            "expected a multi-leaf table to have an interior root, got tag {}",
2420            root_buf[0]
2421        );
2422
2423        cleanup(&path);
2424    }
2425
2426    #[test]
2427    fn explicit_index_persists_across_save_and_open() {
2428        let path = tmp_path("idx_persist");
2429        let mut db = Database::new("idx".to_string());
2430        process_command(
2431            "CREATE TABLE users (id INTEGER PRIMARY KEY, tag TEXT);",
2432            &mut db,
2433        )
2434        .unwrap();
2435        for i in 1..=5 {
2436            let tag = if i % 2 == 0 { "odd" } else { "even" };
2437            process_command(
2438                &format!("INSERT INTO users (tag) VALUES ('{tag}');"),
2439                &mut db,
2440            )
2441            .unwrap();
2442        }
2443        process_command("CREATE INDEX users_tag_idx ON users (tag);", &mut db).unwrap();
2444        save_database(&mut db, &path).unwrap();
2445
2446        let loaded = open_database(&path, "idx".to_string()).unwrap();
2447        let users = loaded.get_table("users".to_string()).unwrap();
2448        let idx = users
2449            .index_by_name("users_tag_idx")
2450            .expect("explicit index should survive save/open");
2451        assert_eq!(idx.column_name, "tag");
2452        assert!(!idx.is_unique);
2453        // 5 rows: rowids 2, 4 are "odd" (i % 2 == 0 when i is 2 or 4) — 2 entries;
2454        // rowids 1, 3, 5 are "even" (i % 2 != 0) — 3 entries.
2455        let even_rowids = idx.lookup(&Value::Text("even".into()));
2456        let odd_rowids = idx.lookup(&Value::Text("odd".into()));
2457        assert_eq!(even_rowids.len(), 3);
2458        assert_eq!(odd_rowids.len(), 2);
2459
2460        cleanup(&path);
2461    }
2462
2463    #[test]
2464    fn auto_indexes_for_unique_columns_survive_save_open() {
2465        let path = tmp_path("auto_idx_persist");
2466        let mut db = Database::new("a".to_string());
2467        process_command(
2468            "CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL UNIQUE);",
2469            &mut db,
2470        )
2471        .unwrap();
2472        process_command("INSERT INTO users (email) VALUES ('a@x');", &mut db).unwrap();
2473        process_command("INSERT INTO users (email) VALUES ('b@x');", &mut db).unwrap();
2474        save_database(&mut db, &path).unwrap();
2475
2476        let loaded = open_database(&path, "a".to_string()).unwrap();
2477        let users = loaded.get_table("users".to_string()).unwrap();
2478        // Every UNIQUE column auto-creates an index; the load path populated
2479        // it from the persisted entries.
2480        let auto_name = SecondaryIndex::auto_name("users", "email");
2481        let idx = users
2482            .index_by_name(&auto_name)
2483            .expect("auto index should be restored");
2484        assert!(idx.is_unique);
2485        assert_eq!(idx.lookup(&Value::Text("a@x".into())).len(), 1);
2486        assert_eq!(idx.lookup(&Value::Text("b@x".into())).len(), 1);
2487
2488        cleanup(&path);
2489    }
2490
2491    #[test]
2492    fn deep_tree_round_trips() {
2493        // Force a 3-level tree by bypassing process_command (which prints
2494        // the full table on every INSERT, making large bulk loads O(N^2)
2495        // in I/O). We build the Table directly via restore_row.
2496        use crate::sql::db::table::Column as TableColumn;
2497
2498        let path = tmp_path("deep_tree");
2499        let mut db = Database::new("deep".to_string());
2500        let columns = vec![
2501            TableColumn::new("id".into(), "integer".into(), true, true, true),
2502            TableColumn::new("s".into(), "text".into(), false, true, false),
2503        ];
2504        let mut table = build_empty_table("t", columns, 0);
2505        // ~900-byte rows → ~4 rows per leaf. 6000 rows → ~1500 leaves,
2506        // which with interior fanout ~400 needs 2 interior levels (3-level
2507        // tree total, counting leaves).
2508        for i in 1..=6_000i64 {
2509            let body = "q".repeat(900);
2510            table
2511                .restore_row(
2512                    i,
2513                    vec![
2514                        Some(Value::Integer(i)),
2515                        Some(Value::Text(format!("r-{i}-{body}"))),
2516                    ],
2517                )
2518                .unwrap();
2519        }
2520        db.tables.insert("t".to_string(), table);
2521        save_database(&mut db, &path).unwrap();
2522
2523        let loaded = open_database(&path, "deep".to_string()).unwrap();
2524        let t = loaded.get_table("t".to_string()).unwrap();
2525        assert_eq!(t.rowids().len(), 6_000);
2526
2527        // Confirm the tree actually grew past 2 levels — i.e., the root's
2528        // leftmost child is itself an interior page, not a leaf.
2529        let pager = loaded.pager.as_ref().unwrap();
2530        let mut master = build_empty_master_table();
2531        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
2532        let t_root = master
2533            .rowids()
2534            .into_iter()
2535            .find_map(|r| match master.get_value("name", r) {
2536                Some(Value::Text(s)) if s == "t" => match master.get_value("rootpage", r) {
2537                    Some(Value::Integer(p)) => Some(p as u32),
2538                    _ => None,
2539                },
2540                _ => None,
2541            })
2542            .expect("t in sqlrite_master");
2543        let root_buf = pager.read_page(t_root).unwrap();
2544        assert_eq!(root_buf[0], PageType::InteriorNode as u8);
2545        let root_payload: &[u8; PAYLOAD_PER_PAGE] =
2546            (&root_buf[PAGE_HEADER_SIZE..]).try_into().unwrap();
2547        let root_interior = InteriorPage::from_bytes(root_payload);
2548        let child = root_interior.leftmost_child().unwrap();
2549        let child_buf = pager.read_page(child).unwrap();
2550        assert_eq!(
2551            child_buf[0],
2552            PageType::InteriorNode as u8,
2553            "expected 3-level tree: root's leftmost child should also be InteriorNode",
2554        );
2555
2556        cleanup(&path);
2557    }
2558
2559    #[test]
2560    fn alter_rename_table_survives_save_and_reopen() {
2561        let path = tmp_path("alter_rename_table_roundtrip");
2562        let mut db = seed_db();
2563        save_database(&mut db, &path).expect("save");
2564
2565        process_command("ALTER TABLE users RENAME TO members;", &mut db).expect("rename");
2566        save_database(&mut db, &path).expect("save after rename");
2567
2568        let loaded = open_database(&path, "t".to_string()).expect("reopen");
2569        assert!(!loaded.contains_table("users".to_string()));
2570        assert!(loaded.contains_table("members".to_string()));
2571        let members = loaded.get_table("members".to_string()).unwrap();
2572        assert_eq!(members.rowids().len(), 2, "rows should survive");
2573        // Auto-indexes followed the rename.
2574        assert!(
2575            members
2576                .index_by_name("sqlrite_autoindex_members_id")
2577                .is_some()
2578        );
2579        assert!(
2580            members
2581                .index_by_name("sqlrite_autoindex_members_name")
2582                .is_some()
2583        );
2584
2585        cleanup(&path);
2586    }
2587
2588    #[test]
2589    fn alter_rename_column_survives_save_and_reopen() {
2590        let path = tmp_path("alter_rename_col_roundtrip");
2591        let mut db = seed_db();
2592        save_database(&mut db, &path).expect("save");
2593
2594        process_command(
2595            "ALTER TABLE users RENAME COLUMN name TO full_name;",
2596            &mut db,
2597        )
2598        .expect("rename column");
2599        save_database(&mut db, &path).expect("save after rename");
2600
2601        let loaded = open_database(&path, "t".to_string()).expect("reopen");
2602        let users = loaded.get_table("users".to_string()).unwrap();
2603        assert!(users.contains_column("full_name".to_string()));
2604        assert!(!users.contains_column("name".to_string()));
2605        // Verify a row's value survived the rename round-trip.
2606        let alice_rowid = users
2607            .rowids()
2608            .into_iter()
2609            .find(|r| users.get_value("full_name", *r) == Some(Value::Text("alice".to_string())))
2610            .expect("alice row should be findable under renamed column");
2611        assert_eq!(
2612            users.get_value("full_name", alice_rowid),
2613            Some(Value::Text("alice".to_string()))
2614        );
2615
2616        cleanup(&path);
2617    }
2618
2619    #[test]
2620    fn alter_add_column_with_default_survives_save_and_reopen() {
2621        let path = tmp_path("alter_add_default_roundtrip");
2622        let mut db = seed_db();
2623        save_database(&mut db, &path).expect("save");
2624
2625        process_command(
2626            "ALTER TABLE users ADD COLUMN status TEXT DEFAULT 'active';",
2627            &mut db,
2628        )
2629        .expect("add column");
2630        save_database(&mut db, &path).expect("save after add");
2631
2632        let loaded = open_database(&path, "t".to_string()).expect("reopen");
2633        let users = loaded.get_table("users".to_string()).unwrap();
2634        assert!(users.contains_column("status".to_string()));
2635        for rowid in users.rowids() {
2636            assert_eq!(
2637                users.get_value("status", rowid),
2638                Some(Value::Text("active".to_string())),
2639                "backfilled default should round-trip for rowid {rowid}"
2640            );
2641        }
2642        // The DEFAULT clause itself should still be on the column metadata
2643        // so a subsequent INSERT picks it up.
2644        let status_col = users
2645            .columns
2646            .iter()
2647            .find(|c| c.column_name == "status")
2648            .unwrap();
2649        assert_eq!(status_col.default, Some(Value::Text("active".to_string())));
2650
2651        cleanup(&path);
2652    }
2653
2654    #[test]
2655    fn alter_drop_column_survives_save_and_reopen() {
2656        let path = tmp_path("alter_drop_col_roundtrip");
2657        let mut db = seed_db();
2658        save_database(&mut db, &path).expect("save");
2659
2660        process_command("ALTER TABLE users DROP COLUMN age;", &mut db).expect("drop column");
2661        save_database(&mut db, &path).expect("save after drop");
2662
2663        let loaded = open_database(&path, "t".to_string()).expect("reopen");
2664        let users = loaded.get_table("users".to_string()).unwrap();
2665        assert!(!users.contains_column("age".to_string()));
2666        assert!(users.contains_column("name".to_string()));
2667
2668        cleanup(&path);
2669    }
2670
2671    #[test]
2672    fn drop_table_survives_save_and_reopen() {
2673        let path = tmp_path("drop_table_roundtrip");
2674        let mut db = seed_db();
2675        save_database(&mut db, &path).expect("save");
2676
2677        // Verify both tables landed.
2678        {
2679            let loaded = open_database(&path, "t".to_string()).expect("open");
2680            assert!(loaded.contains_table("users".to_string()));
2681            assert!(loaded.contains_table("notes".to_string()));
2682        }
2683
2684        process_command("DROP TABLE users;", &mut db).expect("drop users");
2685        save_database(&mut db, &path).expect("save after drop");
2686
2687        let loaded = open_database(&path, "t".to_string()).expect("reopen");
2688        assert!(
2689            !loaded.contains_table("users".to_string()),
2690            "dropped table should not resurface on reopen"
2691        );
2692        assert!(
2693            loaded.contains_table("notes".to_string()),
2694            "untouched table should survive"
2695        );
2696
2697        cleanup(&path);
2698    }
2699
2700    #[test]
2701    fn drop_index_survives_save_and_reopen() {
2702        let path = tmp_path("drop_index_roundtrip");
2703        let mut db = Database::new("t".to_string());
2704        process_command(
2705            "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
2706            &mut db,
2707        )
2708        .unwrap();
2709        process_command("CREATE INDEX notes_body_idx ON notes (body);", &mut db).unwrap();
2710        save_database(&mut db, &path).expect("save");
2711
2712        process_command("DROP INDEX notes_body_idx;", &mut db).unwrap();
2713        save_database(&mut db, &path).expect("save after drop");
2714
2715        let loaded = open_database(&path, "t".to_string()).expect("reopen");
2716        let notes = loaded.get_table("notes".to_string()).unwrap();
2717        assert!(
2718            notes.index_by_name("notes_body_idx").is_none(),
2719            "dropped index should not resurface on reopen"
2720        );
2721        // The auto-index for the PK should still be there.
2722        assert!(notes.index_by_name("sqlrite_autoindex_notes_id").is_some());
2723
2724        cleanup(&path);
2725    }
2726
2727    #[test]
2728    fn default_clause_survives_save_and_reopen() {
2729        let path = tmp_path("default_roundtrip");
2730        let mut db = Database::new("t".to_string());
2731
2732        process_command(
2733            "CREATE TABLE users (id INTEGER PRIMARY KEY, status TEXT DEFAULT 'active', score INTEGER DEFAULT 0);",
2734            &mut db,
2735        )
2736        .unwrap();
2737        save_database(&mut db, &path).expect("save");
2738
2739        let mut loaded = open_database(&path, "t".to_string()).expect("open");
2740
2741        // The reloaded column metadata should still carry the DEFAULT.
2742        let users = loaded.get_table("users".to_string()).expect("users table");
2743        let status_col = users
2744            .columns
2745            .iter()
2746            .find(|c| c.column_name == "status")
2747            .expect("status column");
2748        assert_eq!(
2749            status_col.default,
2750            Some(Value::Text("active".to_string())),
2751            "DEFAULT 'active' should round-trip"
2752        );
2753        let score_col = users
2754            .columns
2755            .iter()
2756            .find(|c| c.column_name == "score")
2757            .expect("score column");
2758        assert_eq!(
2759            score_col.default,
2760            Some(Value::Integer(0)),
2761            "DEFAULT 0 should round-trip"
2762        );
2763
2764        // Now exercise the runtime path: an INSERT that omits both DEFAULT
2765        // columns should pick them up from the reloaded schema.
2766        process_command("INSERT INTO users (id) VALUES (1);", &mut loaded).unwrap();
2767        let users = loaded.get_table("users".to_string()).unwrap();
2768        assert_eq!(
2769            users.get_value("status", 1),
2770            Some(Value::Text("active".to_string()))
2771        );
2772        assert_eq!(users.get_value("score", 1), Some(Value::Integer(0)));
2773
2774        cleanup(&path);
2775    }
2776}