Skip to main content

sqlrite/sql/pager/
mod.rs

1//! On-disk persistence for a `Database`, using fixed-size paged files.
2//!
3//! The file is a sequence of 4 KiB pages. Page 0 holds the header
4//! (magic, version, page count, schema-root pointer). Every other page carries
5//! a small per-page header (type tag + next-page pointer + payload length)
6//! followed by a payload of up to 4089 bytes.
7//!
8//! **Storage strategy (format version 2, Phase 3c.5).**
9//!
10//! - Each `Table`'s rows live as **cells** in a chain of `TableLeaf` pages.
11//!   Cell layout and slot directory are in `cell.rs` / `table_page.rs`;
12//!   cells that exceed the inline threshold spill into an overflow chain
13//!   via `overflow.rs`.
//! - The schema catalog is itself a regular table named `sqlrite_master`,
//!   with one row per user table or index:
//!       `(type TEXT NOT NULL, name TEXT PRIMARY KEY, sql TEXT NOT NULL,
//!         rootpage INTEGER NOT NULL, last_rowid INTEGER NOT NULL)`
18//!   This is the SQLite-style approach: the schema of `sqlrite_master`
19//!   itself is hardcoded into the engine so the open path can bootstrap.
20//! - Page 0's `schema_root_page` field points at the first leaf of
21//!   `sqlrite_master`.
22//!
23//! **Format version.** Version 2 is not compatible with files produced by
24//! earlier commits. Opening a v1 file returns a clean error — users on
25//! old files have to regenerate them from CREATE/INSERT, as there's no
26//! production data to migrate yet.
27
28// Data-layer modules. Not every helper in these modules is used by save/open
29// yet — some exist for tests, some for future maintenance operations.
30// Module-level #[allow(dead_code)] keeps the build quiet without dotting
31// the modules with per-item attributes.
32#[allow(dead_code)]
33pub mod cell;
34pub mod file;
35pub mod header;
36#[allow(dead_code)]
37pub mod index_cell;
38#[allow(dead_code)]
39pub mod interior_page;
40pub mod overflow;
41pub mod page;
42pub mod pager;
43#[allow(dead_code)]
44pub mod table_page;
45#[allow(dead_code)]
46pub mod varint;
47#[allow(dead_code)]
48pub mod wal;
49
50use std::collections::{BTreeMap, HashMap};
51use std::path::Path;
52use std::sync::{Arc, Mutex};
53
54use sqlparser::dialect::SQLiteDialect;
55use sqlparser::parser::Parser;
56
57use crate::error::{Result, SQLRiteError};
58use crate::sql::db::database::Database;
59use crate::sql::db::secondary_index::{IndexOrigin, SecondaryIndex};
60use crate::sql::db::table::{Column, DataType, Row, Table, Value};
61use crate::sql::pager::cell::Cell;
62use crate::sql::pager::header::DbHeader;
63use crate::sql::pager::index_cell::IndexCell;
64use crate::sql::pager::interior_page::{InteriorCell, InteriorPage};
65use crate::sql::pager::overflow::{
66    OVERFLOW_THRESHOLD, OverflowRef, PagedEntry, read_overflow_chain, write_overflow_chain,
67};
68use crate::sql::pager::page::{PAGE_HEADER_SIZE, PAGE_SIZE, PAYLOAD_PER_PAGE, PageType};
69use crate::sql::pager::pager::Pager;
70use crate::sql::pager::table_page::TablePage;
71use crate::sql::parser::create::CreateQuery;
72
73// Re-export so callers can spell `sql::pager::AccessMode` without
74// reaching into the `pager::pager::pager` submodule path.
75pub use crate::sql::pager::pager::AccessMode;
76
/// Name of the internal catalog table. Reserved — user CREATEs of this
/// name must be rejected upstream. `save_database` also refuses to persist
/// a user table carrying this name and returns an internal error instead.
pub const MASTER_TABLE_NAME: &str = "sqlrite_master";
80
81/// Opens a database file in read-write mode. Shorthand for
82/// [`open_database_with_mode`] with [`AccessMode::ReadWrite`].
83pub fn open_database(path: &Path, db_name: String) -> Result<Database> {
84    open_database_with_mode(path, db_name, AccessMode::ReadWrite)
85}
86
87/// Opens a database file in read-only mode. Acquires a shared OS-level
88/// advisory lock, so other read-only openers coexist but any writer is
89/// excluded. Attempts to mutate the returned `Database` (e.g. an
90/// `INSERT`, or a `save_database` call against it) bottom out in a
91/// `cannot commit: database is opened read-only` error from the Pager.
92pub fn open_database_read_only(path: &Path, db_name: String) -> Result<Database> {
93    open_database_with_mode(path, db_name, AccessMode::ReadOnly)
94}
95
96/// Opens a database file and reconstructs the in-memory `Database`,
97/// leaving the long-lived `Pager` attached for subsequent auto-save
98/// (read-write) or consistent-snapshot reads (read-only).
99pub fn open_database_with_mode(path: &Path, db_name: String, mode: AccessMode) -> Result<Database> {
100    let pager = Pager::open_with_mode(path, mode)?;
101
102    // 1. Load sqlrite_master from the tree at header.schema_root_page.
103    let mut master = build_empty_master_table();
104    load_table_rows(&pager, &mut master, pager.header().schema_root_page)?;
105
106    // 2. Two passes over master rows: first build every user table, then
107    //    attach secondary indexes. Indexes need their base table to exist
108    //    before we can populate them. Auto-indexes are created at table
109    //    build time so we only have to load explicit indexes from disk
110    //    (but we also reload the auto-index CONTENT because Table::new
111    //    built it empty).
112    let mut db = Database::new(db_name);
113    let mut index_rows: Vec<IndexCatalogRow> = Vec::new();
114
115    for rowid in master.rowids() {
116        let ty = take_text(&master, "type", rowid)?;
117        let name = take_text(&master, "name", rowid)?;
118        let sql = take_text(&master, "sql", rowid)?;
119        let rootpage = take_integer(&master, "rootpage", rowid)? as u32;
120        let last_rowid = take_integer(&master, "last_rowid", rowid)?;
121
122        match ty.as_str() {
123            "table" => {
124                let (parsed_name, columns) = parse_create_sql(&sql)?;
125                if parsed_name != name {
126                    return Err(SQLRiteError::Internal(format!(
127                        "sqlrite_master row '{name}' carries SQL for '{parsed_name}' — corrupt catalog?"
128                    )));
129                }
130                let mut table = build_empty_table(&name, columns, last_rowid);
131                if rootpage != 0 {
132                    load_table_rows(&pager, &mut table, rootpage)?;
133                }
134                if last_rowid > table.last_rowid {
135                    table.last_rowid = last_rowid;
136                }
137                db.tables.insert(name, table);
138            }
139            "index" => {
140                index_rows.push(IndexCatalogRow {
141                    name,
142                    sql,
143                    rootpage,
144                });
145            }
146            other => {
147                return Err(SQLRiteError::Internal(format!(
148                    "sqlrite_master row '{name}' has unknown type '{other}'"
149                )));
150            }
151        }
152    }
153
154    // Second pass: attach each index to its table. HNSW indexes
155    // (Phase 7d.2) take a different code path because their persisted
156    // form is just the CREATE INDEX SQL — the graph itself isn't
157    // persisted yet (Phase 7d.3). Detect HNSW via the SQL's USING clause
158    // and route to a graph-rebuild instead of the B-Tree-cell load.
159    for row in index_rows {
160        if create_index_sql_uses_hnsw(&row.sql) {
161            rebuild_hnsw_index(&mut db, &row)?;
162        } else {
163            attach_index(&mut db, &pager, row)?;
164        }
165    }
166
167    db.source_path = Some(path.to_path_buf());
168    db.pager = Some(pager);
169    Ok(db)
170}
171
/// Catalog row for a secondary index — deferred until after every table is
/// loaded so the index's base table exists by the time we populate it.
struct IndexCatalogRow {
    /// Index name, read from the catalog's `name` column.
    name: String,
    /// The `CREATE INDEX` statement read from the catalog's `sql` column.
    sql: String,
    /// Root page of the index's tree, from the catalog's `rootpage` column.
    /// `0` means nothing is stored on disk (HNSW rows are saved that way).
    rootpage: u32,
}
179
180/// Persists `db` to disk. Same diff-commit behavior as before: only pages
181/// whose bytes actually changed get written.
182pub fn save_database(db: &mut Database, path: &Path) -> Result<()> {
183    let same_path = db.source_path.as_deref() == Some(path);
184    let mut pager = if same_path {
185        match db.pager.take() {
186            Some(p) => p,
187            None if path.exists() => Pager::open(path)?,
188            None => Pager::create(path)?,
189        }
190    } else if path.exists() {
191        Pager::open(path)?
192    } else {
193        Pager::create(path)?
194    };
195
196    pager.clear_staged();
197
198    // Page 0 is the header; payload pages start at 1.
199    let mut next_free_page: u32 = 1;
200
201    // 1. Stage each user table's B-Tree, collecting master-row info.
202    //    `kind` is "table" or "index" — master has one row per each.
203    let mut master_rows: Vec<CatalogEntry> = Vec::new();
204
205    let mut table_names: Vec<&String> = db.tables.keys().collect();
206    table_names.sort();
207    for name in table_names {
208        if name == MASTER_TABLE_NAME {
209            return Err(SQLRiteError::Internal(format!(
210                "user table cannot be named '{MASTER_TABLE_NAME}' (reserved)"
211            )));
212        }
213        let table = &db.tables[name];
214        let (rootpage, new_next) = stage_table_btree(&mut pager, table, next_free_page)?;
215        next_free_page = new_next;
216        master_rows.push(CatalogEntry {
217            kind: "table".into(),
218            name: name.clone(),
219            sql: table_to_create_sql(table),
220            rootpage,
221            last_rowid: table.last_rowid,
222        });
223    }
224
225    // 2. Stage each secondary index's B-Tree. Indexes persist in a
226    //    deterministic order: sorted by (owning_table, index_name).
227    let mut index_entries: Vec<(&Table, &SecondaryIndex)> = Vec::new();
228    for table in db.tables.values() {
229        for idx in &table.secondary_indexes {
230            index_entries.push((table, idx));
231        }
232    }
233    index_entries
234        .sort_by(|(ta, ia), (tb, ib)| ta.tb_name.cmp(&tb.tb_name).then(ia.name.cmp(&ib.name)));
235    for (_table, idx) in index_entries {
236        let (rootpage, new_next) = stage_index_btree(&mut pager, idx, next_free_page)?;
237        next_free_page = new_next;
238        master_rows.push(CatalogEntry {
239            kind: "index".into(),
240            name: idx.name.clone(),
241            sql: idx.synthesized_sql(),
242            rootpage,
243            last_rowid: 0,
244        });
245    }
246
247    // 2b. Phase 7d.2: persist HNSW indexes' CREATE INDEX SQL into
248    //     sqlrite_master so reopen reconstructs them. The graph itself
249    //     isn't persisted yet — `rootpage = 0` and the rebuild path
250    //     re-runs the SQL against current rows. Phase 7d.3 will write
251    //     the graph as cell-encoded pages and replace this with a real
252    //     rootpage + load-from-disk path.
253    let mut hnsw_entries: Vec<(&Table, &crate::sql::db::table::HnswIndexEntry)> = Vec::new();
254    for table in db.tables.values() {
255        for entry in &table.hnsw_indexes {
256            hnsw_entries.push((table, entry));
257        }
258    }
259    hnsw_entries
260        .sort_by(|(ta, ea), (tb, eb)| ta.tb_name.cmp(&tb.tb_name).then(ea.name.cmp(&eb.name)));
261    for (table, entry) in hnsw_entries {
262        master_rows.push(CatalogEntry {
263            kind: "index".into(),
264            name: entry.name.clone(),
265            sql: format!(
266                "CREATE INDEX {} ON {} USING hnsw ({})",
267                entry.name, table.tb_name, entry.column_name
268            ),
269            rootpage: 0, // no on-disk graph yet (Phase 7d.3)
270            last_rowid: 0,
271        });
272    }
273
274    // 3. Build an in-memory sqlrite_master with one row per table or index,
275    //    then stage it via the same tree-build path.
276    let mut master = build_empty_master_table();
277    for (i, entry) in master_rows.into_iter().enumerate() {
278        let rowid = (i as i64) + 1;
279        master.restore_row(
280            rowid,
281            vec![
282                Some(Value::Text(entry.kind)),
283                Some(Value::Text(entry.name)),
284                Some(Value::Text(entry.sql)),
285                Some(Value::Integer(entry.rootpage as i64)),
286                Some(Value::Integer(entry.last_rowid)),
287            ],
288        )?;
289    }
290    let (master_root, master_next) = stage_table_btree(&mut pager, &master, next_free_page)?;
291    next_free_page = master_next;
292
293    pager.commit(DbHeader {
294        page_count: next_free_page,
295        schema_root_page: master_root,
296    })?;
297
298    if same_path {
299        db.pager = Some(pager);
300    }
301    Ok(())
302}
303
/// Build material for a single row in sqlrite_master.
struct CatalogEntry {
    /// Row discriminator: "table" or "index".
    kind: String, // "table" or "index"
    /// Name of the table or index the row describes.
    name: String,
    /// CREATE statement stored so the object can be rebuilt on open.
    sql: String,
    /// Root page of the staged tree; `0` when nothing was staged (HNSW rows).
    rootpage: u32,
    /// The table's `last_rowid` at save time; index rows store `0`.
    last_rowid: i64,
}
312
313// -------------------------------------------------------------------------
314// sqlrite_master — hardcoded catalog table schema
315
316fn build_empty_master_table() -> Table {
317    // Phase 3e: `type` is the first column, matching SQLite's convention.
318    // It distinguishes `'table'` rows from `'index'` rows.
319    let columns = vec![
320        Column::new("type".into(), "text".into(), false, true, false),
321        Column::new("name".into(), "text".into(), true, true, true),
322        Column::new("sql".into(), "text".into(), false, true, false),
323        Column::new("rootpage".into(), "integer".into(), false, true, false),
324        Column::new("last_rowid".into(), "integer".into(), false, true, false),
325    ];
326    build_empty_table(MASTER_TABLE_NAME, columns, 0)
327}
328
329/// Reads a required Text column from a known-good catalog row.
330fn take_text(table: &Table, col: &str, rowid: i64) -> Result<String> {
331    match table.get_value(col, rowid) {
332        Some(Value::Text(s)) => Ok(s),
333        other => Err(SQLRiteError::Internal(format!(
334            "sqlrite_master column '{col}' at rowid {rowid}: expected Text, got {other:?}"
335        ))),
336    }
337}
338
339/// Reads a required Integer column from a known-good catalog row.
340fn take_integer(table: &Table, col: &str, rowid: i64) -> Result<i64> {
341    match table.get_value(col, rowid) {
342        Some(Value::Integer(v)) => Ok(v),
343        other => Err(SQLRiteError::Internal(format!(
344            "sqlrite_master column '{col}' at rowid {rowid}: expected Integer, got {other:?}"
345        ))),
346    }
347}
348
349// -------------------------------------------------------------------------
350// CREATE-TABLE SQL synthesis and re-parsing
351
352/// Synthesizes a CREATE TABLE SQL string that recreates the table's schema.
353/// Deterministic: same schema → same SQL, so diffing commits stay stable.
354fn table_to_create_sql(table: &Table) -> String {
355    let mut parts = Vec::with_capacity(table.columns.len());
356    for c in &table.columns {
357        // Render the SQL type literally so the round-trip through
358        // CREATE TABLE re-parsing recreates the same schema. Vector
359        // carries its dimension inline.
360        let ty: String = match &c.datatype {
361            DataType::Integer => "INTEGER".to_string(),
362            DataType::Text => "TEXT".to_string(),
363            DataType::Real => "REAL".to_string(),
364            DataType::Bool => "BOOLEAN".to_string(),
365            DataType::Vector(dim) => format!("VECTOR({dim})"),
366            DataType::None | DataType::Invalid => "TEXT".to_string(),
367        };
368        let mut piece = format!("{} {}", c.column_name, ty);
369        if c.is_pk {
370            piece.push_str(" PRIMARY KEY");
371        } else {
372            if c.is_unique {
373                piece.push_str(" UNIQUE");
374            }
375            if c.not_null {
376                piece.push_str(" NOT NULL");
377            }
378        }
379        parts.push(piece);
380    }
381    format!("CREATE TABLE {} ({});", table.tb_name, parts.join(", "))
382}
383
384/// Reverses `table_to_create_sql`: feeds the SQL back through `sqlparser`
385/// and produces our internal column list. Returns `(table_name, columns)`.
386fn parse_create_sql(sql: &str) -> Result<(String, Vec<Column>)> {
387    let dialect = SQLiteDialect {};
388    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
389    let stmt = ast.pop().ok_or_else(|| {
390        SQLRiteError::Internal("sqlrite_master row held an empty SQL string".to_string())
391    })?;
392    let create = CreateQuery::new(&stmt)?;
393    let columns = create
394        .columns
395        .into_iter()
396        .map(|pc| Column::new(pc.name, pc.datatype, pc.is_pk, pc.not_null, pc.is_unique))
397        .collect();
398    Ok((create.table_name, columns))
399}
400
401// -------------------------------------------------------------------------
402// In-memory table (re)construction
403
404/// Builds an empty in-memory `Table` given the declared columns.
405fn build_empty_table(name: &str, columns: Vec<Column>, last_rowid: i64) -> Table {
406    let rows: Arc<Mutex<HashMap<String, Row>>> = Arc::new(Mutex::new(HashMap::new()));
407    let mut secondary_indexes: Vec<SecondaryIndex> = Vec::new();
408    {
409        let mut map = rows.lock().expect("rows mutex poisoned");
410        for col in &columns {
411            // Mirror the dispatch in `Table::new` so the reconstructed
412            // table has the same shape it'd have if it were built fresh
413            // from SQL. Phase 7a adds the Vector arm — without it,
414            // VECTOR columns silently restore as Row::None and every
415            // restore_row hits a "storage None vs value Some(Vector(...))"
416            // type mismatch.
417            let row = match &col.datatype {
418                DataType::Integer => Row::Integer(BTreeMap::new()),
419                DataType::Text => Row::Text(BTreeMap::new()),
420                DataType::Real => Row::Real(BTreeMap::new()),
421                DataType::Bool => Row::Bool(BTreeMap::new()),
422                DataType::Vector(_dim) => Row::Vector(BTreeMap::new()),
423                DataType::None | DataType::Invalid => Row::None,
424            };
425            map.insert(col.column_name.clone(), row);
426
427            // Auto-create UNIQUE/PK indexes so the restored table has the
428            // same shape Table::new would have built from fresh SQL.
429            if (col.is_pk || col.is_unique)
430                && matches!(col.datatype, DataType::Integer | DataType::Text)
431            {
432                if let Ok(idx) = SecondaryIndex::new(
433                    SecondaryIndex::auto_name(name, &col.column_name),
434                    name.to_string(),
435                    col.column_name.clone(),
436                    &col.datatype,
437                    true,
438                    IndexOrigin::Auto,
439                ) {
440                    secondary_indexes.push(idx);
441                }
442            }
443        }
444    }
445
446    let primary_key = columns
447        .iter()
448        .find(|c| c.is_pk)
449        .map(|c| c.column_name.clone())
450        .unwrap_or_else(|| "-1".to_string());
451
452    Table {
453        tb_name: name.to_string(),
454        columns,
455        rows,
456        secondary_indexes,
457        // HNSW indexes (Phase 7d.2) are reconstructed on open by re-
458        // executing each `CREATE INDEX … USING hnsw` SQL stored in
459        // `sqlrite_master`. This builder produces the empty shell;
460        // `replay_create_index_for_hnsw` (in this same module) walks
461        // sqlrite_master after every table is loaded and rebuilds the
462        // graph from current row data. Persistence of the graph itself
463        // (avoiding the on-open rebuild cost) is Phase 7d.3.
464        hnsw_indexes: Vec::new(),
465        last_rowid,
466        primary_key,
467    }
468}
469
470// -------------------------------------------------------------------------
471// Leaf-chain read / write
472
473/// Walks a table's B-Tree from `root_page`, following the leftmost-child
474/// chain down to the first leaf, then iterating leaves via their sibling
475/// `next_page` pointers. Every cell is decoded and replayed into `table`.
476///
477/// Open-path note: we eagerly materialize the entire table into `Table`'s
478/// in-memory maps. Phase 5 will introduce a `Cursor` that hits the pager
479/// on demand so queries can stream through the tree without a full upfront
480/// load.
481/// Re-parses `CREATE INDEX` SQL from sqlrite_master and restores the
482/// index on its base table by walking the tree of index cells at
483/// `rootpage`. The base table is expected to already be in `db.tables`.
484fn attach_index(db: &mut Database, pager: &Pager, row: IndexCatalogRow) -> Result<()> {
485    let (table_name, column_name, is_unique) = parse_create_index_sql(&row.sql)?;
486
487    let table = db.get_table_mut(table_name.clone()).map_err(|_| {
488        SQLRiteError::Internal(format!(
489            "index '{}' references unknown table '{table_name}' (sqlrite_master out of sync?)",
490            row.name
491        ))
492    })?;
493    let datatype = table
494        .columns
495        .iter()
496        .find(|c| c.column_name == column_name)
497        .map(|c| clone_datatype(&c.datatype))
498        .ok_or_else(|| {
499            SQLRiteError::Internal(format!(
500                "index '{}' references unknown column '{column_name}' on '{table_name}'",
501                row.name
502            ))
503        })?;
504
505    // An auto-index on this column may already exist (built by
506    // build_empty_table for UNIQUE/PK columns). If the names match, reuse
507    // the slot instead of adding a duplicate entry.
508    let existing_slot = table
509        .secondary_indexes
510        .iter()
511        .position(|i| i.name == row.name);
512    let idx = match existing_slot {
513        Some(i) => {
514            // Drain any entries that may have been populated during table
515            // restore_row calls — we're about to repopulate from the
516            // persisted tree.
517            table.secondary_indexes.remove(i)
518        }
519        None => SecondaryIndex::new(
520            row.name.clone(),
521            table_name.clone(),
522            column_name.clone(),
523            &datatype,
524            is_unique,
525            IndexOrigin::Explicit,
526        )?,
527    };
528    let mut idx = idx;
529    // Wipe any stale entries from the auto path so the load is idempotent.
530    let is_unique_flag = idx.is_unique;
531    let origin = idx.origin;
532    idx = SecondaryIndex::new(
533        idx.name,
534        idx.table_name,
535        idx.column_name,
536        &datatype,
537        is_unique_flag,
538        origin,
539    )?;
540
541    // Populate from the index tree's cells.
542    load_index_rows(pager, &mut idx, row.rootpage)?;
543
544    table.secondary_indexes.push(idx);
545    Ok(())
546}
547
548/// Walks the leaves of an index B-Tree rooted at `root_page` and inserts
549/// every `(value, rowid)` pair into `idx`.
550fn load_index_rows(pager: &Pager, idx: &mut SecondaryIndex, root_page: u32) -> Result<()> {
551    if root_page == 0 {
552        return Ok(());
553    }
554    let first_leaf = find_leftmost_leaf(pager, root_page)?;
555    let mut current = first_leaf;
556    while current != 0 {
557        let page_buf = pager
558            .read_page(current)
559            .ok_or_else(|| SQLRiteError::Internal(format!("missing index leaf page {current}")))?;
560        if page_buf[0] != PageType::TableLeaf as u8 {
561            return Err(SQLRiteError::Internal(format!(
562                "page {current} tagged {} but expected TableLeaf (index)",
563                page_buf[0]
564            )));
565        }
566        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
567        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
568            .try_into()
569            .map_err(|_| SQLRiteError::Internal("index leaf payload size".to_string()))?;
570        let leaf = TablePage::from_bytes(payload);
571
572        for slot in 0..leaf.slot_count() {
573            // Slots on an index page hold KIND_INDEX cells; decode directly.
574            let offset = leaf.slot_offset_raw(slot)?;
575            let (ic, _) = IndexCell::decode(leaf.as_bytes(), offset)?;
576            idx.insert(&ic.value, ic.rowid)?;
577        }
578        current = next_leaf;
579    }
580    Ok(())
581}
582
583/// Minimal recognizer for the synthesized-or-user `CREATE INDEX` SQL we
584/// store in sqlrite_master. Returns `(table_name, column_name, is_unique)`.
585///
586/// Uses sqlparser so user-supplied SQL with extra whitespace, case, etc.
587/// still works; the only shape we accept is single-column indexes.
588fn parse_create_index_sql(sql: &str) -> Result<(String, String, bool)> {
589    use sqlparser::ast::{CreateIndex, Expr, Statement};
590
591    let dialect = SQLiteDialect {};
592    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
593    let Some(Statement::CreateIndex(CreateIndex {
594        table_name,
595        columns,
596        unique,
597        ..
598    })) = ast.pop()
599    else {
600        return Err(SQLRiteError::Internal(format!(
601            "sqlrite_master index row's SQL isn't a CREATE INDEX: {sql}"
602        )));
603    };
604    if columns.len() != 1 {
605        return Err(SQLRiteError::NotImplemented(
606            "multi-column indexes aren't supported yet".to_string(),
607        ));
608    }
609    let col = match &columns[0].column.expr {
610        Expr::Identifier(ident) => ident.value.clone(),
611        Expr::CompoundIdentifier(parts) => {
612            parts.last().map(|p| p.value.clone()).unwrap_or_default()
613        }
614        other => {
615            return Err(SQLRiteError::Internal(format!(
616                "unsupported indexed column expression: {other:?}"
617            )));
618        }
619    };
620    Ok((table_name.to_string(), col, unique))
621}
622
623/// True iff a CREATE INDEX SQL string uses `USING hnsw` (case-insensitive).
624/// Used by the open path to route HNSW indexes to the graph-rebuild path
625/// instead of the standard B-Tree cell-load. Pre-Phase-7d.2 indexes
626/// don't have a USING clause, so they all return false and continue
627/// taking the existing path.
628fn create_index_sql_uses_hnsw(sql: &str) -> bool {
629    use sqlparser::ast::{CreateIndex, IndexType, Statement};
630
631    let dialect = SQLiteDialect {};
632    let Ok(mut ast) = Parser::parse_sql(&dialect, sql) else {
633        return false;
634    };
635    let Some(Statement::CreateIndex(CreateIndex { using, .. })) = ast.pop() else {
636        return false;
637    };
638    matches!(using, Some(IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("hnsw"))
639}
640
641/// Replays a CREATE INDEX … USING hnsw statement on a freshly-loaded
642/// database, rebuilding the in-memory HNSW graph from the table's
643/// current rows. Phase 7d.2 only — Phase 7d.3 will replace this with
644/// a load-from-disk path that reads the persisted graph.
645fn rebuild_hnsw_index(db: &mut Database, row: &IndexCatalogRow) -> Result<()> {
646    use crate::sql::executor::execute_create_index;
647    use sqlparser::ast::Statement;
648
649    let dialect = SQLiteDialect {};
650    let mut ast = Parser::parse_sql(&dialect, &row.sql).map_err(SQLRiteError::from)?;
651    let Some(stmt @ Statement::CreateIndex(_)) = ast.pop() else {
652        return Err(SQLRiteError::Internal(format!(
653            "sqlrite_master HNSW row's SQL isn't a CREATE INDEX: {}",
654            row.sql
655        )));
656    };
657    // execute_create_index walks the table's existing rows and
658    // populates the new HnswIndex via the same code path used at
659    // user-issued CREATE INDEX time. The graph is identical to what
660    // the user originally got at index-creation; only the random
661    // RNG path through the geometric layer-assignment differs (we
662    // seed from the index name, so it's actually deterministic).
663    execute_create_index(&stmt, db)?;
664    Ok(())
665}
666
667/// Cheap clone helper — `DataType` doesn't derive `Clone` elsewhere.
668fn clone_datatype(dt: &DataType) -> DataType {
669    match dt {
670        DataType::Integer => DataType::Integer,
671        DataType::Text => DataType::Text,
672        DataType::Real => DataType::Real,
673        DataType::Bool => DataType::Bool,
674        DataType::Vector(dim) => DataType::Vector(*dim),
675        DataType::None => DataType::None,
676        DataType::Invalid => DataType::Invalid,
677    }
678}
679
680/// Stages an index's B-Tree at `start_page`. Each leaf cell is a
681/// `KIND_INDEX` entry carrying `(original_rowid, value)`. Returns
682/// `(root_page, next_free_page)`.
683///
684/// The tree's shape matches a regular table's — leaves chained via
685/// `next_page`, optional interior layer above. `Cell::peek_rowid` works
686/// uniformly for index cells (same prefix as local cells), so the
687/// existing slot directory and binary search carry over.
688fn stage_index_btree(
689    pager: &mut Pager,
690    idx: &SecondaryIndex,
691    start_page: u32,
692) -> Result<(u32, u32)> {
693    // Build the leaves.
694    let (leaves, mut next_free_page) = stage_index_leaves(pager, idx, start_page)?;
695    if leaves.len() == 1 {
696        return Ok((leaves[0].0, next_free_page));
697    }
698    let mut level: Vec<(u32, i64)> = leaves;
699    while level.len() > 1 {
700        let (next_level, new_next_free) = stage_interior_level(pager, &level, next_free_page)?;
701        next_free_page = new_next_free;
702        level = next_level;
703    }
704    Ok((level[0].0, next_free_page))
705}
706
707/// Packs the index's (value, rowid) entries into a sibling-chained run
708/// of `TableLeaf` pages. Iteration order matches `SecondaryIndex::iter_entries`
709/// (ascending value; rowids in insertion order within a value), which is
710/// also ascending by the "cell rowid" carried in each IndexCell (the
711/// original row's rowid) — so Cell::peek_rowid + the slot directory's
712/// rowid ordering stays consistent.
713fn stage_index_leaves(
714    pager: &mut Pager,
715    idx: &SecondaryIndex,
716    start_page: u32,
717) -> Result<(Vec<(u32, i64)>, u32)> {
718    let mut leaves: Vec<(u32, i64)> = Vec::new();
719    let mut current_leaf = TablePage::empty();
720    let mut current_leaf_page = start_page;
721    let mut current_max_rowid: Option<i64> = None;
722    let mut next_free_page = start_page + 1;
723
724    // Sort the entries by original rowid so the in-page slot directory,
725    // which binary-searches by rowid, stays valid. (iter_entries orders by
726    // value; we reorder here for B-Tree correctness.)
727    let mut entries: Vec<(Value, i64)> = idx.iter_entries().collect();
728    entries.sort_by_key(|(_, r)| *r);
729
730    for (value, rowid) in entries {
731        let cell = IndexCell::new(rowid, value);
732        let entry_bytes = cell.encode()?;
733
734        if !current_leaf.would_fit(entry_bytes.len()) {
735            let next_leaf_page_num = next_free_page;
736            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
737            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
738            current_leaf = TablePage::empty();
739            current_leaf_page = next_leaf_page_num;
740            next_free_page += 1;
741
742            if !current_leaf.would_fit(entry_bytes.len()) {
743                return Err(SQLRiteError::Internal(format!(
744                    "index entry of {} bytes exceeds empty-page capacity {}",
745                    entry_bytes.len(),
746                    current_leaf.free_space()
747                )));
748            }
749        }
750        current_leaf.insert_entry(rowid, &entry_bytes)?;
751        current_max_rowid = Some(rowid);
752    }
753
754    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
755    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
756    Ok((leaves, next_free_page))
757}
758
759fn load_table_rows(pager: &Pager, table: &mut Table, root_page: u32) -> Result<()> {
760    let first_leaf = find_leftmost_leaf(pager, root_page)?;
761    let mut current = first_leaf;
762    while current != 0 {
763        let page_buf = pager
764            .read_page(current)
765            .ok_or_else(|| SQLRiteError::Internal(format!("missing leaf page {current}")))?;
766        if page_buf[0] != PageType::TableLeaf as u8 {
767            return Err(SQLRiteError::Internal(format!(
768                "page {current} tagged {} but expected TableLeaf",
769                page_buf[0]
770            )));
771        }
772        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
773        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
774            .try_into()
775            .map_err(|_| SQLRiteError::Internal("leaf payload slice size".to_string()))?;
776        let leaf = TablePage::from_bytes(payload);
777
778        for slot in 0..leaf.slot_count() {
779            let entry = leaf.entry_at(slot)?;
780            let cell = match entry {
781                PagedEntry::Local(c) => c,
782                PagedEntry::Overflow(r) => {
783                    let body_bytes =
784                        read_overflow_chain(pager, r.first_overflow_page, r.total_body_len)?;
785                    let (c, _) = Cell::decode(&body_bytes, 0)?;
786                    c
787                }
788            };
789            table.restore_row(cell.rowid, cell.values)?;
790        }
791        current = next_leaf;
792    }
793    Ok(())
794}
795
796/// Descends from `root_page` through `InteriorNode` pages, always taking
797/// the leftmost child, until a `TableLeaf` is reached. Returns that leaf's
798/// page number. A root that's already a leaf is returned as-is.
799fn find_leftmost_leaf(pager: &Pager, root_page: u32) -> Result<u32> {
800    let mut current = root_page;
801    loop {
802        let page_buf = pager.read_page(current).ok_or_else(|| {
803            SQLRiteError::Internal(format!("missing page {current} during tree descent"))
804        })?;
805        match page_buf[0] {
806            t if t == PageType::TableLeaf as u8 => return Ok(current),
807            t if t == PageType::InteriorNode as u8 => {
808                let payload: &[u8; PAYLOAD_PER_PAGE] =
809                    (&page_buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
810                        SQLRiteError::Internal("interior payload slice size".to_string())
811                    })?;
812                let interior = InteriorPage::from_bytes(payload);
813                current = interior.leftmost_child()?;
814            }
815            other => {
816                return Err(SQLRiteError::Internal(format!(
817                    "unexpected page type {other} during tree descent at page {current}"
818                )));
819            }
820        }
821    }
822}
823
824/// Stages a table's B-Tree starting at `start_page`. Returns
825/// `(root_page, next_free_page)`. Builds bottom-up:
826///
827/// 1. Pack all row cells into `TableLeaf` pages, chaining them via each
828///    leaf's `next_page` sibling pointer (for fast sequential scans).
829/// 2. If the table fits in a single leaf, that leaf is the root.
830/// 3. Otherwise, group leaves into `InteriorNode` pages; recurse up the
831///    tree until one root remains.
832///
833/// Deterministic: same in-memory rows → same pages at same offsets, so
834/// the Pager's diff commit still skips unchanged tables.
835fn stage_table_btree(pager: &mut Pager, table: &Table, start_page: u32) -> Result<(u32, u32)> {
836    let (leaves, mut next_free_page) = stage_leaves(pager, table, start_page)?;
837    if leaves.len() == 1 {
838        return Ok((leaves[0].0, next_free_page));
839    }
840    let mut level: Vec<(u32, i64)> = leaves;
841    while level.len() > 1 {
842        let (next_level, new_next_free) = stage_interior_level(pager, &level, next_free_page)?;
843        next_free_page = new_next_free;
844        level = next_level;
845    }
846    Ok((level[0].0, next_free_page))
847}
848
849/// Packs the table's rows into a sibling-linked chain of `TableLeaf` pages.
850/// Returns each leaf's `(page_number, max_rowid)` (used by the next level
851/// up to build divider cells) and the first free page after the chain
852/// including any overflow pages allocated for oversized cells.
853fn stage_leaves(
854    pager: &mut Pager,
855    table: &Table,
856    start_page: u32,
857) -> Result<(Vec<(u32, i64)>, u32)> {
858    let mut leaves: Vec<(u32, i64)> = Vec::new();
859    let mut current_leaf = TablePage::empty();
860    let mut current_leaf_page = start_page;
861    let mut current_max_rowid: Option<i64> = None;
862    let mut next_free_page = start_page + 1;
863
864    for rowid in table.rowids() {
865        let entry_bytes = build_row_entry(pager, table, rowid, &mut next_free_page)?;
866
867        if !current_leaf.would_fit(entry_bytes.len()) {
868            // Commit the current leaf. Its sibling next_page is the page
869            // number where the new leaf will go — which is next_free_page
870            // right now (no overflow pages have been allocated between
871            // this decision and the new leaf's allocation below).
872            let next_leaf_page_num = next_free_page;
873            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
874            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
875            current_leaf = TablePage::empty();
876            current_leaf_page = next_leaf_page_num;
877            next_free_page += 1;
878            // current_max_rowid is reassigned by the insert below; no need
879            // to zero it out here.
880
881            if !current_leaf.would_fit(entry_bytes.len()) {
882                return Err(SQLRiteError::Internal(format!(
883                    "entry of {} bytes exceeds empty-page capacity {}",
884                    entry_bytes.len(),
885                    current_leaf.free_space()
886                )));
887            }
888        }
889        current_leaf.insert_entry(rowid, &entry_bytes)?;
890        current_max_rowid = Some(rowid);
891    }
892
893    // Final leaf: sibling next_page = 0 (end of chain).
894    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
895    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
896    Ok((leaves, next_free_page))
897}
898
899/// Encodes a single row's on-leaf entry — either the local cell bytes, or
900/// an `OverflowRef` pointing at a freshly-allocated overflow chain if the
901/// encoded cell exceeded the inline threshold. Advances `next_free_page`
902/// past any overflow pages used.
903fn build_row_entry(
904    pager: &mut Pager,
905    table: &Table,
906    rowid: i64,
907    next_free_page: &mut u32,
908) -> Result<Vec<u8>> {
909    let values = table.extract_row(rowid);
910    let local_cell = Cell::new(rowid, values);
911    let local_bytes = local_cell.encode()?;
912    if local_bytes.len() > OVERFLOW_THRESHOLD {
913        let overflow_start = *next_free_page;
914        *next_free_page = write_overflow_chain(pager, &local_bytes, overflow_start)?;
915        Ok(OverflowRef {
916            rowid,
917            total_body_len: local_bytes.len() as u64,
918            first_overflow_page: overflow_start,
919        }
920        .encode())
921    } else {
922        Ok(local_bytes)
923    }
924}
925
926/// Builds one level of `InteriorNode` pages above the given children.
927/// Each interior packs as many dividers as will fit; the last child
928/// assigned to an interior becomes its `rightmost_child`. Returns the
929/// emitted interior pages as `(page_number, max_rowid_in_subtree)` so the
930/// next level can build on top of them.
931fn stage_interior_level(
932    pager: &mut Pager,
933    children: &[(u32, i64)],
934    start_page: u32,
935) -> Result<(Vec<(u32, i64)>, u32)> {
936    let mut next_level: Vec<(u32, i64)> = Vec::new();
937    let mut next_free_page = start_page;
938    let mut idx = 0usize;
939
940    while idx < children.len() {
941        let interior_page_num = next_free_page;
942        next_free_page += 1;
943
944        // Seed the interior with the first unassigned child as its
945        // rightmost. As we add more children, the previous rightmost
946        // graduates to being a divider and the new arrival takes over
947        // as rightmost.
948        let (mut rightmost_child_page, mut rightmost_child_max) = children[idx];
949        idx += 1;
950        let mut interior = InteriorPage::empty(rightmost_child_page);
951
952        while idx < children.len() {
953            let new_divider_cell = InteriorCell {
954                divider_rowid: rightmost_child_max,
955                child_page: rightmost_child_page,
956            };
957            let new_divider_bytes = new_divider_cell.encode();
958            if !interior.would_fit(new_divider_bytes.len()) {
959                break;
960            }
961            interior.insert_divider(rightmost_child_max, rightmost_child_page)?;
962            let (next_child_page, next_child_max) = children[idx];
963            interior.set_rightmost_child(next_child_page);
964            rightmost_child_page = next_child_page;
965            rightmost_child_max = next_child_max;
966            idx += 1;
967        }
968
969        emit_interior(pager, interior_page_num, &interior);
970        next_level.push((interior_page_num, rightmost_child_max));
971    }
972
973    Ok((next_level, next_free_page))
974}
975
976/// Wraps a `TablePage` in the 7-byte page header and hands it to the pager.
977fn emit_leaf(pager: &mut Pager, page_num: u32, leaf: &TablePage, next_leaf: u32) {
978    let mut buf = [0u8; PAGE_SIZE];
979    buf[0] = PageType::TableLeaf as u8;
980    buf[1..5].copy_from_slice(&next_leaf.to_le_bytes());
981    // For leaf pages the legacy `payload_len` field isn't used — the slot
982    // directory self-describes. Zero it by convention.
983    buf[5..7].copy_from_slice(&0u16.to_le_bytes());
984    buf[PAGE_HEADER_SIZE..].copy_from_slice(leaf.as_bytes());
985    pager.stage_page(page_num, buf);
986}
987
988/// Wraps an `InteriorPage` in the 7-byte page header. Interior pages
989/// don't use `next_page` (there's no sibling chain between interiors);
990/// `payload_len` is also unused (the slot directory self-describes).
991fn emit_interior(pager: &mut Pager, page_num: u32, interior: &InteriorPage) {
992    let mut buf = [0u8; PAGE_SIZE];
993    buf[0] = PageType::InteriorNode as u8;
994    buf[1..5].copy_from_slice(&0u32.to_le_bytes());
995    buf[5..7].copy_from_slice(&0u16.to_le_bytes());
996    buf[PAGE_HEADER_SIZE..].copy_from_slice(interior.as_bytes());
997    pager.stage_page(page_num, buf);
998}
999
1000#[cfg(test)]
1001mod tests {
1002    use super::*;
1003    use crate::sql::process_command;
1004
1005    fn seed_db() -> Database {
1006        let mut db = Database::new("test".to_string());
1007        process_command(
1008            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, age INTEGER);",
1009            &mut db,
1010        )
1011        .unwrap();
1012        process_command(
1013            "INSERT INTO users (name, age) VALUES ('alice', 30);",
1014            &mut db,
1015        )
1016        .unwrap();
1017        process_command("INSERT INTO users (name, age) VALUES ('bob', 25);", &mut db).unwrap();
1018        process_command(
1019            "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
1020            &mut db,
1021        )
1022        .unwrap();
1023        process_command("INSERT INTO notes (body) VALUES ('hello');", &mut db).unwrap();
1024        db
1025    }
1026
1027    fn tmp_path(name: &str) -> std::path::PathBuf {
1028        let mut p = std::env::temp_dir();
1029        let pid = std::process::id();
1030        let nanos = std::time::SystemTime::now()
1031            .duration_since(std::time::UNIX_EPOCH)
1032            .map(|d| d.as_nanos())
1033            .unwrap_or(0);
1034        p.push(format!("sqlrite-{pid}-{nanos}-{name}.sqlrite"));
1035        p
1036    }
1037
1038    /// Phase 4c: every .sqlrite has a `-wal` sidecar now. Delete both so
1039    /// `/tmp` doesn't accumulate orphan WALs across test runs.
1040    fn cleanup(path: &std::path::Path) {
1041        let _ = std::fs::remove_file(path);
1042        let mut wal = path.as_os_str().to_owned();
1043        wal.push("-wal");
1044        let _ = std::fs::remove_file(std::path::PathBuf::from(wal));
1045    }
1046
1047    #[test]
1048    fn round_trip_preserves_schema_and_data() {
1049        let path = tmp_path("roundtrip");
1050        let mut db = seed_db();
1051        save_database(&mut db, &path).expect("save");
1052
1053        let loaded = open_database(&path, "test".to_string()).expect("open");
1054        assert_eq!(loaded.tables.len(), 2);
1055
1056        let users = loaded.get_table("users".to_string()).expect("users table");
1057        assert_eq!(users.columns.len(), 3);
1058        let rowids = users.rowids();
1059        assert_eq!(rowids.len(), 2);
1060        let names: Vec<String> = rowids
1061            .iter()
1062            .filter_map(|r| match users.get_value("name", *r) {
1063                Some(Value::Text(s)) => Some(s),
1064                _ => None,
1065            })
1066            .collect();
1067        assert!(names.contains(&"alice".to_string()));
1068        assert!(names.contains(&"bob".to_string()));
1069
1070        let notes = loaded.get_table("notes".to_string()).expect("notes table");
1071        assert_eq!(notes.rowids().len(), 1);
1072
1073        cleanup(&path);
1074    }
1075
1076    // -----------------------------------------------------------------
1077    // Phase 7a — VECTOR(N) save / reopen round-trip
1078    // -----------------------------------------------------------------
1079
1080    #[test]
1081    fn round_trip_preserves_vector_column() {
1082        let path = tmp_path("vec_roundtrip");
1083
1084        // Build, populate, save.
1085        {
1086            let mut db = Database::new("test".to_string());
1087            process_command(
1088                "CREATE TABLE docs (id INTEGER PRIMARY KEY, embedding VECTOR(3));",
1089                &mut db,
1090            )
1091            .unwrap();
1092            process_command(
1093                "INSERT INTO docs (embedding) VALUES ([0.1, 0.2, 0.3]);",
1094                &mut db,
1095            )
1096            .unwrap();
1097            process_command(
1098                "INSERT INTO docs (embedding) VALUES ([1.5, -2.0, 3.5]);",
1099                &mut db,
1100            )
1101            .unwrap();
1102            save_database(&mut db, &path).expect("save");
1103        } // db drops → its exclusive lock releases before reopen.
1104
1105        // Reopen and verify schema + data both round-tripped.
1106        let loaded = open_database(&path, "test".to_string()).expect("open");
1107        let docs = loaded.get_table("docs".to_string()).expect("docs table");
1108
1109        // Schema preserved: column is still VECTOR(3).
1110        let embedding_col = docs
1111            .columns
1112            .iter()
1113            .find(|c| c.column_name == "embedding")
1114            .expect("embedding column");
1115        assert!(
1116            matches!(embedding_col.datatype, DataType::Vector(3)),
1117            "expected DataType::Vector(3) after round-trip, got {:?}",
1118            embedding_col.datatype
1119        );
1120
1121        // Data preserved: both vectors still readable bit-for-bit.
1122        let mut rows: Vec<Vec<f32>> = docs
1123            .rowids()
1124            .iter()
1125            .filter_map(|r| match docs.get_value("embedding", *r) {
1126                Some(Value::Vector(v)) => Some(v),
1127                _ => None,
1128            })
1129            .collect();
1130        rows.sort_by(|a, b| a[0].partial_cmp(&b[0]).unwrap());
1131        assert_eq!(rows.len(), 2);
1132        assert_eq!(rows[0], vec![0.1f32, 0.2, 0.3]);
1133        assert_eq!(rows[1], vec![1.5f32, -2.0, 3.5]);
1134
1135        cleanup(&path);
1136    }
1137
1138    #[test]
1139    fn round_trip_rebuilds_hnsw_index_from_create_sql() {
1140        // Phase 7d.2: HNSW indexes don't yet persist their graph (that's
1141        // 7d.3), but the CREATE INDEX SQL goes into sqlrite_master and the
1142        // open path re-runs it to rebuild the graph from current rows.
1143        // After a save+reopen, the index entry should be back, with the
1144        // same column + the same population.
1145        let path = tmp_path("hnsw_roundtrip");
1146
1147        // Build, populate, index, save.
1148        {
1149            let mut db = Database::new("test".to_string());
1150            process_command(
1151                "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
1152                &mut db,
1153            )
1154            .unwrap();
1155            for v in &[
1156                "[1.0, 0.0]",
1157                "[2.0, 0.0]",
1158                "[0.0, 3.0]",
1159                "[1.0, 4.0]",
1160                "[10.0, 10.0]",
1161            ] {
1162                process_command(&format!("INSERT INTO docs (e) VALUES ({v});"), &mut db).unwrap();
1163            }
1164            process_command("CREATE INDEX ix_e ON docs USING hnsw (e);", &mut db).unwrap();
1165            save_database(&mut db, &path).expect("save");
1166        } // db drops → exclusive lock releases.
1167
1168        // Reopen and verify the index reattached, with the same name +
1169        // column + populated graph.
1170        let mut loaded = open_database(&path, "test".to_string()).expect("open");
1171        {
1172            let table = loaded.get_table("docs".to_string()).expect("docs");
1173            assert_eq!(table.hnsw_indexes.len(), 1, "HNSW index should reattach");
1174            let entry = &table.hnsw_indexes[0];
1175            assert_eq!(entry.name, "ix_e");
1176            assert_eq!(entry.column_name, "e");
1177            assert_eq!(entry.index.len(), 5, "rebuilt graph should hold all 5 rows");
1178        }
1179
1180        // Quick functional check: KNN query through the rebuilt index
1181        // returns results.
1182        let resp = process_command(
1183            "SELECT id FROM docs ORDER BY vec_distance_l2(e, [1.0, 0.0]) ASC LIMIT 3;",
1184            &mut loaded,
1185        )
1186        .unwrap();
1187        assert!(resp.contains("3 rows returned"), "got: {resp}");
1188
1189        cleanup(&path);
1190    }
1191
1192    #[test]
1193    fn round_trip_survives_writes_after_load() {
1194        let path = tmp_path("after_load");
1195        save_database(&mut seed_db(), &path).unwrap();
1196
1197        {
1198            let mut db = open_database(&path, "test".to_string()).unwrap();
1199            process_command(
1200                "INSERT INTO users (name, age) VALUES ('carol', 40);",
1201                &mut db,
1202            )
1203            .unwrap();
1204            save_database(&mut db, &path).unwrap();
1205        } // db drops → its exclusive lock releases before we reopen below.
1206
1207        let db2 = open_database(&path, "test".to_string()).unwrap();
1208        let users = db2.get_table("users".to_string()).unwrap();
1209        assert_eq!(users.rowids().len(), 3);
1210
1211        cleanup(&path);
1212    }
1213
1214    #[test]
1215    fn open_rejects_garbage_file() {
1216        let path = tmp_path("bad");
1217        std::fs::write(&path, b"not a sqlrite database, just bytes").unwrap();
1218        let result = open_database(&path, "x".to_string());
1219        assert!(result.is_err());
1220        cleanup(&path);
1221    }
1222
1223    #[test]
1224    fn many_small_rows_spread_across_leaves() {
1225        let path = tmp_path("many_rows");
1226        let mut db = Database::new("big".to_string());
1227        process_command(
1228            "CREATE TABLE things (id INTEGER PRIMARY KEY, data TEXT);",
1229            &mut db,
1230        )
1231        .unwrap();
1232        for i in 0..200 {
1233            let body = "x".repeat(200);
1234            let q = format!("INSERT INTO things (data) VALUES ('row-{i}-{body}');");
1235            process_command(&q, &mut db).unwrap();
1236        }
1237        save_database(&mut db, &path).unwrap();
1238        let loaded = open_database(&path, "big".to_string()).unwrap();
1239        let things = loaded.get_table("things".to_string()).unwrap();
1240        assert_eq!(things.rowids().len(), 200);
1241        cleanup(&path);
1242    }
1243
1244    #[test]
1245    fn huge_row_goes_through_overflow() {
1246        let path = tmp_path("overflow_row");
1247        let mut db = Database::new("big".to_string());
1248        process_command(
1249            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
1250            &mut db,
1251        )
1252        .unwrap();
1253        let body = "A".repeat(10_000);
1254        process_command(
1255            &format!("INSERT INTO docs (body) VALUES ('{body}');"),
1256            &mut db,
1257        )
1258        .unwrap();
1259        save_database(&mut db, &path).unwrap();
1260
1261        let loaded = open_database(&path, "big".to_string()).unwrap();
1262        let docs = loaded.get_table("docs".to_string()).unwrap();
1263        let rowids = docs.rowids();
1264        assert_eq!(rowids.len(), 1);
1265        let stored = docs.get_value("body", rowids[0]);
1266        match stored {
1267            Some(Value::Text(s)) => assert_eq!(s.len(), 10_000),
1268            other => panic!("expected Text, got {other:?}"),
1269        }
1270        cleanup(&path);
1271    }
1272
1273    #[test]
1274    fn create_sql_synthesis_round_trips() {
1275        // Build a table via CREATE, then verify table_to_create_sql +
1276        // parse_create_sql reproduce an equivalent column list.
1277        let mut db = Database::new("x".to_string());
1278        process_command(
1279            "CREATE TABLE t (id INTEGER PRIMARY KEY, tag TEXT UNIQUE, note TEXT NOT NULL);",
1280            &mut db,
1281        )
1282        .unwrap();
1283        let t = db.get_table("t".to_string()).unwrap();
1284        let sql = table_to_create_sql(t);
1285        let (name, cols) = parse_create_sql(&sql).unwrap();
1286        assert_eq!(name, "t");
1287        assert_eq!(cols.len(), 3);
1288        assert!(cols[0].is_pk);
1289        assert!(cols[1].is_unique);
1290        assert!(cols[2].not_null);
1291    }
1292
1293    #[test]
1294    fn sqlrite_master_is_not_exposed_as_a_user_table() {
1295        // After open, the public db.tables map should not list the master.
1296        let path = tmp_path("no_master");
1297        save_database(&mut seed_db(), &path).unwrap();
1298        let loaded = open_database(&path, "x".to_string()).unwrap();
1299        assert!(!loaded.tables.contains_key(MASTER_TABLE_NAME));
1300        cleanup(&path);
1301    }
1302
1303    #[test]
1304    fn multi_leaf_table_produces_an_interior_root() {
1305        // 200 fat rows force the table into multiple leaves, which means
1306        // save_database must build at least one InteriorNode above them.
1307        // The test verifies the round-trip works and confirms the root is
1308        // indeed an interior page (not a leaf) by reading the page type
1309        // directly out of the open pager.
1310        let path = tmp_path("multi_leaf_interior");
1311        let mut db = Database::new("big".to_string());
1312        process_command(
1313            "CREATE TABLE things (id INTEGER PRIMARY KEY, data TEXT);",
1314            &mut db,
1315        )
1316        .unwrap();
1317        for i in 0..200 {
1318            let body = "x".repeat(200);
1319            let q = format!("INSERT INTO things (data) VALUES ('row-{i}-{body}');");
1320            process_command(&q, &mut db).unwrap();
1321        }
1322        save_database(&mut db, &path).unwrap();
1323
1324        // Confirm the round-trip preserved all 200 rows.
1325        let loaded = open_database(&path, "big".to_string()).unwrap();
1326        let things = loaded.get_table("things".to_string()).unwrap();
1327        assert_eq!(things.rowids().len(), 200);
1328
1329        // Peek at `things`'s root page via the pager attached to the
1330        // loaded DB and check it's an InteriorNode, not a leaf.
1331        let pager = loaded
1332            .pager
1333            .as_ref()
1334            .expect("loaded DB should have a pager");
1335        // sqlrite_master's row for `things` holds its root page. Easiest
1336        // way to find it: walk the leaf chain by using find_leftmost_leaf
1337        // and then hop one level up. Simpler: read the master, scan for
1338        // the "things" row, look up rootpage.
1339        let mut master = build_empty_master_table();
1340        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
1341        let things_root = master
1342            .rowids()
1343            .into_iter()
1344            .find_map(|r| match master.get_value("name", r) {
1345                Some(Value::Text(s)) if s == "things" => match master.get_value("rootpage", r) {
1346                    Some(Value::Integer(p)) => Some(p as u32),
1347                    _ => None,
1348                },
1349                _ => None,
1350            })
1351            .expect("things should appear in sqlrite_master");
1352        let root_buf = pager.read_page(things_root).unwrap();
1353        assert_eq!(
1354            root_buf[0],
1355            PageType::InteriorNode as u8,
1356            "expected a multi-leaf table to have an interior root, got tag {}",
1357            root_buf[0]
1358        );
1359
1360        cleanup(&path);
1361    }
1362
1363    #[test]
1364    fn explicit_index_persists_across_save_and_open() {
1365        let path = tmp_path("idx_persist");
1366        let mut db = Database::new("idx".to_string());
1367        process_command(
1368            "CREATE TABLE users (id INTEGER PRIMARY KEY, tag TEXT);",
1369            &mut db,
1370        )
1371        .unwrap();
1372        for i in 1..=5 {
1373            let tag = if i % 2 == 0 { "odd" } else { "even" };
1374            process_command(
1375                &format!("INSERT INTO users (tag) VALUES ('{tag}');"),
1376                &mut db,
1377            )
1378            .unwrap();
1379        }
1380        process_command("CREATE INDEX users_tag_idx ON users (tag);", &mut db).unwrap();
1381        save_database(&mut db, &path).unwrap();
1382
1383        let loaded = open_database(&path, "idx".to_string()).unwrap();
1384        let users = loaded.get_table("users".to_string()).unwrap();
1385        let idx = users
1386            .index_by_name("users_tag_idx")
1387            .expect("explicit index should survive save/open");
1388        assert_eq!(idx.column_name, "tag");
1389        assert!(!idx.is_unique);
1390        // 5 rows: rowids 2, 4 are "odd" (i % 2 == 0 when i is 2 or 4) — 2 entries;
1391        // rowids 1, 3, 5 are "even" (i % 2 != 0) — 3 entries.
1392        let even_rowids = idx.lookup(&Value::Text("even".into()));
1393        let odd_rowids = idx.lookup(&Value::Text("odd".into()));
1394        assert_eq!(even_rowids.len(), 3);
1395        assert_eq!(odd_rowids.len(), 2);
1396
1397        cleanup(&path);
1398    }
1399
1400    #[test]
1401    fn auto_indexes_for_unique_columns_survive_save_open() {
1402        let path = tmp_path("auto_idx_persist");
1403        let mut db = Database::new("a".to_string());
1404        process_command(
1405            "CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL UNIQUE);",
1406            &mut db,
1407        )
1408        .unwrap();
1409        process_command("INSERT INTO users (email) VALUES ('a@x');", &mut db).unwrap();
1410        process_command("INSERT INTO users (email) VALUES ('b@x');", &mut db).unwrap();
1411        save_database(&mut db, &path).unwrap();
1412
1413        let loaded = open_database(&path, "a".to_string()).unwrap();
1414        let users = loaded.get_table("users".to_string()).unwrap();
1415        // Every UNIQUE column auto-creates an index; the load path populated
1416        // it from the persisted entries.
1417        let auto_name = SecondaryIndex::auto_name("users", "email");
1418        let idx = users
1419            .index_by_name(&auto_name)
1420            .expect("auto index should be restored");
1421        assert!(idx.is_unique);
1422        assert_eq!(idx.lookup(&Value::Text("a@x".into())).len(), 1);
1423        assert_eq!(idx.lookup(&Value::Text("b@x".into())).len(), 1);
1424
1425        cleanup(&path);
1426    }
1427
1428    #[test]
1429    fn deep_tree_round_trips() {
1430        // Force a 3-level tree by bypassing process_command (which prints
1431        // the full table on every INSERT, making large bulk loads O(N^2)
1432        // in I/O). We build the Table directly via restore_row.
1433        use crate::sql::db::table::Column as TableColumn;
1434
1435        let path = tmp_path("deep_tree");
1436        let mut db = Database::new("deep".to_string());
1437        let columns = vec![
1438            TableColumn::new("id".into(), "integer".into(), true, true, true),
1439            TableColumn::new("s".into(), "text".into(), false, true, false),
1440        ];
1441        let mut table = build_empty_table("t", columns, 0);
1442        // ~900-byte rows → ~4 rows per leaf. 6000 rows → ~1500 leaves,
1443        // which with interior fanout ~400 needs 2 interior levels (3-level
1444        // tree total, counting leaves).
1445        for i in 1..=6_000i64 {
1446            let body = "q".repeat(900);
1447            table
1448                .restore_row(
1449                    i,
1450                    vec![
1451                        Some(Value::Integer(i)),
1452                        Some(Value::Text(format!("r-{i}-{body}"))),
1453                    ],
1454                )
1455                .unwrap();
1456        }
1457        db.tables.insert("t".to_string(), table);
1458        save_database(&mut db, &path).unwrap();
1459
1460        let loaded = open_database(&path, "deep".to_string()).unwrap();
1461        let t = loaded.get_table("t".to_string()).unwrap();
1462        assert_eq!(t.rowids().len(), 6_000);
1463
1464        // Confirm the tree actually grew past 2 levels — i.e., the root's
1465        // leftmost child is itself an interior page, not a leaf.
1466        let pager = loaded.pager.as_ref().unwrap();
1467        let mut master = build_empty_master_table();
1468        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
1469        let t_root = master
1470            .rowids()
1471            .into_iter()
1472            .find_map(|r| match master.get_value("name", r) {
1473                Some(Value::Text(s)) if s == "t" => match master.get_value("rootpage", r) {
1474                    Some(Value::Integer(p)) => Some(p as u32),
1475                    _ => None,
1476                },
1477                _ => None,
1478            })
1479            .expect("t in sqlrite_master");
1480        let root_buf = pager.read_page(t_root).unwrap();
1481        assert_eq!(root_buf[0], PageType::InteriorNode as u8);
1482        let root_payload: &[u8; PAYLOAD_PER_PAGE] =
1483            (&root_buf[PAGE_HEADER_SIZE..]).try_into().unwrap();
1484        let root_interior = InteriorPage::from_bytes(root_payload);
1485        let child = root_interior.leftmost_child().unwrap();
1486        let child_buf = pager.read_page(child).unwrap();
1487        assert_eq!(
1488            child_buf[0],
1489            PageType::InteriorNode as u8,
1490            "expected 3-level tree: root's leftmost child should also be InteriorNode",
1491        );
1492
1493        cleanup(&path);
1494    }
1495}