Skip to main content

sqlrite/sql/pager/
mod.rs

1//! On-disk persistence for a `Database`, using fixed-size paged files.
2//!
3//! The file is a sequence of 4 KiB pages. Page 0 holds the header
4//! (magic, version, page count, schema-root pointer). Every other page carries
5//! a small per-page header (type tag + next-page pointer + payload length)
6//! followed by a payload of up to 4089 bytes.
7//!
8//! **Storage strategy (format version 2, Phase 3c.5).**
9//!
10//! - Each `Table`'s rows live as **cells** in a chain of `TableLeaf` pages.
11//!   Cell layout and slot directory are in `cell.rs` / `table_page.rs`;
12//!   cells that exceed the inline threshold spill into an overflow chain
13//!   via `overflow.rs`.
14//! - The schema catalog is itself a regular table named `sqlrite_master`,
15//!   with one row per user table:
16//!       `(name TEXT PRIMARY KEY, sql TEXT NOT NULL,
17//!         rootpage INTEGER NOT NULL, last_rowid INTEGER NOT NULL)`
18//!   This is the SQLite-style approach: the schema of `sqlrite_master`
19//!   itself is hardcoded into the engine so the open path can bootstrap.
20//! - Page 0's `schema_root_page` field points at the first leaf of
21//!   `sqlrite_master`.
22//!
23//! **Format version.** Version 2 is not compatible with files produced by
24//! earlier commits. Opening a v1 file returns a clean error — users on
25//! old files have to regenerate them from CREATE/INSERT, as there's no
26//! production data to migrate yet.
27
28// Data-layer modules. Not every helper in these modules is used by save/open
29// yet — some exist for tests, some for future maintenance operations.
30// Module-level #[allow(dead_code)] keeps the build quiet without dotting
31// the modules with per-item attributes.
32#[allow(dead_code)]
33pub mod cell;
34pub mod file;
35pub mod header;
36#[allow(dead_code)]
37pub mod hnsw_cell;
38#[allow(dead_code)]
39pub mod index_cell;
40#[allow(dead_code)]
41pub mod interior_page;
42pub mod overflow;
43pub mod page;
44pub mod pager;
45#[allow(dead_code)]
46pub mod table_page;
47#[allow(dead_code)]
48pub mod varint;
49#[allow(dead_code)]
50pub mod wal;
51
52use std::collections::{BTreeMap, HashMap};
53use std::path::Path;
54use std::sync::{Arc, Mutex};
55
56use sqlparser::dialect::SQLiteDialect;
57use sqlparser::parser::Parser;
58
59use crate::error::{Result, SQLRiteError};
60use crate::sql::db::database::Database;
61use crate::sql::db::secondary_index::{IndexOrigin, SecondaryIndex};
62use crate::sql::db::table::{Column, DataType, Row, Table, Value};
63use crate::sql::pager::cell::Cell;
64use crate::sql::pager::header::DbHeader;
65use crate::sql::pager::index_cell::IndexCell;
66use crate::sql::pager::interior_page::{InteriorCell, InteriorPage};
67use crate::sql::pager::overflow::{
68    OVERFLOW_THRESHOLD, OverflowRef, PagedEntry, read_overflow_chain, write_overflow_chain,
69};
70use crate::sql::pager::page::{PAGE_HEADER_SIZE, PAGE_SIZE, PAYLOAD_PER_PAGE, PageType};
71use crate::sql::pager::pager::Pager;
72use crate::sql::pager::table_page::TablePage;
73use crate::sql::parser::create::CreateQuery;
74
75// Re-export so callers can spell `sql::pager::AccessMode` without
76// reaching into the `pager::pager::pager` submodule path.
77pub use crate::sql::pager::pager::AccessMode;
78
79/// Name of the internal catalog table. Reserved — user CREATEs of this
80/// name must be rejected upstream.
81pub const MASTER_TABLE_NAME: &str = "sqlrite_master";
82
83/// Opens a database file in read-write mode. Shorthand for
84/// [`open_database_with_mode`] with [`AccessMode::ReadWrite`].
85pub fn open_database(path: &Path, db_name: String) -> Result<Database> {
86    open_database_with_mode(path, db_name, AccessMode::ReadWrite)
87}
88
89/// Opens a database file in read-only mode. Acquires a shared OS-level
90/// advisory lock, so other read-only openers coexist but any writer is
91/// excluded. Attempts to mutate the returned `Database` (e.g. an
92/// `INSERT`, or a `save_database` call against it) bottom out in a
93/// `cannot commit: database is opened read-only` error from the Pager.
94pub fn open_database_read_only(path: &Path, db_name: String) -> Result<Database> {
95    open_database_with_mode(path, db_name, AccessMode::ReadOnly)
96}
97
98/// Opens a database file and reconstructs the in-memory `Database`,
99/// leaving the long-lived `Pager` attached for subsequent auto-save
100/// (read-write) or consistent-snapshot reads (read-only).
101pub fn open_database_with_mode(path: &Path, db_name: String, mode: AccessMode) -> Result<Database> {
102    let pager = Pager::open_with_mode(path, mode)?;
103
104    // 1. Load sqlrite_master from the tree at header.schema_root_page.
105    let mut master = build_empty_master_table();
106    load_table_rows(&pager, &mut master, pager.header().schema_root_page)?;
107
108    // 2. Two passes over master rows: first build every user table, then
109    //    attach secondary indexes. Indexes need their base table to exist
110    //    before we can populate them. Auto-indexes are created at table
111    //    build time so we only have to load explicit indexes from disk
112    //    (but we also reload the auto-index CONTENT because Table::new
113    //    built it empty).
114    let mut db = Database::new(db_name);
115    let mut index_rows: Vec<IndexCatalogRow> = Vec::new();
116
117    for rowid in master.rowids() {
118        let ty = take_text(&master, "type", rowid)?;
119        let name = take_text(&master, "name", rowid)?;
120        let sql = take_text(&master, "sql", rowid)?;
121        let rootpage = take_integer(&master, "rootpage", rowid)? as u32;
122        let last_rowid = take_integer(&master, "last_rowid", rowid)?;
123
124        match ty.as_str() {
125            "table" => {
126                let (parsed_name, columns) = parse_create_sql(&sql)?;
127                if parsed_name != name {
128                    return Err(SQLRiteError::Internal(format!(
129                        "sqlrite_master row '{name}' carries SQL for '{parsed_name}' — corrupt catalog?"
130                    )));
131                }
132                let mut table = build_empty_table(&name, columns, last_rowid);
133                if rootpage != 0 {
134                    load_table_rows(&pager, &mut table, rootpage)?;
135                }
136                if last_rowid > table.last_rowid {
137                    table.last_rowid = last_rowid;
138                }
139                db.tables.insert(name, table);
140            }
141            "index" => {
142                index_rows.push(IndexCatalogRow {
143                    name,
144                    sql,
145                    rootpage,
146                });
147            }
148            other => {
149                return Err(SQLRiteError::Internal(format!(
150                    "sqlrite_master row '{name}' has unknown type '{other}'"
151                )));
152            }
153        }
154    }
155
156    // Second pass: attach each index to its table. HNSW indexes
157    // (Phase 7d.2) take a different code path because their persisted
158    // form is just the CREATE INDEX SQL — the graph itself isn't
159    // persisted yet (Phase 7d.3). Detect HNSW via the SQL's USING clause
160    // and route to a graph-rebuild instead of the B-Tree-cell load.
161    for row in index_rows {
162        if create_index_sql_uses_hnsw(&row.sql) {
163            rebuild_hnsw_index(&mut db, &pager, &row)?;
164        } else {
165            attach_index(&mut db, &pager, row)?;
166        }
167    }
168
169    db.source_path = Some(path.to_path_buf());
170    db.pager = Some(pager);
171    Ok(db)
172}
173
174/// Catalog row for a secondary index — deferred until after every table is
175/// loaded so the index's base table exists by the time we populate it.
176struct IndexCatalogRow {
177    name: String,
178    sql: String,
179    rootpage: u32,
180}
181
182/// Persists `db` to disk. Same diff-commit behavior as before: only pages
183/// whose bytes actually changed get written.
184pub fn save_database(db: &mut Database, path: &Path) -> Result<()> {
185    // Phase 7d.3 — rebuild any HNSW index that DELETE / UPDATE-on-vector
186    // marked dirty. Done up front under the &mut Database borrow we
187    // already hold, before the immutable iteration loops below need
188    // their own borrow.
189    rebuild_dirty_hnsw_indexes(db);
190
191    let same_path = db.source_path.as_deref() == Some(path);
192    let mut pager = if same_path {
193        match db.pager.take() {
194            Some(p) => p,
195            None if path.exists() => Pager::open(path)?,
196            None => Pager::create(path)?,
197        }
198    } else if path.exists() {
199        Pager::open(path)?
200    } else {
201        Pager::create(path)?
202    };
203
204    pager.clear_staged();
205
206    // Page 0 is the header; payload pages start at 1.
207    let mut next_free_page: u32 = 1;
208
209    // 1. Stage each user table's B-Tree, collecting master-row info.
210    //    `kind` is "table" or "index" — master has one row per each.
211    let mut master_rows: Vec<CatalogEntry> = Vec::new();
212
213    let mut table_names: Vec<&String> = db.tables.keys().collect();
214    table_names.sort();
215    for name in table_names {
216        if name == MASTER_TABLE_NAME {
217            return Err(SQLRiteError::Internal(format!(
218                "user table cannot be named '{MASTER_TABLE_NAME}' (reserved)"
219            )));
220        }
221        let table = &db.tables[name];
222        let (rootpage, new_next) = stage_table_btree(&mut pager, table, next_free_page)?;
223        next_free_page = new_next;
224        master_rows.push(CatalogEntry {
225            kind: "table".into(),
226            name: name.clone(),
227            sql: table_to_create_sql(table),
228            rootpage,
229            last_rowid: table.last_rowid,
230        });
231    }
232
233    // 2. Stage each secondary index's B-Tree. Indexes persist in a
234    //    deterministic order: sorted by (owning_table, index_name).
235    let mut index_entries: Vec<(&Table, &SecondaryIndex)> = Vec::new();
236    for table in db.tables.values() {
237        for idx in &table.secondary_indexes {
238            index_entries.push((table, idx));
239        }
240    }
241    index_entries
242        .sort_by(|(ta, ia), (tb, ib)| ta.tb_name.cmp(&tb.tb_name).then(ia.name.cmp(&ib.name)));
243    for (_table, idx) in index_entries {
244        let (rootpage, new_next) = stage_index_btree(&mut pager, idx, next_free_page)?;
245        next_free_page = new_next;
246        master_rows.push(CatalogEntry {
247            kind: "index".into(),
248            name: idx.name.clone(),
249            sql: idx.synthesized_sql(),
250            rootpage,
251            last_rowid: 0,
252        });
253    }
254
255    // 2b. Phase 7d.3: persist HNSW indexes as their own cell-encoded
256    //     page trees, with the rootpage recorded in sqlrite_master.
257    //     Reopen loads the graph back from cells (fast, exact match)
258    //     instead of rebuilding from rows.
259    //
260    //     Dirty indexes (set by DELETE / UPDATE-on-vector-col) are
261    //     rebuilt from current rows BEFORE staging, so the on-disk
262    //     graph reflects the current row set.
263    let mut hnsw_entries: Vec<(&Table, &crate::sql::db::table::HnswIndexEntry)> = Vec::new();
264    for table in db.tables.values() {
265        for entry in &table.hnsw_indexes {
266            hnsw_entries.push((table, entry));
267        }
268    }
269    hnsw_entries
270        .sort_by(|(ta, ea), (tb, eb)| ta.tb_name.cmp(&tb.tb_name).then(ea.name.cmp(&eb.name)));
271    for (table, entry) in hnsw_entries {
272        let (rootpage, new_next) = stage_hnsw_btree(&mut pager, &entry.index, next_free_page)?;
273        next_free_page = new_next;
274        master_rows.push(CatalogEntry {
275            kind: "index".into(),
276            name: entry.name.clone(),
277            sql: format!(
278                "CREATE INDEX {} ON {} USING hnsw ({})",
279                entry.name, table.tb_name, entry.column_name
280            ),
281            rootpage,
282            last_rowid: 0,
283        });
284    }
285
286    // 3. Build an in-memory sqlrite_master with one row per table or index,
287    //    then stage it via the same tree-build path.
288    let mut master = build_empty_master_table();
289    for (i, entry) in master_rows.into_iter().enumerate() {
290        let rowid = (i as i64) + 1;
291        master.restore_row(
292            rowid,
293            vec![
294                Some(Value::Text(entry.kind)),
295                Some(Value::Text(entry.name)),
296                Some(Value::Text(entry.sql)),
297                Some(Value::Integer(entry.rootpage as i64)),
298                Some(Value::Integer(entry.last_rowid)),
299            ],
300        )?;
301    }
302    let (master_root, master_next) = stage_table_btree(&mut pager, &master, next_free_page)?;
303    next_free_page = master_next;
304
305    pager.commit(DbHeader {
306        page_count: next_free_page,
307        schema_root_page: master_root,
308    })?;
309
310    if same_path {
311        db.pager = Some(pager);
312    }
313    Ok(())
314}
315
316/// Build material for a single row in sqlrite_master.
317struct CatalogEntry {
318    kind: String, // "table" or "index"
319    name: String,
320    sql: String,
321    rootpage: u32,
322    last_rowid: i64,
323}
324
325// -------------------------------------------------------------------------
326// sqlrite_master — hardcoded catalog table schema
327
328fn build_empty_master_table() -> Table {
329    // Phase 3e: `type` is the first column, matching SQLite's convention.
330    // It distinguishes `'table'` rows from `'index'` rows.
331    let columns = vec![
332        Column::new("type".into(), "text".into(), false, true, false),
333        Column::new("name".into(), "text".into(), true, true, true),
334        Column::new("sql".into(), "text".into(), false, true, false),
335        Column::new("rootpage".into(), "integer".into(), false, true, false),
336        Column::new("last_rowid".into(), "integer".into(), false, true, false),
337    ];
338    build_empty_table(MASTER_TABLE_NAME, columns, 0)
339}
340
341/// Reads a required Text column from a known-good catalog row.
342fn take_text(table: &Table, col: &str, rowid: i64) -> Result<String> {
343    match table.get_value(col, rowid) {
344        Some(Value::Text(s)) => Ok(s),
345        other => Err(SQLRiteError::Internal(format!(
346            "sqlrite_master column '{col}' at rowid {rowid}: expected Text, got {other:?}"
347        ))),
348    }
349}
350
351/// Reads a required Integer column from a known-good catalog row.
352fn take_integer(table: &Table, col: &str, rowid: i64) -> Result<i64> {
353    match table.get_value(col, rowid) {
354        Some(Value::Integer(v)) => Ok(v),
355        other => Err(SQLRiteError::Internal(format!(
356            "sqlrite_master column '{col}' at rowid {rowid}: expected Integer, got {other:?}"
357        ))),
358    }
359}
360
361// -------------------------------------------------------------------------
362// CREATE-TABLE SQL synthesis and re-parsing
363
364/// Synthesizes a CREATE TABLE SQL string that recreates the table's schema.
365/// Deterministic: same schema → same SQL, so diffing commits stay stable.
366fn table_to_create_sql(table: &Table) -> String {
367    let mut parts = Vec::with_capacity(table.columns.len());
368    for c in &table.columns {
369        // Render the SQL type literally so the round-trip through
370        // CREATE TABLE re-parsing recreates the same schema. Vector
371        // carries its dimension inline.
372        let ty: String = match &c.datatype {
373            DataType::Integer => "INTEGER".to_string(),
374            DataType::Text => "TEXT".to_string(),
375            DataType::Real => "REAL".to_string(),
376            DataType::Bool => "BOOLEAN".to_string(),
377            DataType::Vector(dim) => format!("VECTOR({dim})"),
378            DataType::Json => "JSON".to_string(),
379            DataType::None | DataType::Invalid => "TEXT".to_string(),
380        };
381        let mut piece = format!("{} {}", c.column_name, ty);
382        if c.is_pk {
383            piece.push_str(" PRIMARY KEY");
384        } else {
385            if c.is_unique {
386                piece.push_str(" UNIQUE");
387            }
388            if c.not_null {
389                piece.push_str(" NOT NULL");
390            }
391        }
392        parts.push(piece);
393    }
394    format!("CREATE TABLE {} ({});", table.tb_name, parts.join(", "))
395}
396
397/// Reverses `table_to_create_sql`: feeds the SQL back through `sqlparser`
398/// and produces our internal column list. Returns `(table_name, columns)`.
399fn parse_create_sql(sql: &str) -> Result<(String, Vec<Column>)> {
400    let dialect = SQLiteDialect {};
401    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
402    let stmt = ast.pop().ok_or_else(|| {
403        SQLRiteError::Internal("sqlrite_master row held an empty SQL string".to_string())
404    })?;
405    let create = CreateQuery::new(&stmt)?;
406    let columns = create
407        .columns
408        .into_iter()
409        .map(|pc| Column::new(pc.name, pc.datatype, pc.is_pk, pc.not_null, pc.is_unique))
410        .collect();
411    Ok((create.table_name, columns))
412}
413
414// -------------------------------------------------------------------------
415// In-memory table (re)construction
416
417/// Builds an empty in-memory `Table` given the declared columns.
418fn build_empty_table(name: &str, columns: Vec<Column>, last_rowid: i64) -> Table {
419    let rows: Arc<Mutex<HashMap<String, Row>>> = Arc::new(Mutex::new(HashMap::new()));
420    let mut secondary_indexes: Vec<SecondaryIndex> = Vec::new();
421    {
422        let mut map = rows.lock().expect("rows mutex poisoned");
423        for col in &columns {
424            // Mirror the dispatch in `Table::new` so the reconstructed
425            // table has the same shape it'd have if it were built fresh
426            // from SQL. Phase 7a adds the Vector arm — without it,
427            // VECTOR columns silently restore as Row::None and every
428            // restore_row hits a "storage None vs value Some(Vector(...))"
429            // type mismatch.
430            let row = match &col.datatype {
431                DataType::Integer => Row::Integer(BTreeMap::new()),
432                DataType::Text => Row::Text(BTreeMap::new()),
433                DataType::Real => Row::Real(BTreeMap::new()),
434                DataType::Bool => Row::Bool(BTreeMap::new()),
435                DataType::Vector(_dim) => Row::Vector(BTreeMap::new()),
436                // JSON columns reuse Text storage — see Table::new and
437                // Phase 7e's scope-correction note.
438                DataType::Json => Row::Text(BTreeMap::new()),
439                DataType::None | DataType::Invalid => Row::None,
440            };
441            map.insert(col.column_name.clone(), row);
442
443            // Auto-create UNIQUE/PK indexes so the restored table has the
444            // same shape Table::new would have built from fresh SQL.
445            if (col.is_pk || col.is_unique)
446                && matches!(col.datatype, DataType::Integer | DataType::Text)
447            {
448                if let Ok(idx) = SecondaryIndex::new(
449                    SecondaryIndex::auto_name(name, &col.column_name),
450                    name.to_string(),
451                    col.column_name.clone(),
452                    &col.datatype,
453                    true,
454                    IndexOrigin::Auto,
455                ) {
456                    secondary_indexes.push(idx);
457                }
458            }
459        }
460    }
461
462    let primary_key = columns
463        .iter()
464        .find(|c| c.is_pk)
465        .map(|c| c.column_name.clone())
466        .unwrap_or_else(|| "-1".to_string());
467
468    Table {
469        tb_name: name.to_string(),
470        columns,
471        rows,
472        secondary_indexes,
473        // HNSW indexes (Phase 7d.2) are reconstructed on open by re-
474        // executing each `CREATE INDEX … USING hnsw` SQL stored in
475        // `sqlrite_master`. This builder produces the empty shell;
476        // `replay_create_index_for_hnsw` (in this same module) walks
477        // sqlrite_master after every table is loaded and rebuilds the
478        // graph from current row data. Persistence of the graph itself
479        // (avoiding the on-open rebuild cost) is Phase 7d.3.
480        hnsw_indexes: Vec::new(),
481        last_rowid,
482        primary_key,
483    }
484}
485
486// -------------------------------------------------------------------------
487// Leaf-chain read / write
488
489/// Walks a table's B-Tree from `root_page`, following the leftmost-child
490/// chain down to the first leaf, then iterating leaves via their sibling
491/// `next_page` pointers. Every cell is decoded and replayed into `table`.
492///
493/// Open-path note: we eagerly materialize the entire table into `Table`'s
494/// in-memory maps. Phase 5 will introduce a `Cursor` that hits the pager
495/// on demand so queries can stream through the tree without a full upfront
496/// load.
497/// Re-parses `CREATE INDEX` SQL from sqlrite_master and restores the
498/// index on its base table by walking the tree of index cells at
499/// `rootpage`. The base table is expected to already be in `db.tables`.
500fn attach_index(db: &mut Database, pager: &Pager, row: IndexCatalogRow) -> Result<()> {
501    let (table_name, column_name, is_unique) = parse_create_index_sql(&row.sql)?;
502
503    let table = db.get_table_mut(table_name.clone()).map_err(|_| {
504        SQLRiteError::Internal(format!(
505            "index '{}' references unknown table '{table_name}' (sqlrite_master out of sync?)",
506            row.name
507        ))
508    })?;
509    let datatype = table
510        .columns
511        .iter()
512        .find(|c| c.column_name == column_name)
513        .map(|c| clone_datatype(&c.datatype))
514        .ok_or_else(|| {
515            SQLRiteError::Internal(format!(
516                "index '{}' references unknown column '{column_name}' on '{table_name}'",
517                row.name
518            ))
519        })?;
520
521    // An auto-index on this column may already exist (built by
522    // build_empty_table for UNIQUE/PK columns). If the names match, reuse
523    // the slot instead of adding a duplicate entry.
524    let existing_slot = table
525        .secondary_indexes
526        .iter()
527        .position(|i| i.name == row.name);
528    let idx = match existing_slot {
529        Some(i) => {
530            // Drain any entries that may have been populated during table
531            // restore_row calls — we're about to repopulate from the
532            // persisted tree.
533            table.secondary_indexes.remove(i)
534        }
535        None => SecondaryIndex::new(
536            row.name.clone(),
537            table_name.clone(),
538            column_name.clone(),
539            &datatype,
540            is_unique,
541            IndexOrigin::Explicit,
542        )?,
543    };
544    let mut idx = idx;
545    // Wipe any stale entries from the auto path so the load is idempotent.
546    let is_unique_flag = idx.is_unique;
547    let origin = idx.origin;
548    idx = SecondaryIndex::new(
549        idx.name,
550        idx.table_name,
551        idx.column_name,
552        &datatype,
553        is_unique_flag,
554        origin,
555    )?;
556
557    // Populate from the index tree's cells.
558    load_index_rows(pager, &mut idx, row.rootpage)?;
559
560    table.secondary_indexes.push(idx);
561    Ok(())
562}
563
564/// Walks the leaves of an index B-Tree rooted at `root_page` and inserts
565/// every `(value, rowid)` pair into `idx`.
566fn load_index_rows(pager: &Pager, idx: &mut SecondaryIndex, root_page: u32) -> Result<()> {
567    if root_page == 0 {
568        return Ok(());
569    }
570    let first_leaf = find_leftmost_leaf(pager, root_page)?;
571    let mut current = first_leaf;
572    while current != 0 {
573        let page_buf = pager
574            .read_page(current)
575            .ok_or_else(|| SQLRiteError::Internal(format!("missing index leaf page {current}")))?;
576        if page_buf[0] != PageType::TableLeaf as u8 {
577            return Err(SQLRiteError::Internal(format!(
578                "page {current} tagged {} but expected TableLeaf (index)",
579                page_buf[0]
580            )));
581        }
582        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
583        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
584            .try_into()
585            .map_err(|_| SQLRiteError::Internal("index leaf payload size".to_string()))?;
586        let leaf = TablePage::from_bytes(payload);
587
588        for slot in 0..leaf.slot_count() {
589            // Slots on an index page hold KIND_INDEX cells; decode directly.
590            let offset = leaf.slot_offset_raw(slot)?;
591            let (ic, _) = IndexCell::decode(leaf.as_bytes(), offset)?;
592            idx.insert(&ic.value, ic.rowid)?;
593        }
594        current = next_leaf;
595    }
596    Ok(())
597}
598
599/// Minimal recognizer for the synthesized-or-user `CREATE INDEX` SQL we
600/// store in sqlrite_master. Returns `(table_name, column_name, is_unique)`.
601///
602/// Uses sqlparser so user-supplied SQL with extra whitespace, case, etc.
603/// still works; the only shape we accept is single-column indexes.
604fn parse_create_index_sql(sql: &str) -> Result<(String, String, bool)> {
605    use sqlparser::ast::{CreateIndex, Expr, Statement};
606
607    let dialect = SQLiteDialect {};
608    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
609    let Some(Statement::CreateIndex(CreateIndex {
610        table_name,
611        columns,
612        unique,
613        ..
614    })) = ast.pop()
615    else {
616        return Err(SQLRiteError::Internal(format!(
617            "sqlrite_master index row's SQL isn't a CREATE INDEX: {sql}"
618        )));
619    };
620    if columns.len() != 1 {
621        return Err(SQLRiteError::NotImplemented(
622            "multi-column indexes aren't supported yet".to_string(),
623        ));
624    }
625    let col = match &columns[0].column.expr {
626        Expr::Identifier(ident) => ident.value.clone(),
627        Expr::CompoundIdentifier(parts) => {
628            parts.last().map(|p| p.value.clone()).unwrap_or_default()
629        }
630        other => {
631            return Err(SQLRiteError::Internal(format!(
632                "unsupported indexed column expression: {other:?}"
633            )));
634        }
635    };
636    Ok((table_name.to_string(), col, unique))
637}
638
639/// True iff a CREATE INDEX SQL string uses `USING hnsw` (case-insensitive).
640/// Used by the open path to route HNSW indexes to the graph-rebuild path
641/// instead of the standard B-Tree cell-load. Pre-Phase-7d.2 indexes
642/// don't have a USING clause, so they all return false and continue
643/// taking the existing path.
644fn create_index_sql_uses_hnsw(sql: &str) -> bool {
645    use sqlparser::ast::{CreateIndex, IndexType, Statement};
646
647    let dialect = SQLiteDialect {};
648    let Ok(mut ast) = Parser::parse_sql(&dialect, sql) else {
649        return false;
650    };
651    let Some(Statement::CreateIndex(CreateIndex { using, .. })) = ast.pop() else {
652        return false;
653    };
654    matches!(using, Some(IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("hnsw"))
655}
656
657/// Loads (or rebuilds) an HNSW index on database open. Two paths:
658///
659///   - **rootpage != 0** (Phase 7d.3 default): the graph is persisted
660///     as cell-encoded pages. Read every node directly via
661///     `load_hnsw_nodes` and reconstruct the index — fast, zero
662///     algorithm runs, exact bit-for-bit reproduction of what was saved.
663///
664///   - **rootpage == 0** (compatibility): no on-disk graph, e.g. for
665///     files saved by Phase 7d.2 before persistence landed. Replay the
666///     CREATE INDEX SQL through `execute_create_index`, which walks the
667///     table's current rows and populates a fresh graph. Slower but
668///     correctness-equivalent on the first save with the new code.
669fn rebuild_hnsw_index(db: &mut Database, pager: &Pager, row: &IndexCatalogRow) -> Result<()> {
670    use crate::sql::db::table::HnswIndexEntry;
671    use crate::sql::executor::execute_create_index;
672    use crate::sql::hnsw::{DistanceMetric, HnswIndex};
673    use sqlparser::ast::Statement;
674
675    let dialect = SQLiteDialect {};
676    let mut ast = Parser::parse_sql(&dialect, &row.sql).map_err(SQLRiteError::from)?;
677    let Some(stmt @ Statement::CreateIndex(_)) = ast.pop() else {
678        return Err(SQLRiteError::Internal(format!(
679            "sqlrite_master HNSW row's SQL isn't a CREATE INDEX: {}",
680            row.sql
681        )));
682    };
683
684    if row.rootpage == 0 {
685        // Compatibility path — no persisted graph; walk current rows.
686        execute_create_index(&stmt, db)?;
687        return Ok(());
688    }
689
690    // Persistence path — read the cell tree, deserialize.
691    let nodes = load_hnsw_nodes(pager, row.rootpage)?;
692    let index = HnswIndex::from_persisted_nodes(DistanceMetric::L2, 0xC0FFEE, nodes);
693
694    // Parse the CREATE INDEX to know which table + column to attach to
695    // — same shape as the row-walk path; we just don't execute it.
696    let (tbl_name, col_name) = parse_hnsw_create_index_sql(&row.sql)?;
697    let table_mut = db.get_table_mut(tbl_name.clone()).map_err(|_| {
698        SQLRiteError::Internal(format!(
699            "HNSW index '{}' references unknown table '{tbl_name}'",
700            row.name
701        ))
702    })?;
703    table_mut.hnsw_indexes.push(HnswIndexEntry {
704        name: row.name.clone(),
705        column_name: col_name,
706        index,
707        needs_rebuild: false,
708    });
709    Ok(())
710}
711
712/// Phase 7d.3 — Phase-7d.3-side helper: walk every leaf in the HNSW
713/// page tree at `root_page` and decode each cell as a node. Returns
714/// the (node_id, layers) tuples in slot-order (already ascending by
715/// node_id since they were staged that way). The caller hands them to
716/// `HnswIndex::from_persisted_nodes`.
717fn load_hnsw_nodes(pager: &Pager, root_page: u32) -> Result<Vec<(i64, Vec<Vec<i64>>)>> {
718    use crate::sql::pager::hnsw_cell::HnswNodeCell;
719
720    let mut nodes: Vec<(i64, Vec<Vec<i64>>)> = Vec::new();
721    let first_leaf = find_leftmost_leaf(pager, root_page)?;
722    let mut current = first_leaf;
723    while current != 0 {
724        let page_buf = pager
725            .read_page(current)
726            .ok_or_else(|| SQLRiteError::Internal(format!("missing HNSW leaf page {current}")))?;
727        if page_buf[0] != PageType::TableLeaf as u8 {
728            return Err(SQLRiteError::Internal(format!(
729                "page {current} tagged {} but expected TableLeaf (HNSW)",
730                page_buf[0]
731            )));
732        }
733        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
734        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
735            .try_into()
736            .map_err(|_| SQLRiteError::Internal("HNSW leaf payload size".to_string()))?;
737        let leaf = TablePage::from_bytes(payload);
738        for slot in 0..leaf.slot_count() {
739            let offset = leaf.slot_offset_raw(slot)?;
740            let (cell, _) = HnswNodeCell::decode(leaf.as_bytes(), offset)?;
741            nodes.push((cell.node_id, cell.layers));
742        }
743        current = next_leaf;
744    }
745    Ok(nodes)
746}
747
748/// Pulls (table_name, column_name) out of a `CREATE INDEX … USING hnsw (col)`
749/// SQL string. Used by the persistence path on open to know where to
750/// attach the loaded graph. Same shape as `parse_create_index_sql` for
751/// regular indexes — only the assertion differs (we don't care about
752/// UNIQUE for HNSW).
753fn parse_hnsw_create_index_sql(sql: &str) -> Result<(String, String)> {
754    use sqlparser::ast::{CreateIndex, Expr, Statement};
755
756    let dialect = SQLiteDialect {};
757    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
758    let Some(Statement::CreateIndex(CreateIndex {
759        table_name,
760        columns,
761        ..
762    })) = ast.pop()
763    else {
764        return Err(SQLRiteError::Internal(format!(
765            "sqlrite_master HNSW row's SQL isn't a CREATE INDEX: {sql}"
766        )));
767    };
768    if columns.len() != 1 {
769        return Err(SQLRiteError::NotImplemented(
770            "multi-column HNSW indexes aren't supported yet".to_string(),
771        ));
772    }
773    let col = match &columns[0].column.expr {
774        Expr::Identifier(ident) => ident.value.clone(),
775        Expr::CompoundIdentifier(parts) => {
776            parts.last().map(|p| p.value.clone()).unwrap_or_default()
777        }
778        other => {
779            return Err(SQLRiteError::Internal(format!(
780                "unsupported HNSW indexed column expression: {other:?}"
781            )));
782        }
783    };
784    Ok((table_name.to_string(), col))
785}
786
787/// Phase 7d.3 — rebuilds in-place any HnswIndexEntry whose
788/// `needs_rebuild` flag is set (DELETE / UPDATE-on-vector marked it).
789/// Walks the table's current Vec<f32> column storage and runs the
790/// HNSW algorithm fresh. Called at the top of `save_database` before
791/// any immutable borrows of `db` start.
792///
793/// Cost: O(N · ef_construction · log N) per dirty index. Fine for
794/// small tables, expensive for ≥100k-row tables — matches the
795/// trade-off SQLite makes for FTS5: dirtying-and-rebuilding is the
796/// MVP, more sophisticated incremental delete strategies (soft-delete
797/// + tombstones, neighbor reconnection) are future polish.
798fn rebuild_dirty_hnsw_indexes(db: &mut Database) {
799    use crate::sql::hnsw::{DistanceMetric, HnswIndex};
800
801    for table in db.tables.values_mut() {
802        // Snapshot which (index_name, column) pairs need rebuilding,
803        // before we go grabbing column data — keeps the borrow
804        // structure simple.
805        let dirty: Vec<(String, String)> = table
806            .hnsw_indexes
807            .iter()
808            .filter(|e| e.needs_rebuild)
809            .map(|e| (e.name.clone(), e.column_name.clone()))
810            .collect();
811        if dirty.is_empty() {
812            continue;
813        }
814
815        for (idx_name, col_name) in dirty {
816            // Snapshot every (rowid, vec) for this column.
817            let mut vectors: Vec<(i64, Vec<f32>)> = Vec::new();
818            {
819                let row_data = table.rows.lock().expect("rows mutex poisoned");
820                if let Some(Row::Vector(map)) = row_data.get(&col_name) {
821                    for (id, v) in map.iter() {
822                        vectors.push((*id, v.clone()));
823                    }
824                }
825            }
826            // Pre-build a HashMap for the get_vec closure so we don't
827            // pay O(N) lookup per insert call.
828            let snapshot: std::collections::HashMap<i64, Vec<f32>> =
829                vectors.iter().cloned().collect();
830
831            let mut new_idx = HnswIndex::new(DistanceMetric::L2, 0xC0FFEE);
832            // Sort by id so the rebuild is deterministic across runs.
833            vectors.sort_by_key(|(id, _)| *id);
834            for (id, v) in &vectors {
835                new_idx.insert(*id, v, |q| snapshot.get(&q).cloned().unwrap_or_default());
836            }
837
838            // Replace the entry's index + clear the dirty flag.
839            if let Some(entry) = table.hnsw_indexes.iter_mut().find(|e| e.name == idx_name) {
840                entry.index = new_idx;
841                entry.needs_rebuild = false;
842            }
843        }
844    }
845}
846
847/// Cheap clone helper — `DataType` doesn't derive `Clone` elsewhere.
848fn clone_datatype(dt: &DataType) -> DataType {
849    match dt {
850        DataType::Integer => DataType::Integer,
851        DataType::Text => DataType::Text,
852        DataType::Real => DataType::Real,
853        DataType::Bool => DataType::Bool,
854        DataType::Vector(dim) => DataType::Vector(*dim),
855        DataType::Json => DataType::Json,
856        DataType::None => DataType::None,
857        DataType::Invalid => DataType::Invalid,
858    }
859}
860
861/// Stages an index's B-Tree at `start_page`. Each leaf cell is a
862/// `KIND_INDEX` entry carrying `(original_rowid, value)`. Returns
863/// `(root_page, next_free_page)`.
864///
865/// The tree's shape matches a regular table's — leaves chained via
866/// `next_page`, optional interior layer above. `Cell::peek_rowid` works
867/// uniformly for index cells (same prefix as local cells), so the
868/// existing slot directory and binary search carry over.
869fn stage_index_btree(
870    pager: &mut Pager,
871    idx: &SecondaryIndex,
872    start_page: u32,
873) -> Result<(u32, u32)> {
874    // Build the leaves.
875    let (leaves, mut next_free_page) = stage_index_leaves(pager, idx, start_page)?;
876    if leaves.len() == 1 {
877        return Ok((leaves[0].0, next_free_page));
878    }
879    let mut level: Vec<(u32, i64)> = leaves;
880    while level.len() > 1 {
881        let (next_level, new_next_free) = stage_interior_level(pager, &level, next_free_page)?;
882        next_free_page = new_next_free;
883        level = next_level;
884    }
885    Ok((level[0].0, next_free_page))
886}
887
888/// Packs the index's (value, rowid) entries into a sibling-chained run
889/// of `TableLeaf` pages. Iteration order matches `SecondaryIndex::iter_entries`
890/// (ascending value; rowids in insertion order within a value), which is
891/// also ascending by the "cell rowid" carried in each IndexCell (the
892/// original row's rowid) — so Cell::peek_rowid + the slot directory's
893/// rowid ordering stays consistent.
894fn stage_index_leaves(
895    pager: &mut Pager,
896    idx: &SecondaryIndex,
897    start_page: u32,
898) -> Result<(Vec<(u32, i64)>, u32)> {
899    let mut leaves: Vec<(u32, i64)> = Vec::new();
900    let mut current_leaf = TablePage::empty();
901    let mut current_leaf_page = start_page;
902    let mut current_max_rowid: Option<i64> = None;
903    let mut next_free_page = start_page + 1;
904
905    // Sort the entries by original rowid so the in-page slot directory,
906    // which binary-searches by rowid, stays valid. (iter_entries orders by
907    // value; we reorder here for B-Tree correctness.)
908    let mut entries: Vec<(Value, i64)> = idx.iter_entries().collect();
909    entries.sort_by_key(|(_, r)| *r);
910
911    for (value, rowid) in entries {
912        let cell = IndexCell::new(rowid, value);
913        let entry_bytes = cell.encode()?;
914
915        if !current_leaf.would_fit(entry_bytes.len()) {
916            let next_leaf_page_num = next_free_page;
917            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
918            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
919            current_leaf = TablePage::empty();
920            current_leaf_page = next_leaf_page_num;
921            next_free_page += 1;
922
923            if !current_leaf.would_fit(entry_bytes.len()) {
924                return Err(SQLRiteError::Internal(format!(
925                    "index entry of {} bytes exceeds empty-page capacity {}",
926                    entry_bytes.len(),
927                    current_leaf.free_space()
928                )));
929            }
930        }
931        current_leaf.insert_entry(rowid, &entry_bytes)?;
932        current_max_rowid = Some(rowid);
933    }
934
935    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
936    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
937    Ok((leaves, next_free_page))
938}
939
940/// Phase 7d.3 — stages an HNSW index's page tree at `start_page`.
941/// Each leaf cell is a `KIND_HNSW` entry carrying one node's
942/// (node_id, layers). Returns `(root_page, next_free_page)`.
943///
944/// Tree shape is identical to `stage_index_btree` — chained leaves +
945/// optional interior layers. The slot directory binary-searches by
946/// node_id (which is the cell's "rowid" in `Cell::peek_rowid` terms),
947/// so reads can locate any node in O(log N) once 7d.4-or-later
948/// optimizes the load path to lazy-fetch instead of read-all.
949/// Today, `load_hnsw_nodes` reads the entire tree on open.
950fn stage_hnsw_btree(
951    pager: &mut Pager,
952    idx: &crate::sql::hnsw::HnswIndex,
953    start_page: u32,
954) -> Result<(u32, u32)> {
955    let (leaves, mut next_free_page) = stage_hnsw_leaves(pager, idx, start_page)?;
956    if leaves.len() == 1 {
957        return Ok((leaves[0].0, next_free_page));
958    }
959    let mut level: Vec<(u32, i64)> = leaves;
960    while level.len() > 1 {
961        let (next_level, new_next_free) = stage_interior_level(pager, &level, next_free_page)?;
962        next_free_page = new_next_free;
963        level = next_level;
964    }
965    Ok((level[0].0, next_free_page))
966}
967
968/// Packs HNSW nodes into a sibling-chained run of `TableLeaf` pages.
969/// `serialize_nodes` already returns nodes in ascending node_id order,
970/// so the slot directory's rowid ordering stays valid.
971fn stage_hnsw_leaves(
972    pager: &mut Pager,
973    idx: &crate::sql::hnsw::HnswIndex,
974    start_page: u32,
975) -> Result<(Vec<(u32, i64)>, u32)> {
976    use crate::sql::pager::hnsw_cell::HnswNodeCell;
977
978    let mut leaves: Vec<(u32, i64)> = Vec::new();
979    let mut current_leaf = TablePage::empty();
980    let mut current_leaf_page = start_page;
981    let mut current_max_rowid: Option<i64> = None;
982    let mut next_free_page = start_page + 1;
983
984    let serialized = idx.serialize_nodes();
985
986    // Empty index → emit a single empty leaf page so the rootpage
987    // pointer in sqlrite_master stays nonzero (== "graph is persisted,
988    // it just happens to be empty"). load_hnsw_nodes is fine with an
989    // empty leaf — slot_count() returns 0.
990    for (node_id, layers) in serialized {
991        let cell = HnswNodeCell::new(node_id, layers);
992        let entry_bytes = cell.encode()?;
993
994        if !current_leaf.would_fit(entry_bytes.len()) {
995            let next_leaf_page_num = next_free_page;
996            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
997            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
998            current_leaf = TablePage::empty();
999            current_leaf_page = next_leaf_page_num;
1000            next_free_page += 1;
1001
1002            if !current_leaf.would_fit(entry_bytes.len()) {
1003                return Err(SQLRiteError::Internal(format!(
1004                    "HNSW node {node_id} cell of {} bytes exceeds empty-page capacity {}",
1005                    entry_bytes.len(),
1006                    current_leaf.free_space()
1007                )));
1008            }
1009        }
1010        current_leaf.insert_entry(node_id, &entry_bytes)?;
1011        current_max_rowid = Some(node_id);
1012    }
1013
1014    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1015    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1016    Ok((leaves, next_free_page))
1017}
1018
1019fn load_table_rows(pager: &Pager, table: &mut Table, root_page: u32) -> Result<()> {
1020    let first_leaf = find_leftmost_leaf(pager, root_page)?;
1021    let mut current = first_leaf;
1022    while current != 0 {
1023        let page_buf = pager
1024            .read_page(current)
1025            .ok_or_else(|| SQLRiteError::Internal(format!("missing leaf page {current}")))?;
1026        if page_buf[0] != PageType::TableLeaf as u8 {
1027            return Err(SQLRiteError::Internal(format!(
1028                "page {current} tagged {} but expected TableLeaf",
1029                page_buf[0]
1030            )));
1031        }
1032        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
1033        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
1034            .try_into()
1035            .map_err(|_| SQLRiteError::Internal("leaf payload slice size".to_string()))?;
1036        let leaf = TablePage::from_bytes(payload);
1037
1038        for slot in 0..leaf.slot_count() {
1039            let entry = leaf.entry_at(slot)?;
1040            let cell = match entry {
1041                PagedEntry::Local(c) => c,
1042                PagedEntry::Overflow(r) => {
1043                    let body_bytes =
1044                        read_overflow_chain(pager, r.first_overflow_page, r.total_body_len)?;
1045                    let (c, _) = Cell::decode(&body_bytes, 0)?;
1046                    c
1047                }
1048            };
1049            table.restore_row(cell.rowid, cell.values)?;
1050        }
1051        current = next_leaf;
1052    }
1053    Ok(())
1054}
1055
1056/// Descends from `root_page` through `InteriorNode` pages, always taking
1057/// the leftmost child, until a `TableLeaf` is reached. Returns that leaf's
1058/// page number. A root that's already a leaf is returned as-is.
1059fn find_leftmost_leaf(pager: &Pager, root_page: u32) -> Result<u32> {
1060    let mut current = root_page;
1061    loop {
1062        let page_buf = pager.read_page(current).ok_or_else(|| {
1063            SQLRiteError::Internal(format!("missing page {current} during tree descent"))
1064        })?;
1065        match page_buf[0] {
1066            t if t == PageType::TableLeaf as u8 => return Ok(current),
1067            t if t == PageType::InteriorNode as u8 => {
1068                let payload: &[u8; PAYLOAD_PER_PAGE] =
1069                    (&page_buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
1070                        SQLRiteError::Internal("interior payload slice size".to_string())
1071                    })?;
1072                let interior = InteriorPage::from_bytes(payload);
1073                current = interior.leftmost_child()?;
1074            }
1075            other => {
1076                return Err(SQLRiteError::Internal(format!(
1077                    "unexpected page type {other} during tree descent at page {current}"
1078                )));
1079            }
1080        }
1081    }
1082}
1083
1084/// Stages a table's B-Tree starting at `start_page`. Returns
1085/// `(root_page, next_free_page)`. Builds bottom-up:
1086///
1087/// 1. Pack all row cells into `TableLeaf` pages, chaining them via each
1088///    leaf's `next_page` sibling pointer (for fast sequential scans).
1089/// 2. If the table fits in a single leaf, that leaf is the root.
1090/// 3. Otherwise, group leaves into `InteriorNode` pages; recurse up the
1091///    tree until one root remains.
1092///
1093/// Deterministic: same in-memory rows → same pages at same offsets, so
1094/// the Pager's diff commit still skips unchanged tables.
1095fn stage_table_btree(pager: &mut Pager, table: &Table, start_page: u32) -> Result<(u32, u32)> {
1096    let (leaves, mut next_free_page) = stage_leaves(pager, table, start_page)?;
1097    if leaves.len() == 1 {
1098        return Ok((leaves[0].0, next_free_page));
1099    }
1100    let mut level: Vec<(u32, i64)> = leaves;
1101    while level.len() > 1 {
1102        let (next_level, new_next_free) = stage_interior_level(pager, &level, next_free_page)?;
1103        next_free_page = new_next_free;
1104        level = next_level;
1105    }
1106    Ok((level[0].0, next_free_page))
1107}
1108
1109/// Packs the table's rows into a sibling-linked chain of `TableLeaf` pages.
1110/// Returns each leaf's `(page_number, max_rowid)` (used by the next level
1111/// up to build divider cells) and the first free page after the chain
1112/// including any overflow pages allocated for oversized cells.
1113fn stage_leaves(
1114    pager: &mut Pager,
1115    table: &Table,
1116    start_page: u32,
1117) -> Result<(Vec<(u32, i64)>, u32)> {
1118    let mut leaves: Vec<(u32, i64)> = Vec::new();
1119    let mut current_leaf = TablePage::empty();
1120    let mut current_leaf_page = start_page;
1121    let mut current_max_rowid: Option<i64> = None;
1122    let mut next_free_page = start_page + 1;
1123
1124    for rowid in table.rowids() {
1125        let entry_bytes = build_row_entry(pager, table, rowid, &mut next_free_page)?;
1126
1127        if !current_leaf.would_fit(entry_bytes.len()) {
1128            // Commit the current leaf. Its sibling next_page is the page
1129            // number where the new leaf will go — which is next_free_page
1130            // right now (no overflow pages have been allocated between
1131            // this decision and the new leaf's allocation below).
1132            let next_leaf_page_num = next_free_page;
1133            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1134            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1135            current_leaf = TablePage::empty();
1136            current_leaf_page = next_leaf_page_num;
1137            next_free_page += 1;
1138            // current_max_rowid is reassigned by the insert below; no need
1139            // to zero it out here.
1140
1141            if !current_leaf.would_fit(entry_bytes.len()) {
1142                return Err(SQLRiteError::Internal(format!(
1143                    "entry of {} bytes exceeds empty-page capacity {}",
1144                    entry_bytes.len(),
1145                    current_leaf.free_space()
1146                )));
1147            }
1148        }
1149        current_leaf.insert_entry(rowid, &entry_bytes)?;
1150        current_max_rowid = Some(rowid);
1151    }
1152
1153    // Final leaf: sibling next_page = 0 (end of chain).
1154    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1155    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1156    Ok((leaves, next_free_page))
1157}
1158
1159/// Encodes a single row's on-leaf entry — either the local cell bytes, or
1160/// an `OverflowRef` pointing at a freshly-allocated overflow chain if the
1161/// encoded cell exceeded the inline threshold. Advances `next_free_page`
1162/// past any overflow pages used.
1163fn build_row_entry(
1164    pager: &mut Pager,
1165    table: &Table,
1166    rowid: i64,
1167    next_free_page: &mut u32,
1168) -> Result<Vec<u8>> {
1169    let values = table.extract_row(rowid);
1170    let local_cell = Cell::new(rowid, values);
1171    let local_bytes = local_cell.encode()?;
1172    if local_bytes.len() > OVERFLOW_THRESHOLD {
1173        let overflow_start = *next_free_page;
1174        *next_free_page = write_overflow_chain(pager, &local_bytes, overflow_start)?;
1175        Ok(OverflowRef {
1176            rowid,
1177            total_body_len: local_bytes.len() as u64,
1178            first_overflow_page: overflow_start,
1179        }
1180        .encode())
1181    } else {
1182        Ok(local_bytes)
1183    }
1184}
1185
1186/// Builds one level of `InteriorNode` pages above the given children.
1187/// Each interior packs as many dividers as will fit; the last child
1188/// assigned to an interior becomes its `rightmost_child`. Returns the
1189/// emitted interior pages as `(page_number, max_rowid_in_subtree)` so the
1190/// next level can build on top of them.
1191fn stage_interior_level(
1192    pager: &mut Pager,
1193    children: &[(u32, i64)],
1194    start_page: u32,
1195) -> Result<(Vec<(u32, i64)>, u32)> {
1196    let mut next_level: Vec<(u32, i64)> = Vec::new();
1197    let mut next_free_page = start_page;
1198    let mut idx = 0usize;
1199
1200    while idx < children.len() {
1201        let interior_page_num = next_free_page;
1202        next_free_page += 1;
1203
1204        // Seed the interior with the first unassigned child as its
1205        // rightmost. As we add more children, the previous rightmost
1206        // graduates to being a divider and the new arrival takes over
1207        // as rightmost.
1208        let (mut rightmost_child_page, mut rightmost_child_max) = children[idx];
1209        idx += 1;
1210        let mut interior = InteriorPage::empty(rightmost_child_page);
1211
1212        while idx < children.len() {
1213            let new_divider_cell = InteriorCell {
1214                divider_rowid: rightmost_child_max,
1215                child_page: rightmost_child_page,
1216            };
1217            let new_divider_bytes = new_divider_cell.encode();
1218            if !interior.would_fit(new_divider_bytes.len()) {
1219                break;
1220            }
1221            interior.insert_divider(rightmost_child_max, rightmost_child_page)?;
1222            let (next_child_page, next_child_max) = children[idx];
1223            interior.set_rightmost_child(next_child_page);
1224            rightmost_child_page = next_child_page;
1225            rightmost_child_max = next_child_max;
1226            idx += 1;
1227        }
1228
1229        emit_interior(pager, interior_page_num, &interior);
1230        next_level.push((interior_page_num, rightmost_child_max));
1231    }
1232
1233    Ok((next_level, next_free_page))
1234}
1235
1236/// Wraps a `TablePage` in the 7-byte page header and hands it to the pager.
1237fn emit_leaf(pager: &mut Pager, page_num: u32, leaf: &TablePage, next_leaf: u32) {
1238    let mut buf = [0u8; PAGE_SIZE];
1239    buf[0] = PageType::TableLeaf as u8;
1240    buf[1..5].copy_from_slice(&next_leaf.to_le_bytes());
1241    // For leaf pages the legacy `payload_len` field isn't used — the slot
1242    // directory self-describes. Zero it by convention.
1243    buf[5..7].copy_from_slice(&0u16.to_le_bytes());
1244    buf[PAGE_HEADER_SIZE..].copy_from_slice(leaf.as_bytes());
1245    pager.stage_page(page_num, buf);
1246}
1247
1248/// Wraps an `InteriorPage` in the 7-byte page header. Interior pages
1249/// don't use `next_page` (there's no sibling chain between interiors);
1250/// `payload_len` is also unused (the slot directory self-describes).
1251fn emit_interior(pager: &mut Pager, page_num: u32, interior: &InteriorPage) {
1252    let mut buf = [0u8; PAGE_SIZE];
1253    buf[0] = PageType::InteriorNode as u8;
1254    buf[1..5].copy_from_slice(&0u32.to_le_bytes());
1255    buf[5..7].copy_from_slice(&0u16.to_le_bytes());
1256    buf[PAGE_HEADER_SIZE..].copy_from_slice(interior.as_bytes());
1257    pager.stage_page(page_num, buf);
1258}
1259
1260#[cfg(test)]
1261mod tests {
1262    use super::*;
1263    use crate::sql::process_command;
1264
1265    fn seed_db() -> Database {
1266        let mut db = Database::new("test".to_string());
1267        process_command(
1268            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, age INTEGER);",
1269            &mut db,
1270        )
1271        .unwrap();
1272        process_command(
1273            "INSERT INTO users (name, age) VALUES ('alice', 30);",
1274            &mut db,
1275        )
1276        .unwrap();
1277        process_command("INSERT INTO users (name, age) VALUES ('bob', 25);", &mut db).unwrap();
1278        process_command(
1279            "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
1280            &mut db,
1281        )
1282        .unwrap();
1283        process_command("INSERT INTO notes (body) VALUES ('hello');", &mut db).unwrap();
1284        db
1285    }
1286
1287    fn tmp_path(name: &str) -> std::path::PathBuf {
1288        let mut p = std::env::temp_dir();
1289        let pid = std::process::id();
1290        let nanos = std::time::SystemTime::now()
1291            .duration_since(std::time::UNIX_EPOCH)
1292            .map(|d| d.as_nanos())
1293            .unwrap_or(0);
1294        p.push(format!("sqlrite-{pid}-{nanos}-{name}.sqlrite"));
1295        p
1296    }
1297
1298    /// Phase 4c: every .sqlrite has a `-wal` sidecar now. Delete both so
1299    /// `/tmp` doesn't accumulate orphan WALs across test runs.
1300    fn cleanup(path: &std::path::Path) {
1301        let _ = std::fs::remove_file(path);
1302        let mut wal = path.as_os_str().to_owned();
1303        wal.push("-wal");
1304        let _ = std::fs::remove_file(std::path::PathBuf::from(wal));
1305    }
1306
1307    #[test]
1308    fn round_trip_preserves_schema_and_data() {
1309        let path = tmp_path("roundtrip");
1310        let mut db = seed_db();
1311        save_database(&mut db, &path).expect("save");
1312
1313        let loaded = open_database(&path, "test".to_string()).expect("open");
1314        assert_eq!(loaded.tables.len(), 2);
1315
1316        let users = loaded.get_table("users".to_string()).expect("users table");
1317        assert_eq!(users.columns.len(), 3);
1318        let rowids = users.rowids();
1319        assert_eq!(rowids.len(), 2);
1320        let names: Vec<String> = rowids
1321            .iter()
1322            .filter_map(|r| match users.get_value("name", *r) {
1323                Some(Value::Text(s)) => Some(s),
1324                _ => None,
1325            })
1326            .collect();
1327        assert!(names.contains(&"alice".to_string()));
1328        assert!(names.contains(&"bob".to_string()));
1329
1330        let notes = loaded.get_table("notes".to_string()).expect("notes table");
1331        assert_eq!(notes.rowids().len(), 1);
1332
1333        cleanup(&path);
1334    }
1335
1336    // -----------------------------------------------------------------
1337    // Phase 7a — VECTOR(N) save / reopen round-trip
1338    // -----------------------------------------------------------------
1339
1340    #[test]
1341    fn round_trip_preserves_vector_column() {
1342        let path = tmp_path("vec_roundtrip");
1343
1344        // Build, populate, save.
1345        {
1346            let mut db = Database::new("test".to_string());
1347            process_command(
1348                "CREATE TABLE docs (id INTEGER PRIMARY KEY, embedding VECTOR(3));",
1349                &mut db,
1350            )
1351            .unwrap();
1352            process_command(
1353                "INSERT INTO docs (embedding) VALUES ([0.1, 0.2, 0.3]);",
1354                &mut db,
1355            )
1356            .unwrap();
1357            process_command(
1358                "INSERT INTO docs (embedding) VALUES ([1.5, -2.0, 3.5]);",
1359                &mut db,
1360            )
1361            .unwrap();
1362            save_database(&mut db, &path).expect("save");
1363        } // db drops → its exclusive lock releases before reopen.
1364
1365        // Reopen and verify schema + data both round-tripped.
1366        let loaded = open_database(&path, "test".to_string()).expect("open");
1367        let docs = loaded.get_table("docs".to_string()).expect("docs table");
1368
1369        // Schema preserved: column is still VECTOR(3).
1370        let embedding_col = docs
1371            .columns
1372            .iter()
1373            .find(|c| c.column_name == "embedding")
1374            .expect("embedding column");
1375        assert!(
1376            matches!(embedding_col.datatype, DataType::Vector(3)),
1377            "expected DataType::Vector(3) after round-trip, got {:?}",
1378            embedding_col.datatype
1379        );
1380
1381        // Data preserved: both vectors still readable bit-for-bit.
1382        let mut rows: Vec<Vec<f32>> = docs
1383            .rowids()
1384            .iter()
1385            .filter_map(|r| match docs.get_value("embedding", *r) {
1386                Some(Value::Vector(v)) => Some(v),
1387                _ => None,
1388            })
1389            .collect();
1390        rows.sort_by(|a, b| a[0].partial_cmp(&b[0]).unwrap());
1391        assert_eq!(rows.len(), 2);
1392        assert_eq!(rows[0], vec![0.1f32, 0.2, 0.3]);
1393        assert_eq!(rows[1], vec![1.5f32, -2.0, 3.5]);
1394
1395        cleanup(&path);
1396    }
1397
1398    #[test]
1399    fn round_trip_preserves_json_column() {
1400        // Phase 7e — JSON columns are stored as Text under the hood with
1401        // INSERT-time validation. Save + reopen should preserve the
1402        // schema (DataType::Json) and the underlying text bytes; a
1403        // post-reopen json_extract should still resolve paths correctly.
1404        let path = tmp_path("json_roundtrip");
1405
1406        {
1407            let mut db = Database::new("test".to_string());
1408            process_command(
1409                "CREATE TABLE docs (id INTEGER PRIMARY KEY, payload JSON);",
1410                &mut db,
1411            )
1412            .unwrap();
1413            process_command(
1414                r#"INSERT INTO docs (payload) VALUES ('{"name": "alice", "tags": ["rust","sql"]}');"#,
1415                &mut db,
1416            )
1417            .unwrap();
1418            save_database(&mut db, &path).expect("save");
1419        }
1420
1421        let mut loaded = open_database(&path, "test".to_string()).expect("open");
1422        let docs = loaded.get_table("docs".to_string()).expect("docs");
1423
1424        // Schema: column declared as JSON, restored with the same type.
1425        let payload_col = docs
1426            .columns
1427            .iter()
1428            .find(|c| c.column_name == "payload")
1429            .unwrap();
1430        assert!(
1431            matches!(payload_col.datatype, DataType::Json),
1432            "expected DataType::Json, got {:?}",
1433            payload_col.datatype
1434        );
1435
1436        // json_extract works against the reopened data — exercises the
1437        // full Text-storage + serde_json::from_str path post-reopen.
1438        let resp = process_command(
1439            r#"SELECT id FROM docs WHERE json_extract(payload, '$.name') = 'alice';"#,
1440            &mut loaded,
1441        )
1442        .expect("select via json_extract after reopen");
1443        assert!(resp.contains("1 row returned"), "got: {resp}");
1444
1445        cleanup(&path);
1446    }
1447
1448    #[test]
1449    fn round_trip_rebuilds_hnsw_index_from_create_sql() {
1450        // Phase 7d.3: HNSW indexes now persist their graph as cell-encoded
1451        // pages. After save+reopen the index entry reattaches with the
1452        // same column + same node count, loaded directly from disk
1453        // instead of re-walking rows.
1454        let path = tmp_path("hnsw_roundtrip");
1455
1456        // Build, populate, index, save.
1457        {
1458            let mut db = Database::new("test".to_string());
1459            process_command(
1460                "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
1461                &mut db,
1462            )
1463            .unwrap();
1464            for v in &[
1465                "[1.0, 0.0]",
1466                "[2.0, 0.0]",
1467                "[0.0, 3.0]",
1468                "[1.0, 4.0]",
1469                "[10.0, 10.0]",
1470            ] {
1471                process_command(&format!("INSERT INTO docs (e) VALUES ({v});"), &mut db).unwrap();
1472            }
1473            process_command("CREATE INDEX ix_e ON docs USING hnsw (e);", &mut db).unwrap();
1474            save_database(&mut db, &path).expect("save");
1475        } // db drops → exclusive lock releases.
1476
1477        // Reopen and verify the index reattached, with the same name +
1478        // column + populated graph.
1479        let mut loaded = open_database(&path, "test".to_string()).expect("open");
1480        {
1481            let table = loaded.get_table("docs".to_string()).expect("docs");
1482            assert_eq!(table.hnsw_indexes.len(), 1, "HNSW index should reattach");
1483            let entry = &table.hnsw_indexes[0];
1484            assert_eq!(entry.name, "ix_e");
1485            assert_eq!(entry.column_name, "e");
1486            assert_eq!(entry.index.len(), 5, "loaded graph should hold all 5 rows");
1487            assert!(
1488                !entry.needs_rebuild,
1489                "fresh load should not be marked dirty"
1490            );
1491        }
1492
1493        // Quick functional check: KNN query through the loaded index
1494        // returns results.
1495        let resp = process_command(
1496            "SELECT id FROM docs ORDER BY vec_distance_l2(e, [1.0, 0.0]) ASC LIMIT 3;",
1497            &mut loaded,
1498        )
1499        .unwrap();
1500        assert!(resp.contains("3 rows returned"), "got: {resp}");
1501
1502        cleanup(&path);
1503    }
1504
1505    #[test]
1506    fn delete_then_save_then_reopen_excludes_deleted_node_from_hnsw() {
1507        // Phase 7d.3 — DELETE marks HNSW dirty; save rebuilds it from
1508        // current rows + serializes; reopen loads the post-delete graph.
1509        // After all that, the deleted rowid must NOT come back from a
1510        // KNN query.
1511        let path = tmp_path("hnsw_delete_rebuild");
1512        let mut db = Database::new("test".to_string());
1513        process_command(
1514            "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
1515            &mut db,
1516        )
1517        .unwrap();
1518        for v in &["[1.0, 0.0]", "[2.0, 0.0]", "[3.0, 0.0]", "[4.0, 0.0]"] {
1519            process_command(&format!("INSERT INTO docs (e) VALUES ({v});"), &mut db).unwrap();
1520        }
1521        process_command("CREATE INDEX ix_e ON docs USING hnsw (e);", &mut db).unwrap();
1522
1523        // Delete row 1 (the closest match to [0.5, 0.0]).
1524        process_command("DELETE FROM docs WHERE id = 1;", &mut db).unwrap();
1525        // Confirm it marked dirty.
1526        let dirty_before_save = db.tables["docs"].hnsw_indexes[0].needs_rebuild;
1527        assert!(dirty_before_save, "DELETE should mark dirty");
1528
1529        save_database(&mut db, &path).expect("save");
1530        // Confirm save cleared the dirty flag.
1531        let dirty_after_save = db.tables["docs"].hnsw_indexes[0].needs_rebuild;
1532        assert!(!dirty_after_save, "save should clear dirty");
1533        drop(db);
1534
1535        // Reopen, query for the closest match. Row 1 is gone; row 2
1536        // (id=2, vector [2.0, 0.0]) should now be the nearest.
1537        let loaded = open_database(&path, "test".to_string()).expect("open");
1538        let docs = loaded.get_table("docs".to_string()).expect("docs");
1539
1540        // Row 1 must not appear in any storage anymore.
1541        assert!(
1542            !docs.rowids().contains(&1),
1543            "deleted row 1 should not be in row storage"
1544        );
1545        assert_eq!(docs.rowids().len(), 3, "should have 3 surviving rows");
1546
1547        // The HNSW index must also have shed the deleted node.
1548        assert_eq!(
1549            docs.hnsw_indexes[0].index.len(),
1550            3,
1551            "HNSW graph should have shed the deleted node"
1552        );
1553
1554        cleanup(&path);
1555    }
1556
1557    #[test]
1558    fn round_trip_survives_writes_after_load() {
1559        let path = tmp_path("after_load");
1560        save_database(&mut seed_db(), &path).unwrap();
1561
1562        {
1563            let mut db = open_database(&path, "test".to_string()).unwrap();
1564            process_command(
1565                "INSERT INTO users (name, age) VALUES ('carol', 40);",
1566                &mut db,
1567            )
1568            .unwrap();
1569            save_database(&mut db, &path).unwrap();
1570        } // db drops → its exclusive lock releases before we reopen below.
1571
1572        let db2 = open_database(&path, "test".to_string()).unwrap();
1573        let users = db2.get_table("users".to_string()).unwrap();
1574        assert_eq!(users.rowids().len(), 3);
1575
1576        cleanup(&path);
1577    }
1578
1579    #[test]
1580    fn open_rejects_garbage_file() {
1581        let path = tmp_path("bad");
1582        std::fs::write(&path, b"not a sqlrite database, just bytes").unwrap();
1583        let result = open_database(&path, "x".to_string());
1584        assert!(result.is_err());
1585        cleanup(&path);
1586    }
1587
1588    #[test]
1589    fn many_small_rows_spread_across_leaves() {
1590        let path = tmp_path("many_rows");
1591        let mut db = Database::new("big".to_string());
1592        process_command(
1593            "CREATE TABLE things (id INTEGER PRIMARY KEY, data TEXT);",
1594            &mut db,
1595        )
1596        .unwrap();
1597        for i in 0..200 {
1598            let body = "x".repeat(200);
1599            let q = format!("INSERT INTO things (data) VALUES ('row-{i}-{body}');");
1600            process_command(&q, &mut db).unwrap();
1601        }
1602        save_database(&mut db, &path).unwrap();
1603        let loaded = open_database(&path, "big".to_string()).unwrap();
1604        let things = loaded.get_table("things".to_string()).unwrap();
1605        assert_eq!(things.rowids().len(), 200);
1606        cleanup(&path);
1607    }
1608
1609    #[test]
1610    fn huge_row_goes_through_overflow() {
1611        let path = tmp_path("overflow_row");
1612        let mut db = Database::new("big".to_string());
1613        process_command(
1614            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
1615            &mut db,
1616        )
1617        .unwrap();
1618        let body = "A".repeat(10_000);
1619        process_command(
1620            &format!("INSERT INTO docs (body) VALUES ('{body}');"),
1621            &mut db,
1622        )
1623        .unwrap();
1624        save_database(&mut db, &path).unwrap();
1625
1626        let loaded = open_database(&path, "big".to_string()).unwrap();
1627        let docs = loaded.get_table("docs".to_string()).unwrap();
1628        let rowids = docs.rowids();
1629        assert_eq!(rowids.len(), 1);
1630        let stored = docs.get_value("body", rowids[0]);
1631        match stored {
1632            Some(Value::Text(s)) => assert_eq!(s.len(), 10_000),
1633            other => panic!("expected Text, got {other:?}"),
1634        }
1635        cleanup(&path);
1636    }
1637
1638    #[test]
1639    fn create_sql_synthesis_round_trips() {
1640        // Build a table via CREATE, then verify table_to_create_sql +
1641        // parse_create_sql reproduce an equivalent column list.
1642        let mut db = Database::new("x".to_string());
1643        process_command(
1644            "CREATE TABLE t (id INTEGER PRIMARY KEY, tag TEXT UNIQUE, note TEXT NOT NULL);",
1645            &mut db,
1646        )
1647        .unwrap();
1648        let t = db.get_table("t".to_string()).unwrap();
1649        let sql = table_to_create_sql(t);
1650        let (name, cols) = parse_create_sql(&sql).unwrap();
1651        assert_eq!(name, "t");
1652        assert_eq!(cols.len(), 3);
1653        assert!(cols[0].is_pk);
1654        assert!(cols[1].is_unique);
1655        assert!(cols[2].not_null);
1656    }
1657
1658    #[test]
1659    fn sqlrite_master_is_not_exposed_as_a_user_table() {
1660        // After open, the public db.tables map should not list the master.
1661        let path = tmp_path("no_master");
1662        save_database(&mut seed_db(), &path).unwrap();
1663        let loaded = open_database(&path, "x".to_string()).unwrap();
1664        assert!(!loaded.tables.contains_key(MASTER_TABLE_NAME));
1665        cleanup(&path);
1666    }
1667
1668    #[test]
1669    fn multi_leaf_table_produces_an_interior_root() {
1670        // 200 fat rows force the table into multiple leaves, which means
1671        // save_database must build at least one InteriorNode above them.
1672        // The test verifies the round-trip works and confirms the root is
1673        // indeed an interior page (not a leaf) by reading the page type
1674        // directly out of the open pager.
1675        let path = tmp_path("multi_leaf_interior");
1676        let mut db = Database::new("big".to_string());
1677        process_command(
1678            "CREATE TABLE things (id INTEGER PRIMARY KEY, data TEXT);",
1679            &mut db,
1680        )
1681        .unwrap();
1682        for i in 0..200 {
1683            let body = "x".repeat(200);
1684            let q = format!("INSERT INTO things (data) VALUES ('row-{i}-{body}');");
1685            process_command(&q, &mut db).unwrap();
1686        }
1687        save_database(&mut db, &path).unwrap();
1688
1689        // Confirm the round-trip preserved all 200 rows.
1690        let loaded = open_database(&path, "big".to_string()).unwrap();
1691        let things = loaded.get_table("things".to_string()).unwrap();
1692        assert_eq!(things.rowids().len(), 200);
1693
1694        // Peek at `things`'s root page via the pager attached to the
1695        // loaded DB and check it's an InteriorNode, not a leaf.
1696        let pager = loaded
1697            .pager
1698            .as_ref()
1699            .expect("loaded DB should have a pager");
1700        // sqlrite_master's row for `things` holds its root page. Easiest
1701        // way to find it: walk the leaf chain by using find_leftmost_leaf
1702        // and then hop one level up. Simpler: read the master, scan for
1703        // the "things" row, look up rootpage.
1704        let mut master = build_empty_master_table();
1705        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
1706        let things_root = master
1707            .rowids()
1708            .into_iter()
1709            .find_map(|r| match master.get_value("name", r) {
1710                Some(Value::Text(s)) if s == "things" => match master.get_value("rootpage", r) {
1711                    Some(Value::Integer(p)) => Some(p as u32),
1712                    _ => None,
1713                },
1714                _ => None,
1715            })
1716            .expect("things should appear in sqlrite_master");
1717        let root_buf = pager.read_page(things_root).unwrap();
1718        assert_eq!(
1719            root_buf[0],
1720            PageType::InteriorNode as u8,
1721            "expected a multi-leaf table to have an interior root, got tag {}",
1722            root_buf[0]
1723        );
1724
1725        cleanup(&path);
1726    }
1727
1728    #[test]
1729    fn explicit_index_persists_across_save_and_open() {
1730        let path = tmp_path("idx_persist");
1731        let mut db = Database::new("idx".to_string());
1732        process_command(
1733            "CREATE TABLE users (id INTEGER PRIMARY KEY, tag TEXT);",
1734            &mut db,
1735        )
1736        .unwrap();
1737        for i in 1..=5 {
1738            let tag = if i % 2 == 0 { "odd" } else { "even" };
1739            process_command(
1740                &format!("INSERT INTO users (tag) VALUES ('{tag}');"),
1741                &mut db,
1742            )
1743            .unwrap();
1744        }
1745        process_command("CREATE INDEX users_tag_idx ON users (tag);", &mut db).unwrap();
1746        save_database(&mut db, &path).unwrap();
1747
1748        let loaded = open_database(&path, "idx".to_string()).unwrap();
1749        let users = loaded.get_table("users".to_string()).unwrap();
1750        let idx = users
1751            .index_by_name("users_tag_idx")
1752            .expect("explicit index should survive save/open");
1753        assert_eq!(idx.column_name, "tag");
1754        assert!(!idx.is_unique);
1755        // 5 rows: rowids 2, 4 are "odd" (i % 2 == 0 when i is 2 or 4) — 2 entries;
1756        // rowids 1, 3, 5 are "even" (i % 2 != 0) — 3 entries.
1757        let even_rowids = idx.lookup(&Value::Text("even".into()));
1758        let odd_rowids = idx.lookup(&Value::Text("odd".into()));
1759        assert_eq!(even_rowids.len(), 3);
1760        assert_eq!(odd_rowids.len(), 2);
1761
1762        cleanup(&path);
1763    }
1764
1765    #[test]
1766    fn auto_indexes_for_unique_columns_survive_save_open() {
1767        let path = tmp_path("auto_idx_persist");
1768        let mut db = Database::new("a".to_string());
1769        process_command(
1770            "CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL UNIQUE);",
1771            &mut db,
1772        )
1773        .unwrap();
1774        process_command("INSERT INTO users (email) VALUES ('a@x');", &mut db).unwrap();
1775        process_command("INSERT INTO users (email) VALUES ('b@x');", &mut db).unwrap();
1776        save_database(&mut db, &path).unwrap();
1777
1778        let loaded = open_database(&path, "a".to_string()).unwrap();
1779        let users = loaded.get_table("users".to_string()).unwrap();
1780        // Every UNIQUE column auto-creates an index; the load path populated
1781        // it from the persisted entries.
1782        let auto_name = SecondaryIndex::auto_name("users", "email");
1783        let idx = users
1784            .index_by_name(&auto_name)
1785            .expect("auto index should be restored");
1786        assert!(idx.is_unique);
1787        assert_eq!(idx.lookup(&Value::Text("a@x".into())).len(), 1);
1788        assert_eq!(idx.lookup(&Value::Text("b@x".into())).len(), 1);
1789
1790        cleanup(&path);
1791    }
1792
1793    #[test]
1794    fn deep_tree_round_trips() {
1795        // Force a 3-level tree by bypassing process_command (which prints
1796        // the full table on every INSERT, making large bulk loads O(N^2)
1797        // in I/O). We build the Table directly via restore_row.
1798        use crate::sql::db::table::Column as TableColumn;
1799
1800        let path = tmp_path("deep_tree");
1801        let mut db = Database::new("deep".to_string());
1802        let columns = vec![
1803            TableColumn::new("id".into(), "integer".into(), true, true, true),
1804            TableColumn::new("s".into(), "text".into(), false, true, false),
1805        ];
1806        let mut table = build_empty_table("t", columns, 0);
1807        // ~900-byte rows → ~4 rows per leaf. 6000 rows → ~1500 leaves,
1808        // which with interior fanout ~400 needs 2 interior levels (3-level
1809        // tree total, counting leaves).
1810        for i in 1..=6_000i64 {
1811            let body = "q".repeat(900);
1812            table
1813                .restore_row(
1814                    i,
1815                    vec![
1816                        Some(Value::Integer(i)),
1817                        Some(Value::Text(format!("r-{i}-{body}"))),
1818                    ],
1819                )
1820                .unwrap();
1821        }
1822        db.tables.insert("t".to_string(), table);
1823        save_database(&mut db, &path).unwrap();
1824
1825        let loaded = open_database(&path, "deep".to_string()).unwrap();
1826        let t = loaded.get_table("t".to_string()).unwrap();
1827        assert_eq!(t.rowids().len(), 6_000);
1828
1829        // Confirm the tree actually grew past 2 levels — i.e., the root's
1830        // leftmost child is itself an interior page, not a leaf.
1831        let pager = loaded.pager.as_ref().unwrap();
1832        let mut master = build_empty_master_table();
1833        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
1834        let t_root = master
1835            .rowids()
1836            .into_iter()
1837            .find_map(|r| match master.get_value("name", r) {
1838                Some(Value::Text(s)) if s == "t" => match master.get_value("rootpage", r) {
1839                    Some(Value::Integer(p)) => Some(p as u32),
1840                    _ => None,
1841                },
1842                _ => None,
1843            })
1844            .expect("t in sqlrite_master");
1845        let root_buf = pager.read_page(t_root).unwrap();
1846        assert_eq!(root_buf[0], PageType::InteriorNode as u8);
1847        let root_payload: &[u8; PAYLOAD_PER_PAGE] =
1848            (&root_buf[PAGE_HEADER_SIZE..]).try_into().unwrap();
1849        let root_interior = InteriorPage::from_bytes(root_payload);
1850        let child = root_interior.leftmost_child().unwrap();
1851        let child_buf = pager.read_page(child).unwrap();
1852        assert_eq!(
1853            child_buf[0],
1854            PageType::InteriorNode as u8,
1855            "expected 3-level tree: root's leftmost child should also be InteriorNode",
1856        );
1857
1858        cleanup(&path);
1859    }
1860}