Skip to main content

sqlrite/sql/pager/
mod.rs

1//! On-disk persistence for a `Database`, using fixed-size paged files.
2//!
3//! The file is a sequence of 4 KiB pages. Page 0 holds the header
4//! (magic, version, page count, schema-root pointer). Every other page carries
5//! a small per-page header (type tag + next-page pointer + payload length)
6//! followed by a payload of up to 4089 bytes.
7//!
8//! **Storage strategy (format version 2, Phase 3c.5).**
9//!
10//! - Each `Table`'s rows live as **cells** in a chain of `TableLeaf` pages.
11//!   Cell layout and slot directory are in `cell.rs` / `table_page.rs`;
12//!   cells that exceed the inline threshold spill into an overflow chain
13//!   via `overflow.rs`.
//! - The schema catalog is itself a regular table named `sqlrite_master`,
//!   with one row per user table or index:
//!       `(type TEXT NOT NULL, name TEXT PRIMARY KEY, sql TEXT NOT NULL,
//!         rootpage INTEGER NOT NULL, last_rowid INTEGER NOT NULL)`
18//!   This is the SQLite-style approach: the schema of `sqlrite_master`
19//!   itself is hardcoded into the engine so the open path can bootstrap.
20//! - Page 0's `schema_root_page` field points at the first leaf of
21//!   `sqlrite_master`.
22//!
23//! **Format version.** Version 2 is not compatible with files produced by
24//! earlier commits. Opening a v1 file returns a clean error — users on
25//! old files have to regenerate them from CREATE/INSERT, as there's no
26//! production data to migrate yet.
27
28// Data-layer modules. Not every helper in these modules is used by save/open
29// yet — some exist for tests, some for future maintenance operations.
30// Module-level #[allow(dead_code)] keeps the build quiet without dotting
31// the modules with per-item attributes.
32#[allow(dead_code)]
33pub mod cell;
34pub mod file;
35#[allow(dead_code)]
36pub mod fts_cell;
37pub mod header;
38#[allow(dead_code)]
39pub mod hnsw_cell;
40#[allow(dead_code)]
41pub mod index_cell;
42#[allow(dead_code)]
43pub mod interior_page;
44pub mod overflow;
45pub mod page;
46pub mod pager;
47#[allow(dead_code)]
48pub mod table_page;
49#[allow(dead_code)]
50pub mod varint;
51#[allow(dead_code)]
52pub mod wal;
53
54use std::collections::{BTreeMap, HashMap};
55use std::path::Path;
56use std::sync::{Arc, Mutex};
57
58use sqlparser::dialect::SQLiteDialect;
59use sqlparser::parser::Parser;
60
61use crate::error::{Result, SQLRiteError};
62use crate::sql::db::database::Database;
63use crate::sql::db::secondary_index::{IndexOrigin, SecondaryIndex};
64use crate::sql::db::table::{Column, DataType, Row, Table, Value};
65use crate::sql::pager::cell::Cell;
66use crate::sql::pager::header::DbHeader;
67use crate::sql::pager::index_cell::IndexCell;
68use crate::sql::pager::interior_page::{InteriorCell, InteriorPage};
69use crate::sql::pager::overflow::{
70    OVERFLOW_THRESHOLD, OverflowRef, PagedEntry, read_overflow_chain, write_overflow_chain,
71};
72use crate::sql::pager::page::{PAGE_HEADER_SIZE, PAGE_SIZE, PAYLOAD_PER_PAGE, PageType};
73use crate::sql::pager::pager::Pager;
74use crate::sql::pager::table_page::TablePage;
75use crate::sql::parser::create::CreateQuery;
76
// Re-export so callers can spell `sql::pager::AccessMode` without
// reaching into the `sql::pager::pager` submodule path.
79pub use crate::sql::pager::pager::AccessMode;
80
/// Name of the internal catalog table. Reserved — user CREATEs of this
/// name must be rejected upstream; `save_database` additionally refuses to
/// persist a user table carrying this name.
pub const MASTER_TABLE_NAME: &str = "sqlrite_master";
84
85/// Opens a database file in read-write mode. Shorthand for
86/// [`open_database_with_mode`] with [`AccessMode::ReadWrite`].
87pub fn open_database(path: &Path, db_name: String) -> Result<Database> {
88    open_database_with_mode(path, db_name, AccessMode::ReadWrite)
89}
90
91/// Opens a database file in read-only mode. Acquires a shared OS-level
92/// advisory lock, so other read-only openers coexist but any writer is
93/// excluded. Attempts to mutate the returned `Database` (e.g. an
94/// `INSERT`, or a `save_database` call against it) bottom out in a
95/// `cannot commit: database is opened read-only` error from the Pager.
96pub fn open_database_read_only(path: &Path, db_name: String) -> Result<Database> {
97    open_database_with_mode(path, db_name, AccessMode::ReadOnly)
98}
99
100/// Opens a database file and reconstructs the in-memory `Database`,
101/// leaving the long-lived `Pager` attached for subsequent auto-save
102/// (read-write) or consistent-snapshot reads (read-only).
103pub fn open_database_with_mode(path: &Path, db_name: String, mode: AccessMode) -> Result<Database> {
104    let pager = Pager::open_with_mode(path, mode)?;
105
106    // 1. Load sqlrite_master from the tree at header.schema_root_page.
107    let mut master = build_empty_master_table();
108    load_table_rows(&pager, &mut master, pager.header().schema_root_page)?;
109
110    // 2. Two passes over master rows: first build every user table, then
111    //    attach secondary indexes. Indexes need their base table to exist
112    //    before we can populate them. Auto-indexes are created at table
113    //    build time so we only have to load explicit indexes from disk
114    //    (but we also reload the auto-index CONTENT because Table::new
115    //    built it empty).
116    let mut db = Database::new(db_name);
117    let mut index_rows: Vec<IndexCatalogRow> = Vec::new();
118
119    for rowid in master.rowids() {
120        let ty = take_text(&master, "type", rowid)?;
121        let name = take_text(&master, "name", rowid)?;
122        let sql = take_text(&master, "sql", rowid)?;
123        let rootpage = take_integer(&master, "rootpage", rowid)? as u32;
124        let last_rowid = take_integer(&master, "last_rowid", rowid)?;
125
126        match ty.as_str() {
127            "table" => {
128                let (parsed_name, columns) = parse_create_sql(&sql)?;
129                if parsed_name != name {
130                    return Err(SQLRiteError::Internal(format!(
131                        "sqlrite_master row '{name}' carries SQL for '{parsed_name}' — corrupt catalog?"
132                    )));
133                }
134                let mut table = build_empty_table(&name, columns, last_rowid);
135                if rootpage != 0 {
136                    load_table_rows(&pager, &mut table, rootpage)?;
137                }
138                if last_rowid > table.last_rowid {
139                    table.last_rowid = last_rowid;
140                }
141                db.tables.insert(name, table);
142            }
143            "index" => {
144                index_rows.push(IndexCatalogRow {
145                    name,
146                    sql,
147                    rootpage,
148                });
149            }
150            other => {
151                return Err(SQLRiteError::Internal(format!(
152                    "sqlrite_master row '{name}' has unknown type '{other}'"
153                )));
154            }
155        }
156    }
157
158    // Second pass: attach each index to its table. HNSW indexes
159    // (Phase 7d.2) take a different code path because their persisted
160    // form is just the CREATE INDEX SQL — the graph itself isn't
161    // persisted yet (Phase 7d.3). Detect HNSW via the SQL's USING clause
162    // and route to a graph-rebuild instead of the B-Tree-cell load.
163    //
164    // Phase 8b — same shape for FTS indexes. The posting lists aren't
165    // persisted yet (Phase 8c), so we replay the CREATE INDEX SQL on
166    // open and let `execute_create_index` walk current rows.
167    for row in index_rows {
168        if create_index_sql_uses_hnsw(&row.sql) {
169            rebuild_hnsw_index(&mut db, &pager, &row)?;
170        } else if create_index_sql_uses_fts(&row.sql) {
171            rebuild_fts_index(&mut db, &pager, &row)?;
172        } else {
173            attach_index(&mut db, &pager, row)?;
174        }
175    }
176
177    db.source_path = Some(path.to_path_buf());
178    db.pager = Some(pager);
179    Ok(db)
180}
181
/// Catalog row for a secondary index — deferred until after every table is
/// loaded so the index's base table exists by the time we populate it.
struct IndexCatalogRow {
    /// Index name, taken from the catalog row's `name` column.
    name: String,
    /// The `CREATE INDEX` statement stored in the catalog row's `sql` column.
    sql: String,
    /// Root page of the index's persisted tree; `0` means nothing is
    /// persisted for this index.
    rootpage: u32,
}
189
190/// Persists `db` to disk. Same diff-commit behavior as before: only pages
191/// whose bytes actually changed get written.
192pub fn save_database(db: &mut Database, path: &Path) -> Result<()> {
193    // Phase 7d.3 — rebuild any HNSW index that DELETE / UPDATE-on-vector
194    // marked dirty. Done up front under the &mut Database borrow we
195    // already hold, before the immutable iteration loops below need
196    // their own borrow.
197    rebuild_dirty_hnsw_indexes(db);
198    // Phase 8b — same drill for FTS indexes flagged by DELETE / UPDATE.
199    rebuild_dirty_fts_indexes(db);
200
201    let same_path = db.source_path.as_deref() == Some(path);
202    let mut pager = if same_path {
203        match db.pager.take() {
204            Some(p) => p,
205            None if path.exists() => Pager::open(path)?,
206            None => Pager::create(path)?,
207        }
208    } else if path.exists() {
209        Pager::open(path)?
210    } else {
211        Pager::create(path)?
212    };
213
214    pager.clear_staged();
215
216    // Page 0 is the header; payload pages start at 1.
217    let mut next_free_page: u32 = 1;
218
219    // 1. Stage each user table's B-Tree, collecting master-row info.
220    //    `kind` is "table" or "index" — master has one row per each.
221    let mut master_rows: Vec<CatalogEntry> = Vec::new();
222
223    let mut table_names: Vec<&String> = db.tables.keys().collect();
224    table_names.sort();
225    for name in table_names {
226        if name == MASTER_TABLE_NAME {
227            return Err(SQLRiteError::Internal(format!(
228                "user table cannot be named '{MASTER_TABLE_NAME}' (reserved)"
229            )));
230        }
231        let table = &db.tables[name];
232        let (rootpage, new_next) = stage_table_btree(&mut pager, table, next_free_page)?;
233        next_free_page = new_next;
234        master_rows.push(CatalogEntry {
235            kind: "table".into(),
236            name: name.clone(),
237            sql: table_to_create_sql(table),
238            rootpage,
239            last_rowid: table.last_rowid,
240        });
241    }
242
243    // 2. Stage each secondary index's B-Tree. Indexes persist in a
244    //    deterministic order: sorted by (owning_table, index_name).
245    let mut index_entries: Vec<(&Table, &SecondaryIndex)> = Vec::new();
246    for table in db.tables.values() {
247        for idx in &table.secondary_indexes {
248            index_entries.push((table, idx));
249        }
250    }
251    index_entries
252        .sort_by(|(ta, ia), (tb, ib)| ta.tb_name.cmp(&tb.tb_name).then(ia.name.cmp(&ib.name)));
253    for (_table, idx) in index_entries {
254        let (rootpage, new_next) = stage_index_btree(&mut pager, idx, next_free_page)?;
255        next_free_page = new_next;
256        master_rows.push(CatalogEntry {
257            kind: "index".into(),
258            name: idx.name.clone(),
259            sql: idx.synthesized_sql(),
260            rootpage,
261            last_rowid: 0,
262        });
263    }
264
265    // 2b. Phase 7d.3: persist HNSW indexes as their own cell-encoded
266    //     page trees, with the rootpage recorded in sqlrite_master.
267    //     Reopen loads the graph back from cells (fast, exact match)
268    //     instead of rebuilding from rows.
269    //
270    //     Dirty indexes (set by DELETE / UPDATE-on-vector-col) are
271    //     rebuilt from current rows BEFORE staging, so the on-disk
272    //     graph reflects the current row set.
273    let mut hnsw_entries: Vec<(&Table, &crate::sql::db::table::HnswIndexEntry)> = Vec::new();
274    for table in db.tables.values() {
275        for entry in &table.hnsw_indexes {
276            hnsw_entries.push((table, entry));
277        }
278    }
279    hnsw_entries
280        .sort_by(|(ta, ea), (tb, eb)| ta.tb_name.cmp(&tb.tb_name).then(ea.name.cmp(&eb.name)));
281    for (table, entry) in hnsw_entries {
282        let (rootpage, new_next) = stage_hnsw_btree(&mut pager, &entry.index, next_free_page)?;
283        next_free_page = new_next;
284        master_rows.push(CatalogEntry {
285            kind: "index".into(),
286            name: entry.name.clone(),
287            sql: format!(
288                "CREATE INDEX {} ON {} USING hnsw ({})",
289                entry.name, table.tb_name, entry.column_name
290            ),
291            rootpage,
292            last_rowid: 0,
293        });
294    }
295
296    // 2c. Phase 8c — persist FTS posting lists as their own
297    //     cell-encoded page trees, with the rootpage recorded in
298    //     sqlrite_master. Reopen loads the postings back from cells
299    //     (fast, exact match) instead of re-tokenizing rows.
300    //
301    //     Dirty indexes (set by DELETE / UPDATE-on-text-col) are
302    //     rebuilt from current rows BEFORE staging by
303    //     `rebuild_dirty_fts_indexes`, so the on-disk tree reflects
304    //     the current row set.
305    let mut fts_entries: Vec<(&Table, &crate::sql::db::table::FtsIndexEntry)> = Vec::new();
306    for table in db.tables.values() {
307        for entry in &table.fts_indexes {
308            fts_entries.push((table, entry));
309        }
310    }
311    fts_entries
312        .sort_by(|(ta, ea), (tb, eb)| ta.tb_name.cmp(&tb.tb_name).then(ea.name.cmp(&eb.name)));
313    let any_fts = !fts_entries.is_empty();
314    for (table, entry) in fts_entries {
315        let (rootpage, new_next) = stage_fts_btree(&mut pager, &entry.index, next_free_page)?;
316        next_free_page = new_next;
317        master_rows.push(CatalogEntry {
318            kind: "index".into(),
319            name: entry.name.clone(),
320            sql: format!(
321                "CREATE INDEX {} ON {} USING fts ({})",
322                entry.name, table.tb_name, entry.column_name
323            ),
324            rootpage,
325            last_rowid: 0,
326        });
327    }
328
329    // 3. Build an in-memory sqlrite_master with one row per table or index,
330    //    then stage it via the same tree-build path.
331    let mut master = build_empty_master_table();
332    for (i, entry) in master_rows.into_iter().enumerate() {
333        let rowid = (i as i64) + 1;
334        master.restore_row(
335            rowid,
336            vec![
337                Some(Value::Text(entry.kind)),
338                Some(Value::Text(entry.name)),
339                Some(Value::Text(entry.sql)),
340                Some(Value::Integer(entry.rootpage as i64)),
341                Some(Value::Integer(entry.last_rowid)),
342            ],
343        )?;
344    }
345    let (master_root, master_next) = stage_table_btree(&mut pager, &master, next_free_page)?;
346    next_free_page = master_next;
347
348    // Phase 8c — on-demand v4→v5 file-format bump per Q10. If any FTS
349    // index attached to the database, write v5; otherwise preserve the
350    // pre-existing version (v4 for files born before this build, or
351    // a previously-promoted v5 file). Reads accept both.
352    let format_version = if any_fts {
353        crate::sql::pager::header::FORMAT_VERSION_V5
354    } else {
355        pager.header().format_version
356    };
357
358    pager.commit(DbHeader {
359        page_count: next_free_page,
360        schema_root_page: master_root,
361        format_version,
362    })?;
363
364    if same_path {
365        db.pager = Some(pager);
366    }
367    Ok(())
368}
369
/// Build material for a single row in sqlrite_master.
struct CatalogEntry {
    /// Row kind written to the `type` column: "table" or "index".
    kind: String,
    /// Name of the table or index the row describes.
    name: String,
    /// CREATE statement stored in the row's `sql` column.
    sql: String,
    /// Root page of the staged tree for this table or index.
    rootpage: u32,
    /// The table's `last_rowid`; index rows are written with `0`.
    last_rowid: i64,
}
378
379// -------------------------------------------------------------------------
380// sqlrite_master — hardcoded catalog table schema
381
382fn build_empty_master_table() -> Table {
383    // Phase 3e: `type` is the first column, matching SQLite's convention.
384    // It distinguishes `'table'` rows from `'index'` rows.
385    let columns = vec![
386        Column::new("type".into(), "text".into(), false, true, false),
387        Column::new("name".into(), "text".into(), true, true, true),
388        Column::new("sql".into(), "text".into(), false, true, false),
389        Column::new("rootpage".into(), "integer".into(), false, true, false),
390        Column::new("last_rowid".into(), "integer".into(), false, true, false),
391    ];
392    build_empty_table(MASTER_TABLE_NAME, columns, 0)
393}
394
395/// Reads a required Text column from a known-good catalog row.
396fn take_text(table: &Table, col: &str, rowid: i64) -> Result<String> {
397    match table.get_value(col, rowid) {
398        Some(Value::Text(s)) => Ok(s),
399        other => Err(SQLRiteError::Internal(format!(
400            "sqlrite_master column '{col}' at rowid {rowid}: expected Text, got {other:?}"
401        ))),
402    }
403}
404
405/// Reads a required Integer column from a known-good catalog row.
406fn take_integer(table: &Table, col: &str, rowid: i64) -> Result<i64> {
407    match table.get_value(col, rowid) {
408        Some(Value::Integer(v)) => Ok(v),
409        other => Err(SQLRiteError::Internal(format!(
410            "sqlrite_master column '{col}' at rowid {rowid}: expected Integer, got {other:?}"
411        ))),
412    }
413}
414
415// -------------------------------------------------------------------------
416// CREATE-TABLE SQL synthesis and re-parsing
417
418/// Synthesizes a CREATE TABLE SQL string that recreates the table's schema.
419/// Deterministic: same schema → same SQL, so diffing commits stay stable.
420fn table_to_create_sql(table: &Table) -> String {
421    let mut parts = Vec::with_capacity(table.columns.len());
422    for c in &table.columns {
423        // Render the SQL type literally so the round-trip through
424        // CREATE TABLE re-parsing recreates the same schema. Vector
425        // carries its dimension inline.
426        let ty: String = match &c.datatype {
427            DataType::Integer => "INTEGER".to_string(),
428            DataType::Text => "TEXT".to_string(),
429            DataType::Real => "REAL".to_string(),
430            DataType::Bool => "BOOLEAN".to_string(),
431            DataType::Vector(dim) => format!("VECTOR({dim})"),
432            DataType::Json => "JSON".to_string(),
433            DataType::None | DataType::Invalid => "TEXT".to_string(),
434        };
435        let mut piece = format!("{} {}", c.column_name, ty);
436        if c.is_pk {
437            piece.push_str(" PRIMARY KEY");
438        } else {
439            if c.is_unique {
440                piece.push_str(" UNIQUE");
441            }
442            if c.not_null {
443                piece.push_str(" NOT NULL");
444            }
445        }
446        parts.push(piece);
447    }
448    format!("CREATE TABLE {} ({});", table.tb_name, parts.join(", "))
449}
450
451/// Reverses `table_to_create_sql`: feeds the SQL back through `sqlparser`
452/// and produces our internal column list. Returns `(table_name, columns)`.
453fn parse_create_sql(sql: &str) -> Result<(String, Vec<Column>)> {
454    let dialect = SQLiteDialect {};
455    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
456    let stmt = ast.pop().ok_or_else(|| {
457        SQLRiteError::Internal("sqlrite_master row held an empty SQL string".to_string())
458    })?;
459    let create = CreateQuery::new(&stmt)?;
460    let columns = create
461        .columns
462        .into_iter()
463        .map(|pc| Column::new(pc.name, pc.datatype, pc.is_pk, pc.not_null, pc.is_unique))
464        .collect();
465    Ok((create.table_name, columns))
466}
467
468// -------------------------------------------------------------------------
469// In-memory table (re)construction
470
471/// Builds an empty in-memory `Table` given the declared columns.
472fn build_empty_table(name: &str, columns: Vec<Column>, last_rowid: i64) -> Table {
473    let rows: Arc<Mutex<HashMap<String, Row>>> = Arc::new(Mutex::new(HashMap::new()));
474    let mut secondary_indexes: Vec<SecondaryIndex> = Vec::new();
475    {
476        let mut map = rows.lock().expect("rows mutex poisoned");
477        for col in &columns {
478            // Mirror the dispatch in `Table::new` so the reconstructed
479            // table has the same shape it'd have if it were built fresh
480            // from SQL. Phase 7a adds the Vector arm — without it,
481            // VECTOR columns silently restore as Row::None and every
482            // restore_row hits a "storage None vs value Some(Vector(...))"
483            // type mismatch.
484            let row = match &col.datatype {
485                DataType::Integer => Row::Integer(BTreeMap::new()),
486                DataType::Text => Row::Text(BTreeMap::new()),
487                DataType::Real => Row::Real(BTreeMap::new()),
488                DataType::Bool => Row::Bool(BTreeMap::new()),
489                DataType::Vector(_dim) => Row::Vector(BTreeMap::new()),
490                // JSON columns reuse Text storage — see Table::new and
491                // Phase 7e's scope-correction note.
492                DataType::Json => Row::Text(BTreeMap::new()),
493                DataType::None | DataType::Invalid => Row::None,
494            };
495            map.insert(col.column_name.clone(), row);
496
497            // Auto-create UNIQUE/PK indexes so the restored table has the
498            // same shape Table::new would have built from fresh SQL.
499            if (col.is_pk || col.is_unique)
500                && matches!(col.datatype, DataType::Integer | DataType::Text)
501            {
502                if let Ok(idx) = SecondaryIndex::new(
503                    SecondaryIndex::auto_name(name, &col.column_name),
504                    name.to_string(),
505                    col.column_name.clone(),
506                    &col.datatype,
507                    true,
508                    IndexOrigin::Auto,
509                ) {
510                    secondary_indexes.push(idx);
511                }
512            }
513        }
514    }
515
516    let primary_key = columns
517        .iter()
518        .find(|c| c.is_pk)
519        .map(|c| c.column_name.clone())
520        .unwrap_or_else(|| "-1".to_string());
521
522    Table {
523        tb_name: name.to_string(),
524        columns,
525        rows,
526        secondary_indexes,
527        // HNSW indexes (Phase 7d.2) are reconstructed on open by re-
528        // executing each `CREATE INDEX … USING hnsw` SQL stored in
529        // `sqlrite_master`. This builder produces the empty shell;
530        // `replay_create_index_for_hnsw` (in this same module) walks
531        // sqlrite_master after every table is loaded and rebuilds the
532        // graph from current row data. Persistence of the graph itself
533        // (avoiding the on-open rebuild cost) is Phase 7d.3.
534        hnsw_indexes: Vec::new(),
535        // FTS indexes (Phase 8b) follow the same pattern — the
536        // CREATE INDEX … USING fts SQL is the source of truth on open
537        // and the in-memory posting list gets rebuilt from current
538        // rows. Cell-encoded persistence of the postings is Phase 8c.
539        fts_indexes: Vec::new(),
540        last_rowid,
541        primary_key,
542    }
543}
544
545// -------------------------------------------------------------------------
546// Leaf-chain read / write
547
548/// Walks a table's B-Tree from `root_page`, following the leftmost-child
549/// chain down to the first leaf, then iterating leaves via their sibling
550/// `next_page` pointers. Every cell is decoded and replayed into `table`.
551///
552/// Open-path note: we eagerly materialize the entire table into `Table`'s
553/// in-memory maps. Phase 5 will introduce a `Cursor` that hits the pager
554/// on demand so queries can stream through the tree without a full upfront
555/// load.
556/// Re-parses `CREATE INDEX` SQL from sqlrite_master and restores the
557/// index on its base table by walking the tree of index cells at
558/// `rootpage`. The base table is expected to already be in `db.tables`.
559fn attach_index(db: &mut Database, pager: &Pager, row: IndexCatalogRow) -> Result<()> {
560    let (table_name, column_name, is_unique) = parse_create_index_sql(&row.sql)?;
561
562    let table = db.get_table_mut(table_name.clone()).map_err(|_| {
563        SQLRiteError::Internal(format!(
564            "index '{}' references unknown table '{table_name}' (sqlrite_master out of sync?)",
565            row.name
566        ))
567    })?;
568    let datatype = table
569        .columns
570        .iter()
571        .find(|c| c.column_name == column_name)
572        .map(|c| clone_datatype(&c.datatype))
573        .ok_or_else(|| {
574            SQLRiteError::Internal(format!(
575                "index '{}' references unknown column '{column_name}' on '{table_name}'",
576                row.name
577            ))
578        })?;
579
580    // An auto-index on this column may already exist (built by
581    // build_empty_table for UNIQUE/PK columns). If the names match, reuse
582    // the slot instead of adding a duplicate entry.
583    let existing_slot = table
584        .secondary_indexes
585        .iter()
586        .position(|i| i.name == row.name);
587    let idx = match existing_slot {
588        Some(i) => {
589            // Drain any entries that may have been populated during table
590            // restore_row calls — we're about to repopulate from the
591            // persisted tree.
592            table.secondary_indexes.remove(i)
593        }
594        None => SecondaryIndex::new(
595            row.name.clone(),
596            table_name.clone(),
597            column_name.clone(),
598            &datatype,
599            is_unique,
600            IndexOrigin::Explicit,
601        )?,
602    };
603    let mut idx = idx;
604    // Wipe any stale entries from the auto path so the load is idempotent.
605    let is_unique_flag = idx.is_unique;
606    let origin = idx.origin;
607    idx = SecondaryIndex::new(
608        idx.name,
609        idx.table_name,
610        idx.column_name,
611        &datatype,
612        is_unique_flag,
613        origin,
614    )?;
615
616    // Populate from the index tree's cells.
617    load_index_rows(pager, &mut idx, row.rootpage)?;
618
619    table.secondary_indexes.push(idx);
620    Ok(())
621}
622
623/// Walks the leaves of an index B-Tree rooted at `root_page` and inserts
624/// every `(value, rowid)` pair into `idx`.
625fn load_index_rows(pager: &Pager, idx: &mut SecondaryIndex, root_page: u32) -> Result<()> {
626    if root_page == 0 {
627        return Ok(());
628    }
629    let first_leaf = find_leftmost_leaf(pager, root_page)?;
630    let mut current = first_leaf;
631    while current != 0 {
632        let page_buf = pager
633            .read_page(current)
634            .ok_or_else(|| SQLRiteError::Internal(format!("missing index leaf page {current}")))?;
635        if page_buf[0] != PageType::TableLeaf as u8 {
636            return Err(SQLRiteError::Internal(format!(
637                "page {current} tagged {} but expected TableLeaf (index)",
638                page_buf[0]
639            )));
640        }
641        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
642        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
643            .try_into()
644            .map_err(|_| SQLRiteError::Internal("index leaf payload size".to_string()))?;
645        let leaf = TablePage::from_bytes(payload);
646
647        for slot in 0..leaf.slot_count() {
648            // Slots on an index page hold KIND_INDEX cells; decode directly.
649            let offset = leaf.slot_offset_raw(slot)?;
650            let (ic, _) = IndexCell::decode(leaf.as_bytes(), offset)?;
651            idx.insert(&ic.value, ic.rowid)?;
652        }
653        current = next_leaf;
654    }
655    Ok(())
656}
657
658/// Minimal recognizer for the synthesized-or-user `CREATE INDEX` SQL we
659/// store in sqlrite_master. Returns `(table_name, column_name, is_unique)`.
660///
661/// Uses sqlparser so user-supplied SQL with extra whitespace, case, etc.
662/// still works; the only shape we accept is single-column indexes.
663fn parse_create_index_sql(sql: &str) -> Result<(String, String, bool)> {
664    use sqlparser::ast::{CreateIndex, Expr, Statement};
665
666    let dialect = SQLiteDialect {};
667    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
668    let Some(Statement::CreateIndex(CreateIndex {
669        table_name,
670        columns,
671        unique,
672        ..
673    })) = ast.pop()
674    else {
675        return Err(SQLRiteError::Internal(format!(
676            "sqlrite_master index row's SQL isn't a CREATE INDEX: {sql}"
677        )));
678    };
679    if columns.len() != 1 {
680        return Err(SQLRiteError::NotImplemented(
681            "multi-column indexes aren't supported yet".to_string(),
682        ));
683    }
684    let col = match &columns[0].column.expr {
685        Expr::Identifier(ident) => ident.value.clone(),
686        Expr::CompoundIdentifier(parts) => {
687            parts.last().map(|p| p.value.clone()).unwrap_or_default()
688        }
689        other => {
690            return Err(SQLRiteError::Internal(format!(
691                "unsupported indexed column expression: {other:?}"
692            )));
693        }
694    };
695    Ok((table_name.to_string(), col, unique))
696}
697
698/// True iff a CREATE INDEX SQL string uses `USING hnsw` (case-insensitive).
699/// Used by the open path to route HNSW indexes to the graph-rebuild path
700/// instead of the standard B-Tree cell-load. Pre-Phase-7d.2 indexes
701/// don't have a USING clause, so they all return false and continue
702/// taking the existing path.
703fn create_index_sql_uses_hnsw(sql: &str) -> bool {
704    use sqlparser::ast::{CreateIndex, IndexType, Statement};
705
706    let dialect = SQLiteDialect {};
707    let Ok(mut ast) = Parser::parse_sql(&dialect, sql) else {
708        return false;
709    };
710    let Some(Statement::CreateIndex(CreateIndex { using, .. })) = ast.pop() else {
711        return false;
712    };
713    matches!(using, Some(IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("hnsw"))
714}
715
716/// Phase 8b — peeks at a CREATE INDEX SQL to detect `USING fts(...)`.
717/// Mirrors [`create_index_sql_uses_hnsw`].
718fn create_index_sql_uses_fts(sql: &str) -> bool {
719    use sqlparser::ast::{CreateIndex, IndexType, Statement};
720
721    let dialect = SQLiteDialect {};
722    let Ok(mut ast) = Parser::parse_sql(&dialect, sql) else {
723        return false;
724    };
725    let Some(Statement::CreateIndex(CreateIndex { using, .. })) = ast.pop() else {
726        return false;
727    };
728    matches!(using, Some(IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("fts"))
729}
730
731/// Phase 8c — loads (or rebuilds) an FTS index on database open. Two
732/// paths mirror [`rebuild_hnsw_index`]:
733///
734///   - **rootpage != 0** (Phase 8c default): the posting list is
735///     persisted as cell-encoded pages. Read every cell directly via
736///     [`load_fts_postings`] and reconstruct the index — no
737///     re-tokenization, exact bit-for-bit reproduction.
738///
739///   - **rootpage == 0** (compatibility): no on-disk postings, e.g.
740///     for files saved by Phase 8b before persistence landed. Replay
741///     the CREATE INDEX SQL through `execute_create_index`, which
742///     walks the table's current rows and tokenizes them fresh.
743fn rebuild_fts_index(db: &mut Database, pager: &Pager, row: &IndexCatalogRow) -> Result<()> {
744    use crate::sql::db::table::FtsIndexEntry;
745    use crate::sql::executor::execute_create_index;
746    use crate::sql::fts::PostingList;
747    use sqlparser::ast::Statement;
748
749    let dialect = SQLiteDialect {};
750    let mut ast = Parser::parse_sql(&dialect, &row.sql).map_err(SQLRiteError::from)?;
751    let Some(stmt @ Statement::CreateIndex(_)) = ast.pop() else {
752        return Err(SQLRiteError::Internal(format!(
753            "sqlrite_master FTS row's SQL isn't a CREATE INDEX: {}",
754            row.sql
755        )));
756    };
757
758    if row.rootpage == 0 {
759        // Compatibility path — no persisted postings; replay rows.
760        execute_create_index(&stmt, db)?;
761        return Ok(());
762    }
763
764    let (doc_lengths, postings) = load_fts_postings(pager, row.rootpage)?;
765    let index = PostingList::from_persisted_postings(doc_lengths, postings);
766    let (tbl_name, col_name) = parse_fts_create_index_sql(&row.sql)?;
767    let table_mut = db.get_table_mut(tbl_name.clone()).map_err(|_| {
768        SQLRiteError::Internal(format!(
769            "FTS index '{}' references unknown table '{tbl_name}'",
770            row.name
771        ))
772    })?;
773    table_mut.fts_indexes.push(FtsIndexEntry {
774        name: row.name.clone(),
775        column_name: col_name,
776        index,
777        needs_rebuild: false,
778    });
779    Ok(())
780}
781
782/// Pulls (table_name, column_name) out of a `CREATE INDEX … USING fts(col)`
783/// SQL string. Same shape as `parse_hnsw_create_index_sql`.
784fn parse_fts_create_index_sql(sql: &str) -> Result<(String, String)> {
785    use sqlparser::ast::{CreateIndex, Expr, Statement};
786
787    let dialect = SQLiteDialect {};
788    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
789    let Some(Statement::CreateIndex(CreateIndex {
790        table_name,
791        columns,
792        ..
793    })) = ast.pop()
794    else {
795        return Err(SQLRiteError::Internal(format!(
796            "sqlrite_master FTS row's SQL isn't a CREATE INDEX: {sql}"
797        )));
798    };
799    if columns.len() != 1 {
800        return Err(SQLRiteError::NotImplemented(
801            "multi-column FTS indexes aren't supported yet".to_string(),
802        ));
803    }
804    let col = match &columns[0].column.expr {
805        Expr::Identifier(ident) => ident.value.clone(),
806        Expr::CompoundIdentifier(parts) => {
807            parts.last().map(|p| p.value.clone()).unwrap_or_default()
808        }
809        other => {
810            return Err(SQLRiteError::Internal(format!(
811                "FTS CREATE INDEX has unexpected column expr: {other:?}"
812            )));
813        }
814    };
815    Ok((table_name.to_string(), col))
816}
817
818/// Loads (or rebuilds) an HNSW index on database open. Two paths:
819///
820///   - **rootpage != 0** (Phase 7d.3 default): the graph is persisted
821///     as cell-encoded pages. Read every node directly via
822///     `load_hnsw_nodes` and reconstruct the index — fast, zero
823///     algorithm runs, exact bit-for-bit reproduction of what was saved.
824///
825///   - **rootpage == 0** (compatibility): no on-disk graph, e.g. for
826///     files saved by Phase 7d.2 before persistence landed. Replay the
827///     CREATE INDEX SQL through `execute_create_index`, which walks the
828///     table's current rows and populates a fresh graph. Slower but
829///     correctness-equivalent on the first save with the new code.
830fn rebuild_hnsw_index(db: &mut Database, pager: &Pager, row: &IndexCatalogRow) -> Result<()> {
831    use crate::sql::db::table::HnswIndexEntry;
832    use crate::sql::executor::execute_create_index;
833    use crate::sql::hnsw::{DistanceMetric, HnswIndex};
834    use sqlparser::ast::Statement;
835
836    let dialect = SQLiteDialect {};
837    let mut ast = Parser::parse_sql(&dialect, &row.sql).map_err(SQLRiteError::from)?;
838    let Some(stmt @ Statement::CreateIndex(_)) = ast.pop() else {
839        return Err(SQLRiteError::Internal(format!(
840            "sqlrite_master HNSW row's SQL isn't a CREATE INDEX: {}",
841            row.sql
842        )));
843    };
844
845    if row.rootpage == 0 {
846        // Compatibility path — no persisted graph; walk current rows.
847        execute_create_index(&stmt, db)?;
848        return Ok(());
849    }
850
851    // Persistence path — read the cell tree, deserialize.
852    let nodes = load_hnsw_nodes(pager, row.rootpage)?;
853    let index = HnswIndex::from_persisted_nodes(DistanceMetric::L2, 0xC0FFEE, nodes);
854
855    // Parse the CREATE INDEX to know which table + column to attach to
856    // — same shape as the row-walk path; we just don't execute it.
857    let (tbl_name, col_name) = parse_hnsw_create_index_sql(&row.sql)?;
858    let table_mut = db.get_table_mut(tbl_name.clone()).map_err(|_| {
859        SQLRiteError::Internal(format!(
860            "HNSW index '{}' references unknown table '{tbl_name}'",
861            row.name
862        ))
863    })?;
864    table_mut.hnsw_indexes.push(HnswIndexEntry {
865        name: row.name.clone(),
866        column_name: col_name,
867        index,
868        needs_rebuild: false,
869    });
870    Ok(())
871}
872
873/// Phase 7d.3 — Phase-7d.3-side helper: walk every leaf in the HNSW
874/// page tree at `root_page` and decode each cell as a node. Returns
875/// the (node_id, layers) tuples in slot-order (already ascending by
876/// node_id since they were staged that way). The caller hands them to
877/// `HnswIndex::from_persisted_nodes`.
878fn load_hnsw_nodes(pager: &Pager, root_page: u32) -> Result<Vec<(i64, Vec<Vec<i64>>)>> {
879    use crate::sql::pager::hnsw_cell::HnswNodeCell;
880
881    let mut nodes: Vec<(i64, Vec<Vec<i64>>)> = Vec::new();
882    let first_leaf = find_leftmost_leaf(pager, root_page)?;
883    let mut current = first_leaf;
884    while current != 0 {
885        let page_buf = pager
886            .read_page(current)
887            .ok_or_else(|| SQLRiteError::Internal(format!("missing HNSW leaf page {current}")))?;
888        if page_buf[0] != PageType::TableLeaf as u8 {
889            return Err(SQLRiteError::Internal(format!(
890                "page {current} tagged {} but expected TableLeaf (HNSW)",
891                page_buf[0]
892            )));
893        }
894        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
895        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
896            .try_into()
897            .map_err(|_| SQLRiteError::Internal("HNSW leaf payload size".to_string()))?;
898        let leaf = TablePage::from_bytes(payload);
899        for slot in 0..leaf.slot_count() {
900            let offset = leaf.slot_offset_raw(slot)?;
901            let (cell, _) = HnswNodeCell::decode(leaf.as_bytes(), offset)?;
902            nodes.push((cell.node_id, cell.layers));
903        }
904        current = next_leaf;
905    }
906    Ok(nodes)
907}
908
909/// Pulls (table_name, column_name) out of a `CREATE INDEX … USING hnsw (col)`
910/// SQL string. Used by the persistence path on open to know where to
911/// attach the loaded graph. Same shape as `parse_create_index_sql` for
912/// regular indexes — only the assertion differs (we don't care about
913/// UNIQUE for HNSW).
914fn parse_hnsw_create_index_sql(sql: &str) -> Result<(String, String)> {
915    use sqlparser::ast::{CreateIndex, Expr, Statement};
916
917    let dialect = SQLiteDialect {};
918    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
919    let Some(Statement::CreateIndex(CreateIndex {
920        table_name,
921        columns,
922        ..
923    })) = ast.pop()
924    else {
925        return Err(SQLRiteError::Internal(format!(
926            "sqlrite_master HNSW row's SQL isn't a CREATE INDEX: {sql}"
927        )));
928    };
929    if columns.len() != 1 {
930        return Err(SQLRiteError::NotImplemented(
931            "multi-column HNSW indexes aren't supported yet".to_string(),
932        ));
933    }
934    let col = match &columns[0].column.expr {
935        Expr::Identifier(ident) => ident.value.clone(),
936        Expr::CompoundIdentifier(parts) => {
937            parts.last().map(|p| p.value.clone()).unwrap_or_default()
938        }
939        other => {
940            return Err(SQLRiteError::Internal(format!(
941                "unsupported HNSW indexed column expression: {other:?}"
942            )));
943        }
944    };
945    Ok((table_name.to_string(), col))
946}
947
948/// Phase 7d.3 — rebuilds in-place any HnswIndexEntry whose
949/// `needs_rebuild` flag is set (DELETE / UPDATE-on-vector marked it).
950/// Walks the table's current Vec<f32> column storage and runs the
951/// HNSW algorithm fresh. Called at the top of `save_database` before
952/// any immutable borrows of `db` start.
953///
954/// Cost: O(N · ef_construction · log N) per dirty index. Fine for
955/// small tables, expensive for ≥100k-row tables — matches the
956/// trade-off SQLite makes for FTS5: dirtying-and-rebuilding is the
957/// MVP, more sophisticated incremental delete strategies (soft-delete
958/// + tombstones, neighbor reconnection) are future polish.
959fn rebuild_dirty_hnsw_indexes(db: &mut Database) {
960    use crate::sql::hnsw::{DistanceMetric, HnswIndex};
961
962    for table in db.tables.values_mut() {
963        // Snapshot which (index_name, column) pairs need rebuilding,
964        // before we go grabbing column data — keeps the borrow
965        // structure simple.
966        let dirty: Vec<(String, String)> = table
967            .hnsw_indexes
968            .iter()
969            .filter(|e| e.needs_rebuild)
970            .map(|e| (e.name.clone(), e.column_name.clone()))
971            .collect();
972        if dirty.is_empty() {
973            continue;
974        }
975
976        for (idx_name, col_name) in dirty {
977            // Snapshot every (rowid, vec) for this column.
978            let mut vectors: Vec<(i64, Vec<f32>)> = Vec::new();
979            {
980                let row_data = table.rows.lock().expect("rows mutex poisoned");
981                if let Some(Row::Vector(map)) = row_data.get(&col_name) {
982                    for (id, v) in map.iter() {
983                        vectors.push((*id, v.clone()));
984                    }
985                }
986            }
987            // Pre-build a HashMap for the get_vec closure so we don't
988            // pay O(N) lookup per insert call.
989            let snapshot: std::collections::HashMap<i64, Vec<f32>> =
990                vectors.iter().cloned().collect();
991
992            let mut new_idx = HnswIndex::new(DistanceMetric::L2, 0xC0FFEE);
993            // Sort by id so the rebuild is deterministic across runs.
994            vectors.sort_by_key(|(id, _)| *id);
995            for (id, v) in &vectors {
996                new_idx.insert(*id, v, |q| snapshot.get(&q).cloned().unwrap_or_default());
997            }
998
999            // Replace the entry's index + clear the dirty flag.
1000            if let Some(entry) = table.hnsw_indexes.iter_mut().find(|e| e.name == idx_name) {
1001                entry.index = new_idx;
1002                entry.needs_rebuild = false;
1003            }
1004        }
1005    }
1006}
1007
1008/// Phase 8b — rebuild every FTS index a DELETE / UPDATE-on-text-col
1009/// marked dirty. Mirrors [`rebuild_dirty_hnsw_indexes`]; runs at save
1010/// time under `&mut Database`. Cheap on a clean DB (the `dirty` snapshot
1011/// is empty so the per-table loop short-circuits).
1012fn rebuild_dirty_fts_indexes(db: &mut Database) {
1013    use crate::sql::fts::PostingList;
1014
1015    for table in db.tables.values_mut() {
1016        let dirty: Vec<(String, String)> = table
1017            .fts_indexes
1018            .iter()
1019            .filter(|e| e.needs_rebuild)
1020            .map(|e| (e.name.clone(), e.column_name.clone()))
1021            .collect();
1022        if dirty.is_empty() {
1023            continue;
1024        }
1025
1026        for (idx_name, col_name) in dirty {
1027            // Snapshot every (rowid, text) pair for this column under
1028            // the row mutex, then drop the lock before re-tokenizing.
1029            let mut docs: Vec<(i64, String)> = Vec::new();
1030            {
1031                let row_data = table.rows.lock().expect("rows mutex poisoned");
1032                if let Some(Row::Text(map)) = row_data.get(&col_name) {
1033                    for (id, v) in map.iter() {
1034                        // "Null" sentinel is the parser's
1035                        // null-marker for TEXT cells; skip those —
1036                        // they'd round-trip as the literal string
1037                        // "Null" otherwise. Aligns with insert_row's
1038                        // typed_value gate.
1039                        if v != "Null" {
1040                            docs.push((*id, v.clone()));
1041                        }
1042                    }
1043                }
1044            }
1045
1046            let mut new_idx = PostingList::new();
1047            // Sort by id so the rebuild is deterministic across runs
1048            // (the BTreeMap inside PostingList is order-stable, but
1049            // doc-length aggregation order doesn't matter — sorting
1050            // here is purely for reproducibility on inspection).
1051            docs.sort_by_key(|(id, _)| *id);
1052            for (id, text) in &docs {
1053                new_idx.insert(*id, text);
1054            }
1055
1056            if let Some(entry) = table.fts_indexes.iter_mut().find(|e| e.name == idx_name) {
1057                entry.index = new_idx;
1058                entry.needs_rebuild = false;
1059            }
1060        }
1061    }
1062}
1063
1064/// Cheap clone helper — `DataType` doesn't derive `Clone` elsewhere.
1065fn clone_datatype(dt: &DataType) -> DataType {
1066    match dt {
1067        DataType::Integer => DataType::Integer,
1068        DataType::Text => DataType::Text,
1069        DataType::Real => DataType::Real,
1070        DataType::Bool => DataType::Bool,
1071        DataType::Vector(dim) => DataType::Vector(*dim),
1072        DataType::Json => DataType::Json,
1073        DataType::None => DataType::None,
1074        DataType::Invalid => DataType::Invalid,
1075    }
1076}
1077
1078/// Stages an index's B-Tree at `start_page`. Each leaf cell is a
1079/// `KIND_INDEX` entry carrying `(original_rowid, value)`. Returns
1080/// `(root_page, next_free_page)`.
1081///
1082/// The tree's shape matches a regular table's — leaves chained via
1083/// `next_page`, optional interior layer above. `Cell::peek_rowid` works
1084/// uniformly for index cells (same prefix as local cells), so the
1085/// existing slot directory and binary search carry over.
1086fn stage_index_btree(
1087    pager: &mut Pager,
1088    idx: &SecondaryIndex,
1089    start_page: u32,
1090) -> Result<(u32, u32)> {
1091    // Build the leaves.
1092    let (leaves, mut next_free_page) = stage_index_leaves(pager, idx, start_page)?;
1093    if leaves.len() == 1 {
1094        return Ok((leaves[0].0, next_free_page));
1095    }
1096    let mut level: Vec<(u32, i64)> = leaves;
1097    while level.len() > 1 {
1098        let (next_level, new_next_free) = stage_interior_level(pager, &level, next_free_page)?;
1099        next_free_page = new_next_free;
1100        level = next_level;
1101    }
1102    Ok((level[0].0, next_free_page))
1103}
1104
1105/// Packs the index's (value, rowid) entries into a sibling-chained run
1106/// of `TableLeaf` pages. Iteration order matches `SecondaryIndex::iter_entries`
1107/// (ascending value; rowids in insertion order within a value), which is
1108/// also ascending by the "cell rowid" carried in each IndexCell (the
1109/// original row's rowid) — so Cell::peek_rowid + the slot directory's
1110/// rowid ordering stays consistent.
1111fn stage_index_leaves(
1112    pager: &mut Pager,
1113    idx: &SecondaryIndex,
1114    start_page: u32,
1115) -> Result<(Vec<(u32, i64)>, u32)> {
1116    let mut leaves: Vec<(u32, i64)> = Vec::new();
1117    let mut current_leaf = TablePage::empty();
1118    let mut current_leaf_page = start_page;
1119    let mut current_max_rowid: Option<i64> = None;
1120    let mut next_free_page = start_page + 1;
1121
1122    // Sort the entries by original rowid so the in-page slot directory,
1123    // which binary-searches by rowid, stays valid. (iter_entries orders by
1124    // value; we reorder here for B-Tree correctness.)
1125    let mut entries: Vec<(Value, i64)> = idx.iter_entries().collect();
1126    entries.sort_by_key(|(_, r)| *r);
1127
1128    for (value, rowid) in entries {
1129        let cell = IndexCell::new(rowid, value);
1130        let entry_bytes = cell.encode()?;
1131
1132        if !current_leaf.would_fit(entry_bytes.len()) {
1133            let next_leaf_page_num = next_free_page;
1134            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1135            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1136            current_leaf = TablePage::empty();
1137            current_leaf_page = next_leaf_page_num;
1138            next_free_page += 1;
1139
1140            if !current_leaf.would_fit(entry_bytes.len()) {
1141                return Err(SQLRiteError::Internal(format!(
1142                    "index entry of {} bytes exceeds empty-page capacity {}",
1143                    entry_bytes.len(),
1144                    current_leaf.free_space()
1145                )));
1146            }
1147        }
1148        current_leaf.insert_entry(rowid, &entry_bytes)?;
1149        current_max_rowid = Some(rowid);
1150    }
1151
1152    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1153    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1154    Ok((leaves, next_free_page))
1155}
1156
1157/// Phase 7d.3 — stages an HNSW index's page tree at `start_page`.
1158/// Each leaf cell is a `KIND_HNSW` entry carrying one node's
1159/// (node_id, layers). Returns `(root_page, next_free_page)`.
1160///
1161/// Tree shape is identical to `stage_index_btree` — chained leaves +
1162/// optional interior layers. The slot directory binary-searches by
1163/// node_id (which is the cell's "rowid" in `Cell::peek_rowid` terms),
1164/// so reads can locate any node in O(log N) once 7d.4-or-later
1165/// optimizes the load path to lazy-fetch instead of read-all.
1166/// Today, `load_hnsw_nodes` reads the entire tree on open.
1167fn stage_hnsw_btree(
1168    pager: &mut Pager,
1169    idx: &crate::sql::hnsw::HnswIndex,
1170    start_page: u32,
1171) -> Result<(u32, u32)> {
1172    let (leaves, mut next_free_page) = stage_hnsw_leaves(pager, idx, start_page)?;
1173    if leaves.len() == 1 {
1174        return Ok((leaves[0].0, next_free_page));
1175    }
1176    let mut level: Vec<(u32, i64)> = leaves;
1177    while level.len() > 1 {
1178        let (next_level, new_next_free) = stage_interior_level(pager, &level, next_free_page)?;
1179        next_free_page = new_next_free;
1180        level = next_level;
1181    }
1182    Ok((level[0].0, next_free_page))
1183}
1184
1185/// Phase 8c — stage one FTS index as a `TableLeaf`-shaped B-Tree.
1186/// Mirrors `stage_hnsw_btree` (sibling-chained leaves, optional interior
1187/// levels). Returns `(root_page, next_free_page)`. Each leaf is filled
1188/// with `KIND_FTS_POSTING` cells: one sidecar cell holding the
1189/// doc-lengths map, then one cell per term in lexicographic order.
1190fn stage_fts_btree(
1191    pager: &mut Pager,
1192    idx: &crate::sql::fts::PostingList,
1193    start_page: u32,
1194) -> Result<(u32, u32)> {
1195    let (leaves, mut next_free_page) = stage_fts_leaves(pager, idx, start_page)?;
1196    if leaves.len() == 1 {
1197        return Ok((leaves[0].0, next_free_page));
1198    }
1199    let mut level: Vec<(u32, i64)> = leaves;
1200    while level.len() > 1 {
1201        let (next_level, new_next_free) = stage_interior_level(pager, &level, next_free_page)?;
1202        next_free_page = new_next_free;
1203        level = next_level;
1204    }
1205    Ok((level[0].0, next_free_page))
1206}
1207
1208/// Packs FTS posting cells into a sibling-chained run of `TableLeaf`
1209/// pages. Cell layout: a single doc-lengths sidecar at `cell_id = 1`,
1210/// followed by one cell per term in lexicographic order with
1211/// `cell_id = 2..=N + 1`. Sequential ids keep the slot directory's
1212/// rowid ordering valid (the `cell_id` field is what `peek_rowid`
1213/// returns).
1214fn stage_fts_leaves(
1215    pager: &mut Pager,
1216    idx: &crate::sql::fts::PostingList,
1217    start_page: u32,
1218) -> Result<(Vec<(u32, i64)>, u32)> {
1219    use crate::sql::pager::fts_cell::FtsPostingCell;
1220
1221    let mut leaves: Vec<(u32, i64)> = Vec::new();
1222    let mut current_leaf = TablePage::empty();
1223    let mut current_leaf_page = start_page;
1224    let mut current_max_rowid: Option<i64> = None;
1225    let mut next_free_page = start_page + 1;
1226
1227    // Build the cell sequence: sidecar first, then per-term cells. The
1228    // sidecar always exists (even on an empty index) so reload sees a
1229    // canonical "this index was persisted" marker in slot 0.
1230    let mut cell_id: i64 = 1;
1231    let mut cells: Vec<FtsPostingCell> = Vec::new();
1232    cells.push(FtsPostingCell::doc_lengths(
1233        cell_id,
1234        idx.serialize_doc_lengths(),
1235    ));
1236    for (term, entries) in idx.serialize_postings() {
1237        cell_id += 1;
1238        cells.push(FtsPostingCell::posting(cell_id, term, entries));
1239    }
1240
1241    for cell in cells {
1242        let entry_bytes = cell.encode()?;
1243
1244        if !current_leaf.would_fit(entry_bytes.len()) {
1245            let next_leaf_page_num = next_free_page;
1246            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1247            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1248            current_leaf = TablePage::empty();
1249            current_leaf_page = next_leaf_page_num;
1250            next_free_page += 1;
1251
1252            if !current_leaf.would_fit(entry_bytes.len()) {
1253                // A single posting cell exceeds page capacity. Phase
1254                // 8c MVP doesn't chain via overflow cells (the plan
1255                // notes this as a stretch goal); surface a clear
1256                // error so users know which term tripped it.
1257                return Err(SQLRiteError::Internal(format!(
1258                    "FTS posting cell {} of {} bytes exceeds empty-page capacity {} \
1259                     (term too long or too many postings; overflow chaining is Phase 8.1)",
1260                    cell.cell_id,
1261                    entry_bytes.len(),
1262                    current_leaf.free_space()
1263                )));
1264            }
1265        }
1266        current_leaf.insert_entry(cell.cell_id, &entry_bytes)?;
1267        current_max_rowid = Some(cell.cell_id);
1268    }
1269
1270    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1271    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1272    Ok((leaves, next_free_page))
1273}
1274
1275/// (rowid, value) pairs as decoded from a single FTS cell — value is
1276/// either term frequency (posting cell) or doc length (sidecar cell).
1277type FtsEntries = Vec<(i64, u32)>;
1278/// (term, posting list) pairs as decoded from non-sidecar FTS cells.
1279type FtsPostings = Vec<(String, FtsEntries)>;
1280
1281/// Phase 8c — read every cell of an FTS index from `root_page` back
1282/// into the `(doc_lengths, postings)` shape `PostingList::from_persisted_postings`
1283/// expects. Mirrors `load_hnsw_nodes`: leftmost-leaf descent, walk the
1284/// sibling chain, decode each slot.
1285fn load_fts_postings(pager: &Pager, root_page: u32) -> Result<(FtsEntries, FtsPostings)> {
1286    use crate::sql::pager::fts_cell::FtsPostingCell;
1287
1288    let mut doc_lengths: Vec<(i64, u32)> = Vec::new();
1289    let mut postings: Vec<(String, Vec<(i64, u32)>)> = Vec::new();
1290    let mut saw_sidecar = false;
1291
1292    let first_leaf = find_leftmost_leaf(pager, root_page)?;
1293    let mut current = first_leaf;
1294    while current != 0 {
1295        let page_buf = pager
1296            .read_page(current)
1297            .ok_or_else(|| SQLRiteError::Internal(format!("missing FTS leaf page {current}")))?;
1298        if page_buf[0] != PageType::TableLeaf as u8 {
1299            return Err(SQLRiteError::Internal(format!(
1300                "page {current} tagged {} but expected TableLeaf (FTS)",
1301                page_buf[0]
1302            )));
1303        }
1304        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
1305        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
1306            .try_into()
1307            .map_err(|_| SQLRiteError::Internal("FTS leaf payload size".to_string()))?;
1308        let leaf = TablePage::from_bytes(payload);
1309        for slot in 0..leaf.slot_count() {
1310            let offset = leaf.slot_offset_raw(slot)?;
1311            let (cell, _) = FtsPostingCell::decode(leaf.as_bytes(), offset)?;
1312            if cell.is_doc_lengths() {
1313                if saw_sidecar {
1314                    return Err(SQLRiteError::Internal(
1315                        "FTS index has more than one doc-lengths sidecar cell".to_string(),
1316                    ));
1317                }
1318                saw_sidecar = true;
1319                doc_lengths = cell.entries;
1320            } else {
1321                postings.push((cell.term, cell.entries));
1322            }
1323        }
1324        current = next_leaf;
1325    }
1326
1327    if !saw_sidecar {
1328        return Err(SQLRiteError::Internal(
1329            "FTS index missing doc-lengths sidecar cell — corrupt or truncated tree".to_string(),
1330        ));
1331    }
1332    Ok((doc_lengths, postings))
1333}
1334
1335/// Packs HNSW nodes into a sibling-chained run of `TableLeaf` pages.
1336/// `serialize_nodes` already returns nodes in ascending node_id order,
1337/// so the slot directory's rowid ordering stays valid.
1338fn stage_hnsw_leaves(
1339    pager: &mut Pager,
1340    idx: &crate::sql::hnsw::HnswIndex,
1341    start_page: u32,
1342) -> Result<(Vec<(u32, i64)>, u32)> {
1343    use crate::sql::pager::hnsw_cell::HnswNodeCell;
1344
1345    let mut leaves: Vec<(u32, i64)> = Vec::new();
1346    let mut current_leaf = TablePage::empty();
1347    let mut current_leaf_page = start_page;
1348    let mut current_max_rowid: Option<i64> = None;
1349    let mut next_free_page = start_page + 1;
1350
1351    let serialized = idx.serialize_nodes();
1352
1353    // Empty index → emit a single empty leaf page so the rootpage
1354    // pointer in sqlrite_master stays nonzero (== "graph is persisted,
1355    // it just happens to be empty"). load_hnsw_nodes is fine with an
1356    // empty leaf — slot_count() returns 0.
1357    for (node_id, layers) in serialized {
1358        let cell = HnswNodeCell::new(node_id, layers);
1359        let entry_bytes = cell.encode()?;
1360
1361        if !current_leaf.would_fit(entry_bytes.len()) {
1362            let next_leaf_page_num = next_free_page;
1363            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1364            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1365            current_leaf = TablePage::empty();
1366            current_leaf_page = next_leaf_page_num;
1367            next_free_page += 1;
1368
1369            if !current_leaf.would_fit(entry_bytes.len()) {
1370                return Err(SQLRiteError::Internal(format!(
1371                    "HNSW node {node_id} cell of {} bytes exceeds empty-page capacity {}",
1372                    entry_bytes.len(),
1373                    current_leaf.free_space()
1374                )));
1375            }
1376        }
1377        current_leaf.insert_entry(node_id, &entry_bytes)?;
1378        current_max_rowid = Some(node_id);
1379    }
1380
1381    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1382    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1383    Ok((leaves, next_free_page))
1384}
1385
1386fn load_table_rows(pager: &Pager, table: &mut Table, root_page: u32) -> Result<()> {
1387    let first_leaf = find_leftmost_leaf(pager, root_page)?;
1388    let mut current = first_leaf;
1389    while current != 0 {
1390        let page_buf = pager
1391            .read_page(current)
1392            .ok_or_else(|| SQLRiteError::Internal(format!("missing leaf page {current}")))?;
1393        if page_buf[0] != PageType::TableLeaf as u8 {
1394            return Err(SQLRiteError::Internal(format!(
1395                "page {current} tagged {} but expected TableLeaf",
1396                page_buf[0]
1397            )));
1398        }
1399        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
1400        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
1401            .try_into()
1402            .map_err(|_| SQLRiteError::Internal("leaf payload slice size".to_string()))?;
1403        let leaf = TablePage::from_bytes(payload);
1404
1405        for slot in 0..leaf.slot_count() {
1406            let entry = leaf.entry_at(slot)?;
1407            let cell = match entry {
1408                PagedEntry::Local(c) => c,
1409                PagedEntry::Overflow(r) => {
1410                    let body_bytes =
1411                        read_overflow_chain(pager, r.first_overflow_page, r.total_body_len)?;
1412                    let (c, _) = Cell::decode(&body_bytes, 0)?;
1413                    c
1414                }
1415            };
1416            table.restore_row(cell.rowid, cell.values)?;
1417        }
1418        current = next_leaf;
1419    }
1420    Ok(())
1421}
1422
1423/// Descends from `root_page` through `InteriorNode` pages, always taking
1424/// the leftmost child, until a `TableLeaf` is reached. Returns that leaf's
1425/// page number. A root that's already a leaf is returned as-is.
1426fn find_leftmost_leaf(pager: &Pager, root_page: u32) -> Result<u32> {
1427    let mut current = root_page;
1428    loop {
1429        let page_buf = pager.read_page(current).ok_or_else(|| {
1430            SQLRiteError::Internal(format!("missing page {current} during tree descent"))
1431        })?;
1432        match page_buf[0] {
1433            t if t == PageType::TableLeaf as u8 => return Ok(current),
1434            t if t == PageType::InteriorNode as u8 => {
1435                let payload: &[u8; PAYLOAD_PER_PAGE] =
1436                    (&page_buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
1437                        SQLRiteError::Internal("interior payload slice size".to_string())
1438                    })?;
1439                let interior = InteriorPage::from_bytes(payload);
1440                current = interior.leftmost_child()?;
1441            }
1442            other => {
1443                return Err(SQLRiteError::Internal(format!(
1444                    "unexpected page type {other} during tree descent at page {current}"
1445                )));
1446            }
1447        }
1448    }
1449}
1450
1451/// Stages a table's B-Tree starting at `start_page`. Returns
1452/// `(root_page, next_free_page)`. Builds bottom-up:
1453///
1454/// 1. Pack all row cells into `TableLeaf` pages, chaining them via each
1455///    leaf's `next_page` sibling pointer (for fast sequential scans).
1456/// 2. If the table fits in a single leaf, that leaf is the root.
1457/// 3. Otherwise, group leaves into `InteriorNode` pages; recurse up the
1458///    tree until one root remains.
1459///
1460/// Deterministic: same in-memory rows → same pages at same offsets, so
1461/// the Pager's diff commit still skips unchanged tables.
1462fn stage_table_btree(pager: &mut Pager, table: &Table, start_page: u32) -> Result<(u32, u32)> {
1463    let (leaves, mut next_free_page) = stage_leaves(pager, table, start_page)?;
1464    if leaves.len() == 1 {
1465        return Ok((leaves[0].0, next_free_page));
1466    }
1467    let mut level: Vec<(u32, i64)> = leaves;
1468    while level.len() > 1 {
1469        let (next_level, new_next_free) = stage_interior_level(pager, &level, next_free_page)?;
1470        next_free_page = new_next_free;
1471        level = next_level;
1472    }
1473    Ok((level[0].0, next_free_page))
1474}
1475
1476/// Packs the table's rows into a sibling-linked chain of `TableLeaf` pages.
1477/// Returns each leaf's `(page_number, max_rowid)` (used by the next level
1478/// up to build divider cells) and the first free page after the chain
1479/// including any overflow pages allocated for oversized cells.
1480fn stage_leaves(
1481    pager: &mut Pager,
1482    table: &Table,
1483    start_page: u32,
1484) -> Result<(Vec<(u32, i64)>, u32)> {
1485    let mut leaves: Vec<(u32, i64)> = Vec::new();
1486    let mut current_leaf = TablePage::empty();
1487    let mut current_leaf_page = start_page;
1488    let mut current_max_rowid: Option<i64> = None;
1489    let mut next_free_page = start_page + 1;
1490
1491    for rowid in table.rowids() {
1492        let entry_bytes = build_row_entry(pager, table, rowid, &mut next_free_page)?;
1493
1494        if !current_leaf.would_fit(entry_bytes.len()) {
1495            // Commit the current leaf. Its sibling next_page is the page
1496            // number where the new leaf will go — which is next_free_page
1497            // right now (no overflow pages have been allocated between
1498            // this decision and the new leaf's allocation below).
1499            let next_leaf_page_num = next_free_page;
1500            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1501            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1502            current_leaf = TablePage::empty();
1503            current_leaf_page = next_leaf_page_num;
1504            next_free_page += 1;
1505            // current_max_rowid is reassigned by the insert below; no need
1506            // to zero it out here.
1507
1508            if !current_leaf.would_fit(entry_bytes.len()) {
1509                return Err(SQLRiteError::Internal(format!(
1510                    "entry of {} bytes exceeds empty-page capacity {}",
1511                    entry_bytes.len(),
1512                    current_leaf.free_space()
1513                )));
1514            }
1515        }
1516        current_leaf.insert_entry(rowid, &entry_bytes)?;
1517        current_max_rowid = Some(rowid);
1518    }
1519
1520    // Final leaf: sibling next_page = 0 (end of chain).
1521    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1522    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1523    Ok((leaves, next_free_page))
1524}
1525
1526/// Encodes a single row's on-leaf entry — either the local cell bytes, or
1527/// an `OverflowRef` pointing at a freshly-allocated overflow chain if the
1528/// encoded cell exceeded the inline threshold. Advances `next_free_page`
1529/// past any overflow pages used.
1530fn build_row_entry(
1531    pager: &mut Pager,
1532    table: &Table,
1533    rowid: i64,
1534    next_free_page: &mut u32,
1535) -> Result<Vec<u8>> {
1536    let values = table.extract_row(rowid);
1537    let local_cell = Cell::new(rowid, values);
1538    let local_bytes = local_cell.encode()?;
1539    if local_bytes.len() > OVERFLOW_THRESHOLD {
1540        let overflow_start = *next_free_page;
1541        *next_free_page = write_overflow_chain(pager, &local_bytes, overflow_start)?;
1542        Ok(OverflowRef {
1543            rowid,
1544            total_body_len: local_bytes.len() as u64,
1545            first_overflow_page: overflow_start,
1546        }
1547        .encode())
1548    } else {
1549        Ok(local_bytes)
1550    }
1551}
1552
1553/// Builds one level of `InteriorNode` pages above the given children.
1554/// Each interior packs as many dividers as will fit; the last child
1555/// assigned to an interior becomes its `rightmost_child`. Returns the
1556/// emitted interior pages as `(page_number, max_rowid_in_subtree)` so the
1557/// next level can build on top of them.
1558fn stage_interior_level(
1559    pager: &mut Pager,
1560    children: &[(u32, i64)],
1561    start_page: u32,
1562) -> Result<(Vec<(u32, i64)>, u32)> {
1563    let mut next_level: Vec<(u32, i64)> = Vec::new();
1564    let mut next_free_page = start_page;
1565    let mut idx = 0usize;
1566
1567    while idx < children.len() {
1568        let interior_page_num = next_free_page;
1569        next_free_page += 1;
1570
1571        // Seed the interior with the first unassigned child as its
1572        // rightmost. As we add more children, the previous rightmost
1573        // graduates to being a divider and the new arrival takes over
1574        // as rightmost.
1575        let (mut rightmost_child_page, mut rightmost_child_max) = children[idx];
1576        idx += 1;
1577        let mut interior = InteriorPage::empty(rightmost_child_page);
1578
1579        while idx < children.len() {
1580            let new_divider_cell = InteriorCell {
1581                divider_rowid: rightmost_child_max,
1582                child_page: rightmost_child_page,
1583            };
1584            let new_divider_bytes = new_divider_cell.encode();
1585            if !interior.would_fit(new_divider_bytes.len()) {
1586                break;
1587            }
1588            interior.insert_divider(rightmost_child_max, rightmost_child_page)?;
1589            let (next_child_page, next_child_max) = children[idx];
1590            interior.set_rightmost_child(next_child_page);
1591            rightmost_child_page = next_child_page;
1592            rightmost_child_max = next_child_max;
1593            idx += 1;
1594        }
1595
1596        emit_interior(pager, interior_page_num, &interior);
1597        next_level.push((interior_page_num, rightmost_child_max));
1598    }
1599
1600    Ok((next_level, next_free_page))
1601}
1602
1603/// Wraps a `TablePage` in the 7-byte page header and hands it to the pager.
1604fn emit_leaf(pager: &mut Pager, page_num: u32, leaf: &TablePage, next_leaf: u32) {
1605    let mut buf = [0u8; PAGE_SIZE];
1606    buf[0] = PageType::TableLeaf as u8;
1607    buf[1..5].copy_from_slice(&next_leaf.to_le_bytes());
1608    // For leaf pages the legacy `payload_len` field isn't used — the slot
1609    // directory self-describes. Zero it by convention.
1610    buf[5..7].copy_from_slice(&0u16.to_le_bytes());
1611    buf[PAGE_HEADER_SIZE..].copy_from_slice(leaf.as_bytes());
1612    pager.stage_page(page_num, buf);
1613}
1614
1615/// Wraps an `InteriorPage` in the 7-byte page header. Interior pages
1616/// don't use `next_page` (there's no sibling chain between interiors);
1617/// `payload_len` is also unused (the slot directory self-describes).
1618fn emit_interior(pager: &mut Pager, page_num: u32, interior: &InteriorPage) {
1619    let mut buf = [0u8; PAGE_SIZE];
1620    buf[0] = PageType::InteriorNode as u8;
1621    buf[1..5].copy_from_slice(&0u32.to_le_bytes());
1622    buf[5..7].copy_from_slice(&0u16.to_le_bytes());
1623    buf[PAGE_HEADER_SIZE..].copy_from_slice(interior.as_bytes());
1624    pager.stage_page(page_num, buf);
1625}
1626
1627#[cfg(test)]
1628mod tests {
1629    use super::*;
1630    use crate::sql::process_command;
1631
1632    fn seed_db() -> Database {
1633        let mut db = Database::new("test".to_string());
1634        process_command(
1635            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, age INTEGER);",
1636            &mut db,
1637        )
1638        .unwrap();
1639        process_command(
1640            "INSERT INTO users (name, age) VALUES ('alice', 30);",
1641            &mut db,
1642        )
1643        .unwrap();
1644        process_command("INSERT INTO users (name, age) VALUES ('bob', 25);", &mut db).unwrap();
1645        process_command(
1646            "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
1647            &mut db,
1648        )
1649        .unwrap();
1650        process_command("INSERT INTO notes (body) VALUES ('hello');", &mut db).unwrap();
1651        db
1652    }
1653
1654    fn tmp_path(name: &str) -> std::path::PathBuf {
1655        let mut p = std::env::temp_dir();
1656        let pid = std::process::id();
1657        let nanos = std::time::SystemTime::now()
1658            .duration_since(std::time::UNIX_EPOCH)
1659            .map(|d| d.as_nanos())
1660            .unwrap_or(0);
1661        p.push(format!("sqlrite-{pid}-{nanos}-{name}.sqlrite"));
1662        p
1663    }
1664
1665    /// Phase 4c: every .sqlrite has a `-wal` sidecar now. Delete both so
1666    /// `/tmp` doesn't accumulate orphan WALs across test runs.
1667    fn cleanup(path: &std::path::Path) {
1668        let _ = std::fs::remove_file(path);
1669        let mut wal = path.as_os_str().to_owned();
1670        wal.push("-wal");
1671        let _ = std::fs::remove_file(std::path::PathBuf::from(wal));
1672    }
1673
1674    #[test]
1675    fn round_trip_preserves_schema_and_data() {
1676        let path = tmp_path("roundtrip");
1677        let mut db = seed_db();
1678        save_database(&mut db, &path).expect("save");
1679
1680        let loaded = open_database(&path, "test".to_string()).expect("open");
1681        assert_eq!(loaded.tables.len(), 2);
1682
1683        let users = loaded.get_table("users".to_string()).expect("users table");
1684        assert_eq!(users.columns.len(), 3);
1685        let rowids = users.rowids();
1686        assert_eq!(rowids.len(), 2);
1687        let names: Vec<String> = rowids
1688            .iter()
1689            .filter_map(|r| match users.get_value("name", *r) {
1690                Some(Value::Text(s)) => Some(s),
1691                _ => None,
1692            })
1693            .collect();
1694        assert!(names.contains(&"alice".to_string()));
1695        assert!(names.contains(&"bob".to_string()));
1696
1697        let notes = loaded.get_table("notes".to_string()).expect("notes table");
1698        assert_eq!(notes.rowids().len(), 1);
1699
1700        cleanup(&path);
1701    }
1702
1703    // -----------------------------------------------------------------
1704    // Phase 7a — VECTOR(N) save / reopen round-trip
1705    // -----------------------------------------------------------------
1706
1707    #[test]
1708    fn round_trip_preserves_vector_column() {
1709        let path = tmp_path("vec_roundtrip");
1710
1711        // Build, populate, save.
1712        {
1713            let mut db = Database::new("test".to_string());
1714            process_command(
1715                "CREATE TABLE docs (id INTEGER PRIMARY KEY, embedding VECTOR(3));",
1716                &mut db,
1717            )
1718            .unwrap();
1719            process_command(
1720                "INSERT INTO docs (embedding) VALUES ([0.1, 0.2, 0.3]);",
1721                &mut db,
1722            )
1723            .unwrap();
1724            process_command(
1725                "INSERT INTO docs (embedding) VALUES ([1.5, -2.0, 3.5]);",
1726                &mut db,
1727            )
1728            .unwrap();
1729            save_database(&mut db, &path).expect("save");
1730        } // db drops → its exclusive lock releases before reopen.
1731
1732        // Reopen and verify schema + data both round-tripped.
1733        let loaded = open_database(&path, "test".to_string()).expect("open");
1734        let docs = loaded.get_table("docs".to_string()).expect("docs table");
1735
1736        // Schema preserved: column is still VECTOR(3).
1737        let embedding_col = docs
1738            .columns
1739            .iter()
1740            .find(|c| c.column_name == "embedding")
1741            .expect("embedding column");
1742        assert!(
1743            matches!(embedding_col.datatype, DataType::Vector(3)),
1744            "expected DataType::Vector(3) after round-trip, got {:?}",
1745            embedding_col.datatype
1746        );
1747
1748        // Data preserved: both vectors still readable bit-for-bit.
1749        let mut rows: Vec<Vec<f32>> = docs
1750            .rowids()
1751            .iter()
1752            .filter_map(|r| match docs.get_value("embedding", *r) {
1753                Some(Value::Vector(v)) => Some(v),
1754                _ => None,
1755            })
1756            .collect();
1757        rows.sort_by(|a, b| a[0].partial_cmp(&b[0]).unwrap());
1758        assert_eq!(rows.len(), 2);
1759        assert_eq!(rows[0], vec![0.1f32, 0.2, 0.3]);
1760        assert_eq!(rows[1], vec![1.5f32, -2.0, 3.5]);
1761
1762        cleanup(&path);
1763    }
1764
1765    #[test]
1766    fn round_trip_preserves_json_column() {
1767        // Phase 7e — JSON columns are stored as Text under the hood with
1768        // INSERT-time validation. Save + reopen should preserve the
1769        // schema (DataType::Json) and the underlying text bytes; a
1770        // post-reopen json_extract should still resolve paths correctly.
1771        let path = tmp_path("json_roundtrip");
1772
1773        {
1774            let mut db = Database::new("test".to_string());
1775            process_command(
1776                "CREATE TABLE docs (id INTEGER PRIMARY KEY, payload JSON);",
1777                &mut db,
1778            )
1779            .unwrap();
1780            process_command(
1781                r#"INSERT INTO docs (payload) VALUES ('{"name": "alice", "tags": ["rust","sql"]}');"#,
1782                &mut db,
1783            )
1784            .unwrap();
1785            save_database(&mut db, &path).expect("save");
1786        }
1787
1788        let mut loaded = open_database(&path, "test".to_string()).expect("open");
1789        let docs = loaded.get_table("docs".to_string()).expect("docs");
1790
1791        // Schema: column declared as JSON, restored with the same type.
1792        let payload_col = docs
1793            .columns
1794            .iter()
1795            .find(|c| c.column_name == "payload")
1796            .unwrap();
1797        assert!(
1798            matches!(payload_col.datatype, DataType::Json),
1799            "expected DataType::Json, got {:?}",
1800            payload_col.datatype
1801        );
1802
1803        // json_extract works against the reopened data — exercises the
1804        // full Text-storage + serde_json::from_str path post-reopen.
1805        let resp = process_command(
1806            r#"SELECT id FROM docs WHERE json_extract(payload, '$.name') = 'alice';"#,
1807            &mut loaded,
1808        )
1809        .expect("select via json_extract after reopen");
1810        assert!(resp.contains("1 row returned"), "got: {resp}");
1811
1812        cleanup(&path);
1813    }
1814
1815    #[test]
1816    fn round_trip_rebuilds_hnsw_index_from_create_sql() {
1817        // Phase 7d.3: HNSW indexes now persist their graph as cell-encoded
1818        // pages. After save+reopen the index entry reattaches with the
1819        // same column + same node count, loaded directly from disk
1820        // instead of re-walking rows.
1821        let path = tmp_path("hnsw_roundtrip");
1822
1823        // Build, populate, index, save.
1824        {
1825            let mut db = Database::new("test".to_string());
1826            process_command(
1827                "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
1828                &mut db,
1829            )
1830            .unwrap();
1831            for v in &[
1832                "[1.0, 0.0]",
1833                "[2.0, 0.0]",
1834                "[0.0, 3.0]",
1835                "[1.0, 4.0]",
1836                "[10.0, 10.0]",
1837            ] {
1838                process_command(&format!("INSERT INTO docs (e) VALUES ({v});"), &mut db).unwrap();
1839            }
1840            process_command("CREATE INDEX ix_e ON docs USING hnsw (e);", &mut db).unwrap();
1841            save_database(&mut db, &path).expect("save");
1842        } // db drops → exclusive lock releases.
1843
1844        // Reopen and verify the index reattached, with the same name +
1845        // column + populated graph.
1846        let mut loaded = open_database(&path, "test".to_string()).expect("open");
1847        {
1848            let table = loaded.get_table("docs".to_string()).expect("docs");
1849            assert_eq!(table.hnsw_indexes.len(), 1, "HNSW index should reattach");
1850            let entry = &table.hnsw_indexes[0];
1851            assert_eq!(entry.name, "ix_e");
1852            assert_eq!(entry.column_name, "e");
1853            assert_eq!(entry.index.len(), 5, "loaded graph should hold all 5 rows");
1854            assert!(
1855                !entry.needs_rebuild,
1856                "fresh load should not be marked dirty"
1857            );
1858        }
1859
1860        // Quick functional check: KNN query through the loaded index
1861        // returns results.
1862        let resp = process_command(
1863            "SELECT id FROM docs ORDER BY vec_distance_l2(e, [1.0, 0.0]) ASC LIMIT 3;",
1864            &mut loaded,
1865        )
1866        .unwrap();
1867        assert!(resp.contains("3 rows returned"), "got: {resp}");
1868
1869        cleanup(&path);
1870    }
1871
1872    #[test]
1873    fn round_trip_rebuilds_fts_index_from_create_sql() {
1874        // Phase 8c: FTS indexes now persist their posting lists as
1875        // cell-encoded pages. After save+reopen the index entry
1876        // reattaches with the same column + same posting count, loaded
1877        // directly from disk (no re-tokenization).
1878        let path = tmp_path("fts_roundtrip");
1879
1880        {
1881            let mut db = Database::new("test".to_string());
1882            process_command(
1883                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
1884                &mut db,
1885            )
1886            .unwrap();
1887            for body in &[
1888                "rust embedded database",
1889                "rust web framework",
1890                "go embedded systems",
1891                "python web framework",
1892                "rust rust embedded power",
1893            ] {
1894                process_command(
1895                    &format!("INSERT INTO docs (body) VALUES ('{body}');"),
1896                    &mut db,
1897                )
1898                .unwrap();
1899            }
1900            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
1901            save_database(&mut db, &path).expect("save");
1902        } // db drops → exclusive lock releases.
1903
1904        let mut loaded = open_database(&path, "test".to_string()).expect("open");
1905        {
1906            let table = loaded.get_table("docs".to_string()).expect("docs");
1907            assert_eq!(table.fts_indexes.len(), 1, "FTS index should reattach");
1908            let entry = &table.fts_indexes[0];
1909            assert_eq!(entry.name, "ix_body");
1910            assert_eq!(entry.column_name, "body");
1911            assert_eq!(
1912                entry.index.len(),
1913                5,
1914                "rebuilt posting list should hold all 5 rows"
1915            );
1916            assert!(!entry.needs_rebuild);
1917        }
1918
1919        // Functional smoke: an FTS query through the reloaded index
1920        // returns the expected hit count.
1921        let resp = process_command(
1922            "SELECT id FROM docs WHERE fts_match(body, 'rust');",
1923            &mut loaded,
1924        )
1925        .unwrap();
1926        assert!(resp.contains("3 rows returned"), "got: {resp}");
1927
1928        cleanup(&path);
1929    }
1930
1931    #[test]
1932    fn delete_then_save_then_reopen_excludes_deleted_node_from_fts() {
1933        // Phase 8b — DELETE marks the FTS index dirty; save rebuilds it
1934        // from current rows; reopen replays the CREATE INDEX SQL against
1935        // the post-delete row set. The deleted rowid must not surface
1936        // in `fts_match` results post-reopen.
1937        let path = tmp_path("fts_delete_rebuild");
1938        let mut db = Database::new("test".to_string());
1939        process_command(
1940            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
1941            &mut db,
1942        )
1943        .unwrap();
1944        for body in &[
1945            "rust embedded",
1946            "rust framework",
1947            "go embedded",
1948            "python web",
1949        ] {
1950            process_command(
1951                &format!("INSERT INTO docs (body) VALUES ('{body}');"),
1952                &mut db,
1953            )
1954            .unwrap();
1955        }
1956        process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
1957
1958        // Delete row 1 ('rust embedded'); save (rebuild fires); reopen.
1959        process_command("DELETE FROM docs WHERE id = 1;", &mut db).unwrap();
1960        save_database(&mut db, &path).expect("save");
1961        drop(db);
1962
1963        let mut loaded = open_database(&path, "test".to_string()).expect("open");
1964        let resp = process_command(
1965            "SELECT id FROM docs WHERE fts_match(body, 'rust');",
1966            &mut loaded,
1967        )
1968        .unwrap();
1969        // Pre-delete: 2 rows ('rust embedded', 'rust framework') had
1970        // 'rust'. Post-delete: only id=2 remains.
1971        assert!(resp.contains("1 row returned"), "got: {resp}");
1972
1973        cleanup(&path);
1974    }
1975
1976    #[test]
1977    fn fts_roundtrip_uses_persistence_path_not_replay() {
1978        // Phase 8c — assert the reload didn't go through the
1979        // rootpage=0 replay shortcut. We do this by reading the
1980        // sqlrite_master row for the FTS index and confirming its
1981        // rootpage field is non-zero.
1982        let path = tmp_path("fts_persistence_path");
1983
1984        {
1985            let mut db = Database::new("test".to_string());
1986            process_command(
1987                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
1988                &mut db,
1989            )
1990            .unwrap();
1991            process_command(
1992                "INSERT INTO docs (body) VALUES ('rust embedded database');",
1993                &mut db,
1994            )
1995            .unwrap();
1996            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
1997            save_database(&mut db, &path).expect("save");
1998        }
1999
2000        // Read raw sqlrite_master to find the FTS index row.
2001        let pager = Pager::open(&path).expect("open pager");
2002        let mut master = build_empty_master_table();
2003        load_table_rows(&pager, &mut master, pager.header().schema_root_page).unwrap();
2004        let mut found_rootpage: Option<u32> = None;
2005        for rowid in master.rowids() {
2006            let name = take_text(&master, "name", rowid).unwrap();
2007            if name == "ix_body" {
2008                let rp = take_integer(&master, "rootpage", rowid).unwrap();
2009                found_rootpage = Some(rp as u32);
2010            }
2011        }
2012        let rootpage = found_rootpage.expect("ix_body row in sqlrite_master");
2013        assert!(
2014            rootpage != 0,
2015            "Phase 8c FTS save should set rootpage != 0; got {rootpage}"
2016        );
2017
2018        cleanup(&path);
2019    }
2020
2021    #[test]
2022    fn save_without_fts_keeps_format_v4() {
2023        // Phase 8c on-demand bump — a database with zero FTS indexes
2024        // continues writing the v4 header. Existing v4 users must not
2025        // see their files silently promoted to v5 by an upgrade.
2026        use crate::sql::pager::header::FORMAT_VERSION_V4;
2027
2028        let path = tmp_path("fts_no_bump");
2029        let mut db = Database::new("test".to_string());
2030        process_command(
2031            "CREATE TABLE t (id INTEGER PRIMARY KEY, n INTEGER);",
2032            &mut db,
2033        )
2034        .unwrap();
2035        process_command("INSERT INTO t (n) VALUES (1);", &mut db).unwrap();
2036        save_database(&mut db, &path).unwrap();
2037        drop(db);
2038
2039        let pager = Pager::open(&path).expect("open");
2040        assert_eq!(
2041            pager.header().format_version,
2042            FORMAT_VERSION_V4,
2043            "no-FTS save should keep v4"
2044        );
2045        cleanup(&path);
2046    }
2047
2048    #[test]
2049    fn save_with_fts_bumps_to_v5() {
2050        // Phase 8c on-demand bump — first FTS-bearing save promotes
2051        // the file to v5. v5 readers handle both v4 and v5; v4
2052        // readers correctly refuse a v5 file.
2053        use crate::sql::pager::header::FORMAT_VERSION_V5;
2054
2055        let path = tmp_path("fts_bump_v5");
2056        let mut db = Database::new("test".to_string());
2057        process_command(
2058            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2059            &mut db,
2060        )
2061        .unwrap();
2062        process_command("INSERT INTO docs (body) VALUES ('hello');", &mut db).unwrap();
2063        process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2064        save_database(&mut db, &path).unwrap();
2065        drop(db);
2066
2067        let pager = Pager::open(&path).expect("open");
2068        assert_eq!(
2069            pager.header().format_version,
2070            FORMAT_VERSION_V5,
2071            "FTS save should promote to v5"
2072        );
2073        cleanup(&path);
2074    }
2075
2076    #[test]
2077    fn fts_persistence_handles_empty_and_zero_token_docs() {
2078        // Phase 8c — sidecar cell carries doc-lengths for every doc
2079        // including any with zero tokens (so total_docs is honest
2080        // post-reopen). Empty index also round-trips: a CREATE INDEX
2081        // on an empty table emits a single empty leaf with just the
2082        // (empty) sidecar.
2083        let path = tmp_path("fts_edges");
2084
2085        {
2086            let mut db = Database::new("test".to_string());
2087            process_command(
2088                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2089                &mut db,
2090            )
2091            .unwrap();
2092            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2093            // Mix: real text, then a row that tokenizes to zero tokens
2094            // (only punctuation), then real again.
2095            process_command("INSERT INTO docs (body) VALUES ('rust embedded');", &mut db).unwrap();
2096            process_command("INSERT INTO docs (body) VALUES ('!!!---???');", &mut db).unwrap();
2097            process_command("INSERT INTO docs (body) VALUES ('go embedded');", &mut db).unwrap();
2098            save_database(&mut db, &path).unwrap();
2099        }
2100
2101        let loaded = open_database(&path, "test".to_string()).expect("open");
2102        let table = loaded.get_table("docs".to_string()).unwrap();
2103        let entry = &table.fts_indexes[0];
2104        // All three rows present — including the zero-token row,
2105        // which is critical for total_docs honesty in BM25.
2106        assert_eq!(entry.index.len(), 3);
2107        // 'embedded' appears in 2 rows after reload.
2108        let res = entry
2109            .index
2110            .query("embedded", &crate::sql::fts::Bm25Params::default());
2111        assert_eq!(res.len(), 2);
2112
2113        cleanup(&path);
2114    }
2115
2116    #[test]
2117    fn fts_persistence_round_trips_large_corpus() {
2118        // Phase 8c — exercise multi-leaf staging. ~500 docs with
2119        // single-token bodies generates enough cells to overflow a
2120        // single 4 KiB leaf (each posting cell averages ~8 bytes).
2121        let path = tmp_path("fts_large_corpus");
2122
2123        let mut expected_terms: std::collections::BTreeSet<String> =
2124            std::collections::BTreeSet::new();
2125        {
2126            let mut db = Database::new("test".to_string());
2127            process_command(
2128                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2129                &mut db,
2130            )
2131            .unwrap();
2132            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
2133            // 500 docs, each one a unique term — drives unique-term
2134            // count up so multiple leaves are required.
2135            for i in 0..500 {
2136                let term = format!("term{i:04}");
2137                process_command(
2138                    &format!("INSERT INTO docs (body) VALUES ('{term}');"),
2139                    &mut db,
2140                )
2141                .unwrap();
2142                expected_terms.insert(term);
2143            }
2144            save_database(&mut db, &path).unwrap();
2145        }
2146
2147        let loaded = open_database(&path, "test".to_string()).expect("open");
2148        let table = loaded.get_table("docs".to_string()).unwrap();
2149        let entry = &table.fts_indexes[0];
2150        assert_eq!(entry.index.len(), 500);
2151
2152        // Spot-check a handful of terms come back with their original
2153        // single-row posting list.
2154        for &i in &[0_i64, 137, 248, 391, 499] {
2155            let term = format!("term{i:04}");
2156            let res = entry
2157                .index
2158                .query(&term, &crate::sql::fts::Bm25Params::default());
2159            assert_eq!(res.len(), 1, "term {term} should match exactly 1 row");
2160            // PrimaryKey rowids start at 1; doc i was inserted at
2161            // rowid i+1.
2162            assert_eq!(res[0].0, i + 1);
2163        }
2164
2165        cleanup(&path);
2166    }
2167
2168    #[test]
2169    fn delete_then_save_then_reopen_excludes_deleted_node_from_hnsw() {
2170        // Phase 7d.3 — DELETE marks HNSW dirty; save rebuilds it from
2171        // current rows + serializes; reopen loads the post-delete graph.
2172        // After all that, the deleted rowid must NOT come back from a
2173        // KNN query.
2174        let path = tmp_path("hnsw_delete_rebuild");
2175        let mut db = Database::new("test".to_string());
2176        process_command(
2177            "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
2178            &mut db,
2179        )
2180        .unwrap();
2181        for v in &["[1.0, 0.0]", "[2.0, 0.0]", "[3.0, 0.0]", "[4.0, 0.0]"] {
2182            process_command(&format!("INSERT INTO docs (e) VALUES ({v});"), &mut db).unwrap();
2183        }
2184        process_command("CREATE INDEX ix_e ON docs USING hnsw (e);", &mut db).unwrap();
2185
2186        // Delete row 1 (the closest match to [0.5, 0.0]).
2187        process_command("DELETE FROM docs WHERE id = 1;", &mut db).unwrap();
2188        // Confirm it marked dirty.
2189        let dirty_before_save = db.tables["docs"].hnsw_indexes[0].needs_rebuild;
2190        assert!(dirty_before_save, "DELETE should mark dirty");
2191
2192        save_database(&mut db, &path).expect("save");
2193        // Confirm save cleared the dirty flag.
2194        let dirty_after_save = db.tables["docs"].hnsw_indexes[0].needs_rebuild;
2195        assert!(!dirty_after_save, "save should clear dirty");
2196        drop(db);
2197
2198        // Reopen, query for the closest match. Row 1 is gone; row 2
2199        // (id=2, vector [2.0, 0.0]) should now be the nearest.
2200        let loaded = open_database(&path, "test".to_string()).expect("open");
2201        let docs = loaded.get_table("docs".to_string()).expect("docs");
2202
2203        // Row 1 must not appear in any storage anymore.
2204        assert!(
2205            !docs.rowids().contains(&1),
2206            "deleted row 1 should not be in row storage"
2207        );
2208        assert_eq!(docs.rowids().len(), 3, "should have 3 surviving rows");
2209
2210        // The HNSW index must also have shed the deleted node.
2211        assert_eq!(
2212            docs.hnsw_indexes[0].index.len(),
2213            3,
2214            "HNSW graph should have shed the deleted node"
2215        );
2216
2217        cleanup(&path);
2218    }
2219
2220    #[test]
2221    fn round_trip_survives_writes_after_load() {
2222        let path = tmp_path("after_load");
2223        save_database(&mut seed_db(), &path).unwrap();
2224
2225        {
2226            let mut db = open_database(&path, "test".to_string()).unwrap();
2227            process_command(
2228                "INSERT INTO users (name, age) VALUES ('carol', 40);",
2229                &mut db,
2230            )
2231            .unwrap();
2232            save_database(&mut db, &path).unwrap();
2233        } // db drops → its exclusive lock releases before we reopen below.
2234
2235        let db2 = open_database(&path, "test".to_string()).unwrap();
2236        let users = db2.get_table("users".to_string()).unwrap();
2237        assert_eq!(users.rowids().len(), 3);
2238
2239        cleanup(&path);
2240    }
2241
2242    #[test]
2243    fn open_rejects_garbage_file() {
2244        let path = tmp_path("bad");
2245        std::fs::write(&path, b"not a sqlrite database, just bytes").unwrap();
2246        let result = open_database(&path, "x".to_string());
2247        assert!(result.is_err());
2248        cleanup(&path);
2249    }
2250
2251    #[test]
2252    fn many_small_rows_spread_across_leaves() {
2253        let path = tmp_path("many_rows");
2254        let mut db = Database::new("big".to_string());
2255        process_command(
2256            "CREATE TABLE things (id INTEGER PRIMARY KEY, data TEXT);",
2257            &mut db,
2258        )
2259        .unwrap();
2260        for i in 0..200 {
2261            let body = "x".repeat(200);
2262            let q = format!("INSERT INTO things (data) VALUES ('row-{i}-{body}');");
2263            process_command(&q, &mut db).unwrap();
2264        }
2265        save_database(&mut db, &path).unwrap();
2266        let loaded = open_database(&path, "big".to_string()).unwrap();
2267        let things = loaded.get_table("things".to_string()).unwrap();
2268        assert_eq!(things.rowids().len(), 200);
2269        cleanup(&path);
2270    }
2271
2272    #[test]
2273    fn huge_row_goes_through_overflow() {
2274        let path = tmp_path("overflow_row");
2275        let mut db = Database::new("big".to_string());
2276        process_command(
2277            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
2278            &mut db,
2279        )
2280        .unwrap();
2281        let body = "A".repeat(10_000);
2282        process_command(
2283            &format!("INSERT INTO docs (body) VALUES ('{body}');"),
2284            &mut db,
2285        )
2286        .unwrap();
2287        save_database(&mut db, &path).unwrap();
2288
2289        let loaded = open_database(&path, "big".to_string()).unwrap();
2290        let docs = loaded.get_table("docs".to_string()).unwrap();
2291        let rowids = docs.rowids();
2292        assert_eq!(rowids.len(), 1);
2293        let stored = docs.get_value("body", rowids[0]);
2294        match stored {
2295            Some(Value::Text(s)) => assert_eq!(s.len(), 10_000),
2296            other => panic!("expected Text, got {other:?}"),
2297        }
2298        cleanup(&path);
2299    }
2300
2301    #[test]
2302    fn create_sql_synthesis_round_trips() {
2303        // Build a table via CREATE, then verify table_to_create_sql +
2304        // parse_create_sql reproduce an equivalent column list.
2305        let mut db = Database::new("x".to_string());
2306        process_command(
2307            "CREATE TABLE t (id INTEGER PRIMARY KEY, tag TEXT UNIQUE, note TEXT NOT NULL);",
2308            &mut db,
2309        )
2310        .unwrap();
2311        let t = db.get_table("t".to_string()).unwrap();
2312        let sql = table_to_create_sql(t);
2313        let (name, cols) = parse_create_sql(&sql).unwrap();
2314        assert_eq!(name, "t");
2315        assert_eq!(cols.len(), 3);
2316        assert!(cols[0].is_pk);
2317        assert!(cols[1].is_unique);
2318        assert!(cols[2].not_null);
2319    }
2320
2321    #[test]
2322    fn sqlrite_master_is_not_exposed_as_a_user_table() {
2323        // After open, the public db.tables map should not list the master.
2324        let path = tmp_path("no_master");
2325        save_database(&mut seed_db(), &path).unwrap();
2326        let loaded = open_database(&path, "x".to_string()).unwrap();
2327        assert!(!loaded.tables.contains_key(MASTER_TABLE_NAME));
2328        cleanup(&path);
2329    }
2330
2331    #[test]
2332    fn multi_leaf_table_produces_an_interior_root() {
2333        // 200 fat rows force the table into multiple leaves, which means
2334        // save_database must build at least one InteriorNode above them.
2335        // The test verifies the round-trip works and confirms the root is
2336        // indeed an interior page (not a leaf) by reading the page type
2337        // directly out of the open pager.
2338        let path = tmp_path("multi_leaf_interior");
2339        let mut db = Database::new("big".to_string());
2340        process_command(
2341            "CREATE TABLE things (id INTEGER PRIMARY KEY, data TEXT);",
2342            &mut db,
2343        )
2344        .unwrap();
2345        for i in 0..200 {
2346            let body = "x".repeat(200);
2347            let q = format!("INSERT INTO things (data) VALUES ('row-{i}-{body}');");
2348            process_command(&q, &mut db).unwrap();
2349        }
2350        save_database(&mut db, &path).unwrap();
2351
2352        // Confirm the round-trip preserved all 200 rows.
2353        let loaded = open_database(&path, "big".to_string()).unwrap();
2354        let things = loaded.get_table("things".to_string()).unwrap();
2355        assert_eq!(things.rowids().len(), 200);
2356
2357        // Peek at `things`'s root page via the pager attached to the
2358        // loaded DB and check it's an InteriorNode, not a leaf.
2359        let pager = loaded
2360            .pager
2361            .as_ref()
2362            .expect("loaded DB should have a pager");
2363        // sqlrite_master's row for `things` holds its root page. Easiest
2364        // way to find it: walk the leaf chain by using find_leftmost_leaf
2365        // and then hop one level up. Simpler: read the master, scan for
2366        // the "things" row, look up rootpage.
2367        let mut master = build_empty_master_table();
2368        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
2369        let things_root = master
2370            .rowids()
2371            .into_iter()
2372            .find_map(|r| match master.get_value("name", r) {
2373                Some(Value::Text(s)) if s == "things" => match master.get_value("rootpage", r) {
2374                    Some(Value::Integer(p)) => Some(p as u32),
2375                    _ => None,
2376                },
2377                _ => None,
2378            })
2379            .expect("things should appear in sqlrite_master");
2380        let root_buf = pager.read_page(things_root).unwrap();
2381        assert_eq!(
2382            root_buf[0],
2383            PageType::InteriorNode as u8,
2384            "expected a multi-leaf table to have an interior root, got tag {}",
2385            root_buf[0]
2386        );
2387
2388        cleanup(&path);
2389    }
2390
2391    #[test]
2392    fn explicit_index_persists_across_save_and_open() {
2393        let path = tmp_path("idx_persist");
2394        let mut db = Database::new("idx".to_string());
2395        process_command(
2396            "CREATE TABLE users (id INTEGER PRIMARY KEY, tag TEXT);",
2397            &mut db,
2398        )
2399        .unwrap();
2400        for i in 1..=5 {
2401            let tag = if i % 2 == 0 { "odd" } else { "even" };
2402            process_command(
2403                &format!("INSERT INTO users (tag) VALUES ('{tag}');"),
2404                &mut db,
2405            )
2406            .unwrap();
2407        }
2408        process_command("CREATE INDEX users_tag_idx ON users (tag);", &mut db).unwrap();
2409        save_database(&mut db, &path).unwrap();
2410
2411        let loaded = open_database(&path, "idx".to_string()).unwrap();
2412        let users = loaded.get_table("users".to_string()).unwrap();
2413        let idx = users
2414            .index_by_name("users_tag_idx")
2415            .expect("explicit index should survive save/open");
2416        assert_eq!(idx.column_name, "tag");
2417        assert!(!idx.is_unique);
2418        // 5 rows: rowids 2, 4 are "odd" (i % 2 == 0 when i is 2 or 4) — 2 entries;
2419        // rowids 1, 3, 5 are "even" (i % 2 != 0) — 3 entries.
2420        let even_rowids = idx.lookup(&Value::Text("even".into()));
2421        let odd_rowids = idx.lookup(&Value::Text("odd".into()));
2422        assert_eq!(even_rowids.len(), 3);
2423        assert_eq!(odd_rowids.len(), 2);
2424
2425        cleanup(&path);
2426    }
2427
2428    #[test]
2429    fn auto_indexes_for_unique_columns_survive_save_open() {
2430        let path = tmp_path("auto_idx_persist");
2431        let mut db = Database::new("a".to_string());
2432        process_command(
2433            "CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL UNIQUE);",
2434            &mut db,
2435        )
2436        .unwrap();
2437        process_command("INSERT INTO users (email) VALUES ('a@x');", &mut db).unwrap();
2438        process_command("INSERT INTO users (email) VALUES ('b@x');", &mut db).unwrap();
2439        save_database(&mut db, &path).unwrap();
2440
2441        let loaded = open_database(&path, "a".to_string()).unwrap();
2442        let users = loaded.get_table("users".to_string()).unwrap();
2443        // Every UNIQUE column auto-creates an index; the load path populated
2444        // it from the persisted entries.
2445        let auto_name = SecondaryIndex::auto_name("users", "email");
2446        let idx = users
2447            .index_by_name(&auto_name)
2448            .expect("auto index should be restored");
2449        assert!(idx.is_unique);
2450        assert_eq!(idx.lookup(&Value::Text("a@x".into())).len(), 1);
2451        assert_eq!(idx.lookup(&Value::Text("b@x".into())).len(), 1);
2452
2453        cleanup(&path);
2454    }
2455
2456    #[test]
2457    fn deep_tree_round_trips() {
2458        // Force a 3-level tree by bypassing process_command (which prints
2459        // the full table on every INSERT, making large bulk loads O(N^2)
2460        // in I/O). We build the Table directly via restore_row.
2461        use crate::sql::db::table::Column as TableColumn;
2462
2463        let path = tmp_path("deep_tree");
2464        let mut db = Database::new("deep".to_string());
2465        let columns = vec![
2466            TableColumn::new("id".into(), "integer".into(), true, true, true),
2467            TableColumn::new("s".into(), "text".into(), false, true, false),
2468        ];
2469        let mut table = build_empty_table("t", columns, 0);
2470        // ~900-byte rows → ~4 rows per leaf. 6000 rows → ~1500 leaves,
2471        // which with interior fanout ~400 needs 2 interior levels (3-level
2472        // tree total, counting leaves).
2473        for i in 1..=6_000i64 {
2474            let body = "q".repeat(900);
2475            table
2476                .restore_row(
2477                    i,
2478                    vec![
2479                        Some(Value::Integer(i)),
2480                        Some(Value::Text(format!("r-{i}-{body}"))),
2481                    ],
2482                )
2483                .unwrap();
2484        }
2485        db.tables.insert("t".to_string(), table);
2486        save_database(&mut db, &path).unwrap();
2487
2488        let loaded = open_database(&path, "deep".to_string()).unwrap();
2489        let t = loaded.get_table("t".to_string()).unwrap();
2490        assert_eq!(t.rowids().len(), 6_000);
2491
2492        // Confirm the tree actually grew past 2 levels — i.e., the root's
2493        // leftmost child is itself an interior page, not a leaf.
2494        let pager = loaded.pager.as_ref().unwrap();
2495        let mut master = build_empty_master_table();
2496        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
2497        let t_root = master
2498            .rowids()
2499            .into_iter()
2500            .find_map(|r| match master.get_value("name", r) {
2501                Some(Value::Text(s)) if s == "t" => match master.get_value("rootpage", r) {
2502                    Some(Value::Integer(p)) => Some(p as u32),
2503                    _ => None,
2504                },
2505                _ => None,
2506            })
2507            .expect("t in sqlrite_master");
2508        let root_buf = pager.read_page(t_root).unwrap();
2509        assert_eq!(root_buf[0], PageType::InteriorNode as u8);
2510        let root_payload: &[u8; PAYLOAD_PER_PAGE] =
2511            (&root_buf[PAGE_HEADER_SIZE..]).try_into().unwrap();
2512        let root_interior = InteriorPage::from_bytes(root_payload);
2513        let child = root_interior.leftmost_child().unwrap();
2514        let child_buf = pager.read_page(child).unwrap();
2515        assert_eq!(
2516            child_buf[0],
2517            PageType::InteriorNode as u8,
2518            "expected 3-level tree: root's leftmost child should also be InteriorNode",
2519        );
2520
2521        cleanup(&path);
2522    }
2523}