// sqlrite/sql/pager/mod.rs
1//! On-disk persistence for a `Database`, using fixed-size paged files.
2//!
3//! The file is a sequence of 4 KiB pages. Page 0 holds the header
4//! (magic, version, page count, schema-root pointer). Every other page carries
5//! a small per-page header (type tag + next-page pointer + payload length)
6//! followed by a payload of up to 4089 bytes.
7//!
8//! **Storage strategy (format version 2, Phase 3c.5).**
9//!
10//! - Each `Table`'s rows live as **cells** in a chain of `TableLeaf` pages.
11//!   Cell layout and slot directory are in `cell.rs` / `table_page.rs`;
12//!   cells that exceed the inline threshold spill into an overflow chain
13//!   via `overflow.rs`.
14//! - The schema catalog is itself a regular table named `sqlrite_master`,
15//!   with one row per user table:
16//!       `(name TEXT PRIMARY KEY, sql TEXT NOT NULL,
17//!         rootpage INTEGER NOT NULL, last_rowid INTEGER NOT NULL)`
18//!   This is the SQLite-style approach: the schema of `sqlrite_master`
19//!   itself is hardcoded into the engine so the open path can bootstrap.
20//! - Page 0's `schema_root_page` field points at the first leaf of
21//!   `sqlrite_master`.
22//!
23//! **Format version.** Version 2 is not compatible with files produced by
24//! earlier commits. Opening a v1 file returns a clean error — users on
25//! old files have to regenerate them from CREATE/INSERT, as there's no
26//! production data to migrate yet.
27
28// Data-layer modules. Not every helper in these modules is used by save/open
29// yet — some exist for tests, some for future maintenance operations.
30// Module-level #[allow(dead_code)] keeps the build quiet without dotting
31// the modules with per-item attributes.
32#[allow(dead_code)]
33pub mod cell;
34pub mod file;
35pub mod header;
36#[allow(dead_code)]
37pub mod index_cell;
38#[allow(dead_code)]
39pub mod interior_page;
40pub mod overflow;
41pub mod page;
42pub mod pager;
43#[allow(dead_code)]
44pub mod table_page;
45#[allow(dead_code)]
46pub mod varint;
47#[allow(dead_code)]
48pub mod wal;
49
50use std::collections::{BTreeMap, HashMap};
51use std::path::Path;
52use std::sync::{Arc, Mutex};
53
54use sqlparser::dialect::SQLiteDialect;
55use sqlparser::parser::Parser;
56
57use crate::error::{Result, SQLRiteError};
58use crate::sql::db::database::Database;
59use crate::sql::db::secondary_index::{IndexOrigin, SecondaryIndex};
60use crate::sql::db::table::{Column, DataType, Row, Table, Value};
61use crate::sql::pager::cell::Cell;
62use crate::sql::pager::header::DbHeader;
63use crate::sql::pager::index_cell::IndexCell;
64use crate::sql::pager::interior_page::{InteriorCell, InteriorPage};
65use crate::sql::pager::overflow::{
66    OVERFLOW_THRESHOLD, OverflowRef, PagedEntry, read_overflow_chain, write_overflow_chain,
67};
68use crate::sql::pager::page::{PAGE_HEADER_SIZE, PAGE_SIZE, PAYLOAD_PER_PAGE, PageType};
69use crate::sql::pager::pager::Pager;
70use crate::sql::pager::table_page::TablePage;
71use crate::sql::parser::create::CreateQuery;
72
73// Re-export so callers can spell `sql::pager::AccessMode` without
74// reaching into the `pager::pager::pager` submodule path.
75pub use crate::sql::pager::pager::AccessMode;
76
77/// Name of the internal catalog table. Reserved — user CREATEs of this
78/// name must be rejected upstream.
79pub const MASTER_TABLE_NAME: &str = "sqlrite_master";
80
81/// Opens a database file in read-write mode. Shorthand for
82/// [`open_database_with_mode`] with [`AccessMode::ReadWrite`].
83pub fn open_database(path: &Path, db_name: String) -> Result<Database> {
84    open_database_with_mode(path, db_name, AccessMode::ReadWrite)
85}
86
87/// Opens a database file in read-only mode. Acquires a shared OS-level
88/// advisory lock, so other read-only openers coexist but any writer is
89/// excluded. Attempts to mutate the returned `Database` (e.g. an
90/// `INSERT`, or a `save_database` call against it) bottom out in a
91/// `cannot commit: database is opened read-only` error from the Pager.
92pub fn open_database_read_only(path: &Path, db_name: String) -> Result<Database> {
93    open_database_with_mode(path, db_name, AccessMode::ReadOnly)
94}
95
96/// Opens a database file and reconstructs the in-memory `Database`,
97/// leaving the long-lived `Pager` attached for subsequent auto-save
98/// (read-write) or consistent-snapshot reads (read-only).
99pub fn open_database_with_mode(path: &Path, db_name: String, mode: AccessMode) -> Result<Database> {
100    let pager = Pager::open_with_mode(path, mode)?;
101
102    // 1. Load sqlrite_master from the tree at header.schema_root_page.
103    let mut master = build_empty_master_table();
104    load_table_rows(&pager, &mut master, pager.header().schema_root_page)?;
105
106    // 2. Two passes over master rows: first build every user table, then
107    //    attach secondary indexes. Indexes need their base table to exist
108    //    before we can populate them. Auto-indexes are created at table
109    //    build time so we only have to load explicit indexes from disk
110    //    (but we also reload the auto-index CONTENT because Table::new
111    //    built it empty).
112    let mut db = Database::new(db_name);
113    let mut index_rows: Vec<IndexCatalogRow> = Vec::new();
114
115    for rowid in master.rowids() {
116        let ty = take_text(&master, "type", rowid)?;
117        let name = take_text(&master, "name", rowid)?;
118        let sql = take_text(&master, "sql", rowid)?;
119        let rootpage = take_integer(&master, "rootpage", rowid)? as u32;
120        let last_rowid = take_integer(&master, "last_rowid", rowid)?;
121
122        match ty.as_str() {
123            "table" => {
124                let (parsed_name, columns) = parse_create_sql(&sql)?;
125                if parsed_name != name {
126                    return Err(SQLRiteError::Internal(format!(
127                        "sqlrite_master row '{name}' carries SQL for '{parsed_name}' — corrupt catalog?"
128                    )));
129                }
130                let mut table = build_empty_table(&name, columns, last_rowid);
131                if rootpage != 0 {
132                    load_table_rows(&pager, &mut table, rootpage)?;
133                }
134                if last_rowid > table.last_rowid {
135                    table.last_rowid = last_rowid;
136                }
137                db.tables.insert(name, table);
138            }
139            "index" => {
140                index_rows.push(IndexCatalogRow {
141                    name,
142                    sql,
143                    rootpage,
144                });
145            }
146            other => {
147                return Err(SQLRiteError::Internal(format!(
148                    "sqlrite_master row '{name}' has unknown type '{other}'"
149                )));
150            }
151        }
152    }
153
154    // Second pass: attach each index to its table.
155    for row in index_rows {
156        attach_index(&mut db, &pager, row)?;
157    }
158
159    db.source_path = Some(path.to_path_buf());
160    db.pager = Some(pager);
161    Ok(db)
162}
163
/// Catalog row for a secondary index — deferred until after every table is
/// loaded so the index's base table exists by the time we populate it.
struct IndexCatalogRow {
    // Index name as stored in sqlrite_master's `name` column.
    name: String,
    // The `CREATE INDEX` SQL text; re-parsed to recover table/column/unique.
    sql: String,
    // Root page of the persisted index tree (0 = no tree, loads as empty).
    rootpage: u32,
}
171
/// Persists `db` to disk. Same diff-commit behavior as before: only pages
/// whose bytes actually changed get written.
///
/// Page layout produced (deterministic for a given `db`):
/// page 0 is the header; then each user table's tree (tables sorted by
/// name); then each index's tree (sorted by (owning table, index name));
/// finally sqlrite_master's own tree, whose root is recorded in the header.
///
/// NOTE(review): `db.pager` is `take()`n up front and only reattached on
/// the success path — any early `?` return drops the Pager (and whatever
/// OS lock it holds), leaving `db.pager == None` after a failed save.
/// Confirm that forcing a reopen on the next save is the intended
/// recovery story.
pub fn save_database(db: &mut Database, path: &Path) -> Result<()> {
    // Reuse the attached pager only when saving back to the same file;
    // a "save as" to a different path opens/creates a transient pager.
    let same_path = db.source_path.as_deref() == Some(path);
    let mut pager = if same_path {
        match db.pager.take() {
            Some(p) => p,
            None if path.exists() => Pager::open(path)?,
            None => Pager::create(path)?,
        }
    } else if path.exists() {
        Pager::open(path)?
    } else {
        Pager::create(path)?
    };

    // Drop anything staged by a previous (possibly failed) save attempt.
    pager.clear_staged();

    // Page 0 is the header; payload pages start at 1.
    let mut next_free_page: u32 = 1;

    // 1. Stage each user table's B-Tree, collecting master-row info.
    //    `kind` is "table" or "index" — master has one row per each.
    let mut master_rows: Vec<CatalogEntry> = Vec::new();

    // Sorted iteration keeps page assignment deterministic, so an
    // unchanged table lands on identical bytes and the diff commit
    // skips its pages.
    let mut table_names: Vec<&String> = db.tables.keys().collect();
    table_names.sort();
    for name in table_names {
        if name == MASTER_TABLE_NAME {
            return Err(SQLRiteError::Internal(format!(
                "user table cannot be named '{MASTER_TABLE_NAME}' (reserved)"
            )));
        }
        let table = &db.tables[name];
        let (rootpage, new_next) = stage_table_btree(&mut pager, table, next_free_page)?;
        next_free_page = new_next;
        master_rows.push(CatalogEntry {
            kind: "table".into(),
            name: name.clone(),
            sql: table_to_create_sql(table),
            rootpage,
            last_rowid: table.last_rowid,
        });
    }

    // 2. Stage each secondary index's B-Tree. Indexes persist in a
    //    deterministic order: sorted by (owning_table, index_name).
    let mut index_entries: Vec<(&Table, &SecondaryIndex)> = Vec::new();
    for table in db.tables.values() {
        for idx in &table.secondary_indexes {
            index_entries.push((table, idx));
        }
    }
    index_entries
        .sort_by(|(ta, ia), (tb, ib)| ta.tb_name.cmp(&tb.tb_name).then(ia.name.cmp(&ib.name)));
    for (_table, idx) in index_entries {
        let (rootpage, new_next) = stage_index_btree(&mut pager, idx, next_free_page)?;
        next_free_page = new_next;
        master_rows.push(CatalogEntry {
            kind: "index".into(),
            name: idx.name.clone(),
            sql: idx.synthesized_sql(),
            rootpage,
            // Indexes carry no rowid counter; 0 is just a placeholder.
            last_rowid: 0,
        });
    }

    // 3. Build an in-memory sqlrite_master with one row per table or index,
    //    then stage it via the same tree-build path.
    let mut master = build_empty_master_table();
    for (i, entry) in master_rows.into_iter().enumerate() {
        let rowid = (i as i64) + 1;
        master.restore_row(
            rowid,
            vec![
                Some(Value::Text(entry.kind)),
                Some(Value::Text(entry.name)),
                Some(Value::Text(entry.sql)),
                Some(Value::Integer(entry.rootpage as i64)),
                Some(Value::Integer(entry.last_rowid)),
            ],
        )?;
    }
    let (master_root, master_next) = stage_table_btree(&mut pager, &master, next_free_page)?;
    next_free_page = master_next;

    // Commit writes only staged pages whose bytes differ from the file,
    // then rewrites page 0 with the new page count and master root.
    pager.commit(DbHeader {
        page_count: next_free_page,
        schema_root_page: master_root,
    })?;

    // Success: reattach the long-lived pager for the next auto-save.
    if same_path {
        db.pager = Some(pager);
    }
    Ok(())
}
268
/// Build material for a single row in sqlrite_master.
struct CatalogEntry {
    kind: String, // "table" or "index"
    name: String, // table or index name — master's PRIMARY KEY column
    sql: String,  // CREATE TABLE / CREATE INDEX text that recreates it
    rootpage: u32, // root page of the staged tree for this object
    last_rowid: i64, // rowid high-water mark for tables; 0 for indexes
}
277
278// -------------------------------------------------------------------------
279// sqlrite_master — hardcoded catalog table schema
280
281fn build_empty_master_table() -> Table {
282    // Phase 3e: `type` is the first column, matching SQLite's convention.
283    // It distinguishes `'table'` rows from `'index'` rows.
284    let columns = vec![
285        Column::new("type".into(), "text".into(), false, true, false),
286        Column::new("name".into(), "text".into(), true, true, true),
287        Column::new("sql".into(), "text".into(), false, true, false),
288        Column::new("rootpage".into(), "integer".into(), false, true, false),
289        Column::new("last_rowid".into(), "integer".into(), false, true, false),
290    ];
291    build_empty_table(MASTER_TABLE_NAME, columns, 0)
292}
293
294/// Reads a required Text column from a known-good catalog row.
295fn take_text(table: &Table, col: &str, rowid: i64) -> Result<String> {
296    match table.get_value(col, rowid) {
297        Some(Value::Text(s)) => Ok(s),
298        other => Err(SQLRiteError::Internal(format!(
299            "sqlrite_master column '{col}' at rowid {rowid}: expected Text, got {other:?}"
300        ))),
301    }
302}
303
304/// Reads a required Integer column from a known-good catalog row.
305fn take_integer(table: &Table, col: &str, rowid: i64) -> Result<i64> {
306    match table.get_value(col, rowid) {
307        Some(Value::Integer(v)) => Ok(v),
308        other => Err(SQLRiteError::Internal(format!(
309            "sqlrite_master column '{col}' at rowid {rowid}: expected Integer, got {other:?}"
310        ))),
311    }
312}
313
314// -------------------------------------------------------------------------
315// CREATE-TABLE SQL synthesis and re-parsing
316
317/// Synthesizes a CREATE TABLE SQL string that recreates the table's schema.
318/// Deterministic: same schema → same SQL, so diffing commits stay stable.
319fn table_to_create_sql(table: &Table) -> String {
320    let mut parts = Vec::with_capacity(table.columns.len());
321    for c in &table.columns {
322        // Render the SQL type literally so the round-trip through
323        // CREATE TABLE re-parsing recreates the same schema. Vector
324        // carries its dimension inline.
325        let ty: String = match &c.datatype {
326            DataType::Integer => "INTEGER".to_string(),
327            DataType::Text => "TEXT".to_string(),
328            DataType::Real => "REAL".to_string(),
329            DataType::Bool => "BOOLEAN".to_string(),
330            DataType::Vector(dim) => format!("VECTOR({dim})"),
331            DataType::None | DataType::Invalid => "TEXT".to_string(),
332        };
333        let mut piece = format!("{} {}", c.column_name, ty);
334        if c.is_pk {
335            piece.push_str(" PRIMARY KEY");
336        } else {
337            if c.is_unique {
338                piece.push_str(" UNIQUE");
339            }
340            if c.not_null {
341                piece.push_str(" NOT NULL");
342            }
343        }
344        parts.push(piece);
345    }
346    format!("CREATE TABLE {} ({});", table.tb_name, parts.join(", "))
347}
348
349/// Reverses `table_to_create_sql`: feeds the SQL back through `sqlparser`
350/// and produces our internal column list. Returns `(table_name, columns)`.
351fn parse_create_sql(sql: &str) -> Result<(String, Vec<Column>)> {
352    let dialect = SQLiteDialect {};
353    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
354    let stmt = ast.pop().ok_or_else(|| {
355        SQLRiteError::Internal("sqlrite_master row held an empty SQL string".to_string())
356    })?;
357    let create = CreateQuery::new(&stmt)?;
358    let columns = create
359        .columns
360        .into_iter()
361        .map(|pc| Column::new(pc.name, pc.datatype, pc.is_pk, pc.not_null, pc.is_unique))
362        .collect();
363    Ok((create.table_name, columns))
364}
365
366// -------------------------------------------------------------------------
367// In-memory table (re)construction
368
/// Builds an empty in-memory `Table` given the declared columns.
///
/// Mirrors the shape `Table::new` would produce from fresh SQL: one typed
/// `Row` map per column plus auto-created UNIQUE/PK secondary indexes
/// (created empty — their content is reloaded from disk by `attach_index`).
/// `last_rowid` seeds the rowid counter from the catalog's persisted value.
fn build_empty_table(name: &str, columns: Vec<Column>, last_rowid: i64) -> Table {
    let rows: Arc<Mutex<HashMap<String, Row>>> = Arc::new(Mutex::new(HashMap::new()));
    let mut secondary_indexes: Vec<SecondaryIndex> = Vec::new();
    {
        // Scope the lock so it's released before the Table is assembled.
        let mut map = rows.lock().expect("rows mutex poisoned");
        for col in &columns {
            // Mirror the dispatch in `Table::new` so the reconstructed
            // table has the same shape it'd have if it were built fresh
            // from SQL. Phase 7a adds the Vector arm — without it,
            // VECTOR columns silently restore as Row::None and every
            // restore_row hits a "storage None vs value Some(Vector(...))"
            // type mismatch.
            let row = match &col.datatype {
                DataType::Integer => Row::Integer(BTreeMap::new()),
                DataType::Text => Row::Text(BTreeMap::new()),
                DataType::Real => Row::Real(BTreeMap::new()),
                DataType::Bool => Row::Bool(BTreeMap::new()),
                DataType::Vector(_dim) => Row::Vector(BTreeMap::new()),
                DataType::None | DataType::Invalid => Row::None,
            };
            map.insert(col.column_name.clone(), row);

            // Auto-create UNIQUE/PK indexes so the restored table has the
            // same shape Table::new would have built from fresh SQL.
            // Only Integer/Text columns are indexable here.
            if (col.is_pk || col.is_unique)
                && matches!(col.datatype, DataType::Integer | DataType::Text)
            {
                // NOTE(review): an Err from SecondaryIndex::new is silently
                // skipped — presumably mirroring Table::new's best-effort
                // behavior; confirm a failure here can't leave a PK/UNIQUE
                // column without its auto-index.
                if let Ok(idx) = SecondaryIndex::new(
                    SecondaryIndex::auto_name(name, &col.column_name),
                    name.to_string(),
                    col.column_name.clone(),
                    &col.datatype,
                    true,
                    IndexOrigin::Auto,
                ) {
                    secondary_indexes.push(idx);
                }
            }
        }
    }

    // Tables without a declared PK fall back to the sentinel "-1"
    // (assumed to match the engine's convention elsewhere — TODO confirm).
    let primary_key = columns
        .iter()
        .find(|c| c.is_pk)
        .map(|c| c.column_name.clone())
        .unwrap_or_else(|| "-1".to_string());

    Table {
        tb_name: name.to_string(),
        columns,
        rows,
        secondary_indexes,
        last_rowid,
        primary_key,
    }
}
426
427// -------------------------------------------------------------------------
428// Leaf-chain read / write
429
430/// Walks a table's B-Tree from `root_page`, following the leftmost-child
431/// chain down to the first leaf, then iterating leaves via their sibling
432/// `next_page` pointers. Every cell is decoded and replayed into `table`.
433///
434/// Open-path note: we eagerly materialize the entire table into `Table`'s
435/// in-memory maps. Phase 5 will introduce a `Cursor` that hits the pager
436/// on demand so queries can stream through the tree without a full upfront
437/// load.
438/// Re-parses `CREATE INDEX` SQL from sqlrite_master and restores the
439/// index on its base table by walking the tree of index cells at
440/// `rootpage`. The base table is expected to already be in `db.tables`.
441fn attach_index(db: &mut Database, pager: &Pager, row: IndexCatalogRow) -> Result<()> {
442    let (table_name, column_name, is_unique) = parse_create_index_sql(&row.sql)?;
443
444    let table = db.get_table_mut(table_name.clone()).map_err(|_| {
445        SQLRiteError::Internal(format!(
446            "index '{}' references unknown table '{table_name}' (sqlrite_master out of sync?)",
447            row.name
448        ))
449    })?;
450    let datatype = table
451        .columns
452        .iter()
453        .find(|c| c.column_name == column_name)
454        .map(|c| clone_datatype(&c.datatype))
455        .ok_or_else(|| {
456            SQLRiteError::Internal(format!(
457                "index '{}' references unknown column '{column_name}' on '{table_name}'",
458                row.name
459            ))
460        })?;
461
462    // An auto-index on this column may already exist (built by
463    // build_empty_table for UNIQUE/PK columns). If the names match, reuse
464    // the slot instead of adding a duplicate entry.
465    let existing_slot = table
466        .secondary_indexes
467        .iter()
468        .position(|i| i.name == row.name);
469    let idx = match existing_slot {
470        Some(i) => {
471            // Drain any entries that may have been populated during table
472            // restore_row calls — we're about to repopulate from the
473            // persisted tree.
474            table.secondary_indexes.remove(i)
475        }
476        None => SecondaryIndex::new(
477            row.name.clone(),
478            table_name.clone(),
479            column_name.clone(),
480            &datatype,
481            is_unique,
482            IndexOrigin::Explicit,
483        )?,
484    };
485    let mut idx = idx;
486    // Wipe any stale entries from the auto path so the load is idempotent.
487    let is_unique_flag = idx.is_unique;
488    let origin = idx.origin;
489    idx = SecondaryIndex::new(
490        idx.name,
491        idx.table_name,
492        idx.column_name,
493        &datatype,
494        is_unique_flag,
495        origin,
496    )?;
497
498    // Populate from the index tree's cells.
499    load_index_rows(pager, &mut idx, row.rootpage)?;
500
501    table.secondary_indexes.push(idx);
502    Ok(())
503}
504
/// Walks the leaves of an index B-Tree rooted at `root_page` and inserts
/// every `(value, rowid)` pair into `idx`.
///
/// A `root_page` of 0 means the index has no persisted tree: the call is
/// a no-op and `idx` stays empty.
fn load_index_rows(pager: &Pager, idx: &mut SecondaryIndex, root_page: u32) -> Result<()> {
    if root_page == 0 {
        return Ok(());
    }
    let first_leaf = find_leftmost_leaf(pager, root_page)?;
    let mut current = first_leaf;
    // Leaves are sibling-linked: byte 0 is the page-type tag, bytes 1..5
    // the little-endian next-leaf pointer, with 0 terminating the chain.
    while current != 0 {
        let page_buf = pager
            .read_page(current)
            .ok_or_else(|| SQLRiteError::Internal(format!("missing index leaf page {current}")))?;
        if page_buf[0] != PageType::TableLeaf as u8 {
            return Err(SQLRiteError::Internal(format!(
                "page {current} tagged {} but expected TableLeaf (index)",
                page_buf[0]
            )));
        }
        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
        // Everything after the per-page header is the slotted payload region.
        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
            .try_into()
            .map_err(|_| SQLRiteError::Internal("index leaf payload size".to_string()))?;
        let leaf = TablePage::from_bytes(payload);

        for slot in 0..leaf.slot_count() {
            // Slots on an index page hold KIND_INDEX cells; decode directly.
            let offset = leaf.slot_offset_raw(slot)?;
            let (ic, _) = IndexCell::decode(leaf.as_bytes(), offset)?;
            idx.insert(&ic.value, ic.rowid)?;
        }
        current = next_leaf;
    }
    Ok(())
}
539
540/// Minimal recognizer for the synthesized-or-user `CREATE INDEX` SQL we
541/// store in sqlrite_master. Returns `(table_name, column_name, is_unique)`.
542///
543/// Uses sqlparser so user-supplied SQL with extra whitespace, case, etc.
544/// still works; the only shape we accept is single-column indexes.
545fn parse_create_index_sql(sql: &str) -> Result<(String, String, bool)> {
546    use sqlparser::ast::{CreateIndex, Expr, Statement};
547
548    let dialect = SQLiteDialect {};
549    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
550    let Some(Statement::CreateIndex(CreateIndex {
551        table_name,
552        columns,
553        unique,
554        ..
555    })) = ast.pop()
556    else {
557        return Err(SQLRiteError::Internal(format!(
558            "sqlrite_master index row's SQL isn't a CREATE INDEX: {sql}"
559        )));
560    };
561    if columns.len() != 1 {
562        return Err(SQLRiteError::NotImplemented(
563            "multi-column indexes aren't supported yet".to_string(),
564        ));
565    }
566    let col = match &columns[0].column.expr {
567        Expr::Identifier(ident) => ident.value.clone(),
568        Expr::CompoundIdentifier(parts) => {
569            parts.last().map(|p| p.value.clone()).unwrap_or_default()
570        }
571        other => {
572            return Err(SQLRiteError::Internal(format!(
573                "unsupported indexed column expression: {other:?}"
574            )));
575        }
576    };
577    Ok((table_name.to_string(), col, unique))
578}
579
580/// Cheap clone helper — `DataType` doesn't derive `Clone` elsewhere.
581fn clone_datatype(dt: &DataType) -> DataType {
582    match dt {
583        DataType::Integer => DataType::Integer,
584        DataType::Text => DataType::Text,
585        DataType::Real => DataType::Real,
586        DataType::Bool => DataType::Bool,
587        DataType::Vector(dim) => DataType::Vector(*dim),
588        DataType::None => DataType::None,
589        DataType::Invalid => DataType::Invalid,
590    }
591}
592
593/// Stages an index's B-Tree at `start_page`. Each leaf cell is a
594/// `KIND_INDEX` entry carrying `(original_rowid, value)`. Returns
595/// `(root_page, next_free_page)`.
596///
597/// The tree's shape matches a regular table's — leaves chained via
598/// `next_page`, optional interior layer above. `Cell::peek_rowid` works
599/// uniformly for index cells (same prefix as local cells), so the
600/// existing slot directory and binary search carry over.
601fn stage_index_btree(
602    pager: &mut Pager,
603    idx: &SecondaryIndex,
604    start_page: u32,
605) -> Result<(u32, u32)> {
606    // Build the leaves.
607    let (leaves, mut next_free_page) = stage_index_leaves(pager, idx, start_page)?;
608    if leaves.len() == 1 {
609        return Ok((leaves[0].0, next_free_page));
610    }
611    let mut level: Vec<(u32, i64)> = leaves;
612    while level.len() > 1 {
613        let (next_level, new_next_free) = stage_interior_level(pager, &level, next_free_page)?;
614        next_free_page = new_next_free;
615        level = next_level;
616    }
617    Ok((level[0].0, next_free_page))
618}
619
/// Packs the index's (value, rowid) entries into a sibling-chained run
/// of `TableLeaf` pages. Returns each leaf's `(page_number, max_rowid)`
/// plus the first free page after the run. An index with no entries
/// still emits one empty leaf (its max_rowid defaults to `i64::MIN`).
fn stage_index_leaves(
    pager: &mut Pager,
    idx: &SecondaryIndex,
    start_page: u32,
) -> Result<(Vec<(u32, i64)>, u32)> {
    let mut leaves: Vec<(u32, i64)> = Vec::new();
    let mut current_leaf = TablePage::empty();
    let mut current_leaf_page = start_page;
    let mut current_max_rowid: Option<i64> = None;
    let mut next_free_page = start_page + 1;

    // Sort the entries by original rowid so the in-page slot directory,
    // which binary-searches by rowid, stays valid. (iter_entries orders by
    // value; we reorder here for B-Tree correctness.)
    let mut entries: Vec<(Value, i64)> = idx.iter_entries().collect();
    entries.sort_by_key(|(_, r)| *r);

    for (value, rowid) in entries {
        let cell = IndexCell::new(rowid, value);
        let entry_bytes = cell.encode()?;

        // Leaf full: flush it (its sibling pointer names the page the new
        // leaf will occupy) and start a fresh one.
        if !current_leaf.would_fit(entry_bytes.len()) {
            let next_leaf_page_num = next_free_page;
            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
            current_leaf = TablePage::empty();
            current_leaf_page = next_leaf_page_num;
            next_free_page += 1;

            // A single entry must fit an empty page — unlike table cells,
            // index cells have no overflow-spill path here.
            if !current_leaf.would_fit(entry_bytes.len()) {
                return Err(SQLRiteError::Internal(format!(
                    "index entry of {} bytes exceeds empty-page capacity {}",
                    entry_bytes.len(),
                    current_leaf.free_space()
                )));
            }
        }
        current_leaf.insert_entry(rowid, &entry_bytes)?;
        current_max_rowid = Some(rowid);
    }

    // Flush the trailing (possibly empty) leaf; next_page 0 ends the chain.
    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
    Ok((leaves, next_free_page))
}
671
672fn load_table_rows(pager: &Pager, table: &mut Table, root_page: u32) -> Result<()> {
673    let first_leaf = find_leftmost_leaf(pager, root_page)?;
674    let mut current = first_leaf;
675    while current != 0 {
676        let page_buf = pager
677            .read_page(current)
678            .ok_or_else(|| SQLRiteError::Internal(format!("missing leaf page {current}")))?;
679        if page_buf[0] != PageType::TableLeaf as u8 {
680            return Err(SQLRiteError::Internal(format!(
681                "page {current} tagged {} but expected TableLeaf",
682                page_buf[0]
683            )));
684        }
685        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
686        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
687            .try_into()
688            .map_err(|_| SQLRiteError::Internal("leaf payload slice size".to_string()))?;
689        let leaf = TablePage::from_bytes(payload);
690
691        for slot in 0..leaf.slot_count() {
692            let entry = leaf.entry_at(slot)?;
693            let cell = match entry {
694                PagedEntry::Local(c) => c,
695                PagedEntry::Overflow(r) => {
696                    let body_bytes =
697                        read_overflow_chain(pager, r.first_overflow_page, r.total_body_len)?;
698                    let (c, _) = Cell::decode(&body_bytes, 0)?;
699                    c
700                }
701            };
702            table.restore_row(cell.rowid, cell.values)?;
703        }
704        current = next_leaf;
705    }
706    Ok(())
707}
708
709/// Descends from `root_page` through `InteriorNode` pages, always taking
710/// the leftmost child, until a `TableLeaf` is reached. Returns that leaf's
711/// page number. A root that's already a leaf is returned as-is.
712fn find_leftmost_leaf(pager: &Pager, root_page: u32) -> Result<u32> {
713    let mut current = root_page;
714    loop {
715        let page_buf = pager.read_page(current).ok_or_else(|| {
716            SQLRiteError::Internal(format!("missing page {current} during tree descent"))
717        })?;
718        match page_buf[0] {
719            t if t == PageType::TableLeaf as u8 => return Ok(current),
720            t if t == PageType::InteriorNode as u8 => {
721                let payload: &[u8; PAYLOAD_PER_PAGE] =
722                    (&page_buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
723                        SQLRiteError::Internal("interior payload slice size".to_string())
724                    })?;
725                let interior = InteriorPage::from_bytes(payload);
726                current = interior.leftmost_child()?;
727            }
728            other => {
729                return Err(SQLRiteError::Internal(format!(
730                    "unexpected page type {other} during tree descent at page {current}"
731                )));
732            }
733        }
734    }
735}
736
737/// Stages a table's B-Tree starting at `start_page`. Returns
738/// `(root_page, next_free_page)`. Builds bottom-up:
739///
740/// 1. Pack all row cells into `TableLeaf` pages, chaining them via each
741///    leaf's `next_page` sibling pointer (for fast sequential scans).
742/// 2. If the table fits in a single leaf, that leaf is the root.
743/// 3. Otherwise, group leaves into `InteriorNode` pages; recurse up the
744///    tree until one root remains.
745///
746/// Deterministic: same in-memory rows → same pages at same offsets, so
747/// the Pager's diff commit still skips unchanged tables.
748fn stage_table_btree(pager: &mut Pager, table: &Table, start_page: u32) -> Result<(u32, u32)> {
749    let (leaves, mut next_free_page) = stage_leaves(pager, table, start_page)?;
750    if leaves.len() == 1 {
751        return Ok((leaves[0].0, next_free_page));
752    }
753    let mut level: Vec<(u32, i64)> = leaves;
754    while level.len() > 1 {
755        let (next_level, new_next_free) = stage_interior_level(pager, &level, next_free_page)?;
756        next_free_page = new_next_free;
757        level = next_level;
758    }
759    Ok((level[0].0, next_free_page))
760}
761
762/// Packs the table's rows into a sibling-linked chain of `TableLeaf` pages.
763/// Returns each leaf's `(page_number, max_rowid)` (used by the next level
764/// up to build divider cells) and the first free page after the chain
765/// including any overflow pages allocated for oversized cells.
766fn stage_leaves(
767    pager: &mut Pager,
768    table: &Table,
769    start_page: u32,
770) -> Result<(Vec<(u32, i64)>, u32)> {
771    let mut leaves: Vec<(u32, i64)> = Vec::new();
772    let mut current_leaf = TablePage::empty();
773    let mut current_leaf_page = start_page;
774    let mut current_max_rowid: Option<i64> = None;
775    let mut next_free_page = start_page + 1;
776
777    for rowid in table.rowids() {
778        let entry_bytes = build_row_entry(pager, table, rowid, &mut next_free_page)?;
779
780        if !current_leaf.would_fit(entry_bytes.len()) {
781            // Commit the current leaf. Its sibling next_page is the page
782            // number where the new leaf will go — which is next_free_page
783            // right now (no overflow pages have been allocated between
784            // this decision and the new leaf's allocation below).
785            let next_leaf_page_num = next_free_page;
786            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
787            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
788            current_leaf = TablePage::empty();
789            current_leaf_page = next_leaf_page_num;
790            next_free_page += 1;
791            // current_max_rowid is reassigned by the insert below; no need
792            // to zero it out here.
793
794            if !current_leaf.would_fit(entry_bytes.len()) {
795                return Err(SQLRiteError::Internal(format!(
796                    "entry of {} bytes exceeds empty-page capacity {}",
797                    entry_bytes.len(),
798                    current_leaf.free_space()
799                )));
800            }
801        }
802        current_leaf.insert_entry(rowid, &entry_bytes)?;
803        current_max_rowid = Some(rowid);
804    }
805
806    // Final leaf: sibling next_page = 0 (end of chain).
807    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
808    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
809    Ok((leaves, next_free_page))
810}
811
812/// Encodes a single row's on-leaf entry — either the local cell bytes, or
813/// an `OverflowRef` pointing at a freshly-allocated overflow chain if the
814/// encoded cell exceeded the inline threshold. Advances `next_free_page`
815/// past any overflow pages used.
816fn build_row_entry(
817    pager: &mut Pager,
818    table: &Table,
819    rowid: i64,
820    next_free_page: &mut u32,
821) -> Result<Vec<u8>> {
822    let values = table.extract_row(rowid);
823    let local_cell = Cell::new(rowid, values);
824    let local_bytes = local_cell.encode()?;
825    if local_bytes.len() > OVERFLOW_THRESHOLD {
826        let overflow_start = *next_free_page;
827        *next_free_page = write_overflow_chain(pager, &local_bytes, overflow_start)?;
828        Ok(OverflowRef {
829            rowid,
830            total_body_len: local_bytes.len() as u64,
831            first_overflow_page: overflow_start,
832        }
833        .encode())
834    } else {
835        Ok(local_bytes)
836    }
837}
838
839/// Builds one level of `InteriorNode` pages above the given children.
840/// Each interior packs as many dividers as will fit; the last child
841/// assigned to an interior becomes its `rightmost_child`. Returns the
842/// emitted interior pages as `(page_number, max_rowid_in_subtree)` so the
843/// next level can build on top of them.
844fn stage_interior_level(
845    pager: &mut Pager,
846    children: &[(u32, i64)],
847    start_page: u32,
848) -> Result<(Vec<(u32, i64)>, u32)> {
849    let mut next_level: Vec<(u32, i64)> = Vec::new();
850    let mut next_free_page = start_page;
851    let mut idx = 0usize;
852
853    while idx < children.len() {
854        let interior_page_num = next_free_page;
855        next_free_page += 1;
856
857        // Seed the interior with the first unassigned child as its
858        // rightmost. As we add more children, the previous rightmost
859        // graduates to being a divider and the new arrival takes over
860        // as rightmost.
861        let (mut rightmost_child_page, mut rightmost_child_max) = children[idx];
862        idx += 1;
863        let mut interior = InteriorPage::empty(rightmost_child_page);
864
865        while idx < children.len() {
866            let new_divider_cell = InteriorCell {
867                divider_rowid: rightmost_child_max,
868                child_page: rightmost_child_page,
869            };
870            let new_divider_bytes = new_divider_cell.encode();
871            if !interior.would_fit(new_divider_bytes.len()) {
872                break;
873            }
874            interior.insert_divider(rightmost_child_max, rightmost_child_page)?;
875            let (next_child_page, next_child_max) = children[idx];
876            interior.set_rightmost_child(next_child_page);
877            rightmost_child_page = next_child_page;
878            rightmost_child_max = next_child_max;
879            idx += 1;
880        }
881
882        emit_interior(pager, interior_page_num, &interior);
883        next_level.push((interior_page_num, rightmost_child_max));
884    }
885
886    Ok((next_level, next_free_page))
887}
888
889/// Wraps a `TablePage` in the 7-byte page header and hands it to the pager.
890fn emit_leaf(pager: &mut Pager, page_num: u32, leaf: &TablePage, next_leaf: u32) {
891    let mut buf = [0u8; PAGE_SIZE];
892    buf[0] = PageType::TableLeaf as u8;
893    buf[1..5].copy_from_slice(&next_leaf.to_le_bytes());
894    // For leaf pages the legacy `payload_len` field isn't used — the slot
895    // directory self-describes. Zero it by convention.
896    buf[5..7].copy_from_slice(&0u16.to_le_bytes());
897    buf[PAGE_HEADER_SIZE..].copy_from_slice(leaf.as_bytes());
898    pager.stage_page(page_num, buf);
899}
900
901/// Wraps an `InteriorPage` in the 7-byte page header. Interior pages
902/// don't use `next_page` (there's no sibling chain between interiors);
903/// `payload_len` is also unused (the slot directory self-describes).
904fn emit_interior(pager: &mut Pager, page_num: u32, interior: &InteriorPage) {
905    let mut buf = [0u8; PAGE_SIZE];
906    buf[0] = PageType::InteriorNode as u8;
907    buf[1..5].copy_from_slice(&0u32.to_le_bytes());
908    buf[5..7].copy_from_slice(&0u16.to_le_bytes());
909    buf[PAGE_HEADER_SIZE..].copy_from_slice(interior.as_bytes());
910    pager.stage_page(page_num, buf);
911}
912
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sql::process_command;

    /// Shared fixture: an in-memory database with two tables — `users`
    /// (2 rows) and `notes` (1 row) — built through the SQL front door so
    /// the save path sees realistic schema metadata.
    fn seed_db() -> Database {
        let mut db = Database::new("test".to_string());
        process_command(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, age INTEGER);",
            &mut db,
        )
        .unwrap();
        process_command(
            "INSERT INTO users (name, age) VALUES ('alice', 30);",
            &mut db,
        )
        .unwrap();
        process_command("INSERT INTO users (name, age) VALUES ('bob', 25);", &mut db).unwrap();
        process_command(
            "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
            &mut db,
        )
        .unwrap();
        process_command("INSERT INTO notes (body) VALUES ('hello');", &mut db).unwrap();
        db
    }

    /// Unique temp-file path per test invocation: pid + nanosecond
    /// timestamp + test name keep concurrent/parallel runs from colliding
    /// on the same file.
    fn tmp_path(name: &str) -> std::path::PathBuf {
        let mut p = std::env::temp_dir();
        let pid = std::process::id();
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0);
        p.push(format!("sqlrite-{pid}-{nanos}-{name}.sqlrite"));
        p
    }

    /// Phase 4c: every .sqlrite has a `-wal` sidecar now. Delete both so
    /// `/tmp` doesn't accumulate orphan WALs across test runs.
    fn cleanup(path: &std::path::Path) {
        let _ = std::fs::remove_file(path);
        let mut wal = path.as_os_str().to_owned();
        wal.push("-wal");
        let _ = std::fs::remove_file(std::path::PathBuf::from(wal));
    }

    /// Basic save → open round-trip: table count, column count, row count,
    /// and cell values all survive.
    #[test]
    fn round_trip_preserves_schema_and_data() {
        let path = tmp_path("roundtrip");
        let mut db = seed_db();
        save_database(&mut db, &path).expect("save");

        let loaded = open_database(&path, "test".to_string()).expect("open");
        assert_eq!(loaded.tables.len(), 2);

        let users = loaded.get_table("users".to_string()).expect("users table");
        assert_eq!(users.columns.len(), 3);
        let rowids = users.rowids();
        assert_eq!(rowids.len(), 2);
        let names: Vec<String> = rowids
            .iter()
            .filter_map(|r| match users.get_value("name", *r) {
                Some(Value::Text(s)) => Some(s),
                _ => None,
            })
            .collect();
        assert!(names.contains(&"alice".to_string()));
        assert!(names.contains(&"bob".to_string()));

        let notes = loaded.get_table("notes".to_string()).expect("notes table");
        assert_eq!(notes.rowids().len(), 1);

        cleanup(&path);
    }

    // -----------------------------------------------------------------
    // Phase 7a — VECTOR(N) save / reopen round-trip
    // -----------------------------------------------------------------

    /// A `VECTOR(3)` column round-trips both its declared type and its
    /// f32 payloads bit-for-bit through save/open.
    #[test]
    fn round_trip_preserves_vector_column() {
        let path = tmp_path("vec_roundtrip");

        // Build, populate, save.
        {
            let mut db = Database::new("test".to_string());
            process_command(
                "CREATE TABLE docs (id INTEGER PRIMARY KEY, embedding VECTOR(3));",
                &mut db,
            )
            .unwrap();
            process_command(
                "INSERT INTO docs (embedding) VALUES ([0.1, 0.2, 0.3]);",
                &mut db,
            )
            .unwrap();
            process_command(
                "INSERT INTO docs (embedding) VALUES ([1.5, -2.0, 3.5]);",
                &mut db,
            )
            .unwrap();
            save_database(&mut db, &path).expect("save");
        } // db drops → its exclusive lock releases before reopen.

        // Reopen and verify schema + data both round-tripped.
        let loaded = open_database(&path, "test".to_string()).expect("open");
        let docs = loaded.get_table("docs".to_string()).expect("docs table");

        // Schema preserved: column is still VECTOR(3).
        let embedding_col = docs
            .columns
            .iter()
            .find(|c| c.column_name == "embedding")
            .expect("embedding column");
        assert!(
            matches!(embedding_col.datatype, DataType::Vector(3)),
            "expected DataType::Vector(3) after round-trip, got {:?}",
            embedding_col.datatype
        );

        // Data preserved: both vectors still readable bit-for-bit.
        let mut rows: Vec<Vec<f32>> = docs
            .rowids()
            .iter()
            .filter_map(|r| match docs.get_value("embedding", *r) {
                Some(Value::Vector(v)) => Some(v),
                _ => None,
            })
            .collect();
        // Sort by first component so the assertion is order-independent.
        rows.sort_by(|a, b| a[0].partial_cmp(&b[0]).unwrap());
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0], vec![0.1f32, 0.2, 0.3]);
        assert_eq!(rows[1], vec![1.5f32, -2.0, 3.5]);

        cleanup(&path);
    }

    /// Open an existing file, insert, save again, reopen: the second save
    /// must include both the old and the new rows.
    #[test]
    fn round_trip_survives_writes_after_load() {
        let path = tmp_path("after_load");
        save_database(&mut seed_db(), &path).unwrap();

        {
            let mut db = open_database(&path, "test".to_string()).unwrap();
            process_command(
                "INSERT INTO users (name, age) VALUES ('carol', 40);",
                &mut db,
            )
            .unwrap();
            save_database(&mut db, &path).unwrap();
        } // db drops → its exclusive lock releases before we reopen below.

        let db2 = open_database(&path, "test".to_string()).unwrap();
        let users = db2.get_table("users".to_string()).unwrap();
        assert_eq!(users.rowids().len(), 3);

        cleanup(&path);
    }

    /// A file that isn't a sqlrite database is rejected with an error,
    /// not a panic or a silently-empty database.
    #[test]
    fn open_rejects_garbage_file() {
        let path = tmp_path("bad");
        std::fs::write(&path, b"not a sqlrite database, just bytes").unwrap();
        let result = open_database(&path, "x".to_string());
        assert!(result.is_err());
        cleanup(&path);
    }

    /// 200 ~200-byte rows overflow a single 4 KiB leaf, exercising the
    /// multi-leaf chain path; all rows must survive the round-trip.
    #[test]
    fn many_small_rows_spread_across_leaves() {
        let path = tmp_path("many_rows");
        let mut db = Database::new("big".to_string());
        process_command(
            "CREATE TABLE things (id INTEGER PRIMARY KEY, data TEXT);",
            &mut db,
        )
        .unwrap();
        for i in 0..200 {
            let body = "x".repeat(200);
            let q = format!("INSERT INTO things (data) VALUES ('row-{i}-{body}');");
            process_command(&q, &mut db).unwrap();
        }
        save_database(&mut db, &path).unwrap();
        let loaded = open_database(&path, "big".to_string()).unwrap();
        let things = loaded.get_table("things".to_string()).unwrap();
        assert_eq!(things.rowids().len(), 200);
        cleanup(&path);
    }

    /// A 10,000-byte TEXT value exceeds the inline threshold, so it must
    /// travel through the overflow-chain path and read back whole.
    #[test]
    fn huge_row_goes_through_overflow() {
        let path = tmp_path("overflow_row");
        let mut db = Database::new("big".to_string());
        process_command(
            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
            &mut db,
        )
        .unwrap();
        let body = "A".repeat(10_000);
        process_command(
            &format!("INSERT INTO docs (body) VALUES ('{body}');"),
            &mut db,
        )
        .unwrap();
        save_database(&mut db, &path).unwrap();

        let loaded = open_database(&path, "big".to_string()).unwrap();
        let docs = loaded.get_table("docs".to_string()).unwrap();
        let rowids = docs.rowids();
        assert_eq!(rowids.len(), 1);
        let stored = docs.get_value("body", rowids[0]);
        match stored {
            Some(Value::Text(s)) => assert_eq!(s.len(), 10_000),
            other => panic!("expected Text, got {other:?}"),
        }
        cleanup(&path);
    }

    /// The catalog's CREATE-statement synthesis and its parser agree on
    /// column names, order, and constraint flags.
    #[test]
    fn create_sql_synthesis_round_trips() {
        // Build a table via CREATE, then verify table_to_create_sql +
        // parse_create_sql reproduce an equivalent column list.
        let mut db = Database::new("x".to_string());
        process_command(
            "CREATE TABLE t (id INTEGER PRIMARY KEY, tag TEXT UNIQUE, note TEXT NOT NULL);",
            &mut db,
        )
        .unwrap();
        let t = db.get_table("t".to_string()).unwrap();
        let sql = table_to_create_sql(t);
        let (name, cols) = parse_create_sql(&sql).unwrap();
        assert_eq!(name, "t");
        assert_eq!(cols.len(), 3);
        assert!(cols[0].is_pk);
        assert!(cols[1].is_unique);
        assert!(cols[2].not_null);
    }

    /// The internal catalog table stays internal: it must never show up in
    /// the user-visible `tables` map after open.
    #[test]
    fn sqlrite_master_is_not_exposed_as_a_user_table() {
        // After open, the public db.tables map should not list the master.
        let path = tmp_path("no_master");
        save_database(&mut seed_db(), &path).unwrap();
        let loaded = open_database(&path, "x".to_string()).unwrap();
        assert!(!loaded.tables.contains_key(MASTER_TABLE_NAME));
        cleanup(&path);
    }

    /// A table spanning multiple leaves must get an `InteriorNode` root,
    /// verified by reading the root page's type tag straight off disk.
    #[test]
    fn multi_leaf_table_produces_an_interior_root() {
        // 200 fat rows force the table into multiple leaves, which means
        // save_database must build at least one InteriorNode above them.
        // The test verifies the round-trip works and confirms the root is
        // indeed an interior page (not a leaf) by reading the page type
        // directly out of the open pager.
        let path = tmp_path("multi_leaf_interior");
        let mut db = Database::new("big".to_string());
        process_command(
            "CREATE TABLE things (id INTEGER PRIMARY KEY, data TEXT);",
            &mut db,
        )
        .unwrap();
        for i in 0..200 {
            let body = "x".repeat(200);
            let q = format!("INSERT INTO things (data) VALUES ('row-{i}-{body}');");
            process_command(&q, &mut db).unwrap();
        }
        save_database(&mut db, &path).unwrap();

        // Confirm the round-trip preserved all 200 rows.
        let loaded = open_database(&path, "big".to_string()).unwrap();
        let things = loaded.get_table("things".to_string()).unwrap();
        assert_eq!(things.rowids().len(), 200);

        // Peek at `things`'s root page via the pager attached to the
        // loaded DB and check it's an InteriorNode, not a leaf.
        let pager = loaded
            .pager
            .as_ref()
            .expect("loaded DB should have a pager");
        // sqlrite_master's row for `things` holds its root page. Easiest
        // way to find it: walk the leaf chain by using find_leftmost_leaf
        // and then hop one level up. Simpler: read the master, scan for
        // the "things" row, look up rootpage.
        let mut master = build_empty_master_table();
        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
        let things_root = master
            .rowids()
            .into_iter()
            .find_map(|r| match master.get_value("name", r) {
                Some(Value::Text(s)) if s == "things" => match master.get_value("rootpage", r) {
                    Some(Value::Integer(p)) => Some(p as u32),
                    _ => None,
                },
                _ => None,
            })
            .expect("things should appear in sqlrite_master");
        let root_buf = pager.read_page(things_root).unwrap();
        assert_eq!(
            root_buf[0],
            PageType::InteriorNode as u8,
            "expected a multi-leaf table to have an interior root, got tag {}",
            root_buf[0]
        );

        cleanup(&path);
    }

    /// A user-created secondary index survives save/open with its column,
    /// uniqueness flag, and entry lists intact.
    #[test]
    fn explicit_index_persists_across_save_and_open() {
        let path = tmp_path("idx_persist");
        let mut db = Database::new("idx".to_string());
        process_command(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, tag TEXT);",
            &mut db,
        )
        .unwrap();
        for i in 1..=5 {
            // NOTE: the labels are deliberately inverted relative to i's
            // parity — see the count comment before the assertions below.
            let tag = if i % 2 == 0 { "odd" } else { "even" };
            process_command(
                &format!("INSERT INTO users (tag) VALUES ('{tag}');"),
                &mut db,
            )
            .unwrap();
        }
        process_command("CREATE INDEX users_tag_idx ON users (tag);", &mut db).unwrap();
        save_database(&mut db, &path).unwrap();

        let loaded = open_database(&path, "idx".to_string()).unwrap();
        let users = loaded.get_table("users".to_string()).unwrap();
        let idx = users
            .index_by_name("users_tag_idx")
            .expect("explicit index should survive save/open");
        assert_eq!(idx.column_name, "tag");
        assert!(!idx.is_unique);
        // 5 rows: rowids 2, 4 are "odd" (i % 2 == 0 when i is 2 or 4) — 2 entries;
        // rowids 1, 3, 5 are "even" (i % 2 != 0) — 3 entries.
        let even_rowids = idx.lookup(&Value::Text("even".into()));
        let odd_rowids = idx.lookup(&Value::Text("odd".into()));
        assert_eq!(even_rowids.len(), 3);
        assert_eq!(odd_rowids.len(), 2);

        cleanup(&path);
    }

    /// The implicit index created for a UNIQUE column is rebuilt/restored
    /// by the open path under its deterministic auto-generated name.
    #[test]
    fn auto_indexes_for_unique_columns_survive_save_open() {
        let path = tmp_path("auto_idx_persist");
        let mut db = Database::new("a".to_string());
        process_command(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL UNIQUE);",
            &mut db,
        )
        .unwrap();
        process_command("INSERT INTO users (email) VALUES ('a@x');", &mut db).unwrap();
        process_command("INSERT INTO users (email) VALUES ('b@x');", &mut db).unwrap();
        save_database(&mut db, &path).unwrap();

        let loaded = open_database(&path, "a".to_string()).unwrap();
        let users = loaded.get_table("users".to_string()).unwrap();
        // Every UNIQUE column auto-creates an index; the load path populated
        // it from the persisted entries.
        let auto_name = SecondaryIndex::auto_name("users", "email");
        let idx = users
            .index_by_name(&auto_name)
            .expect("auto index should be restored");
        assert!(idx.is_unique);
        assert_eq!(idx.lookup(&Value::Text("a@x".into())).len(), 1);
        assert_eq!(idx.lookup(&Value::Text("b@x".into())).len(), 1);

        cleanup(&path);
    }

    /// 6,000 fat rows force two interior levels above the leaves; verifies
    /// both the round-trip and that the root's leftmost child is itself an
    /// interior page (i.e. the tree is genuinely 3 levels deep).
    #[test]
    fn deep_tree_round_trips() {
        // Force a 3-level tree by bypassing process_command (which prints
        // the full table on every INSERT, making large bulk loads O(N^2)
        // in I/O). We build the Table directly via restore_row.
        use crate::sql::db::table::Column as TableColumn;

        let path = tmp_path("deep_tree");
        let mut db = Database::new("deep".to_string());
        let columns = vec![
            TableColumn::new("id".into(), "integer".into(), true, true, true),
            TableColumn::new("s".into(), "text".into(), false, true, false),
        ];
        let mut table = build_empty_table("t", columns, 0);
        // ~900-byte rows → ~4 rows per leaf. 6000 rows → ~1500 leaves,
        // which with interior fanout ~400 needs 2 interior levels (3-level
        // tree total, counting leaves).
        for i in 1..=6_000i64 {
            let body = "q".repeat(900);
            table
                .restore_row(
                    i,
                    vec![
                        Some(Value::Integer(i)),
                        Some(Value::Text(format!("r-{i}-{body}"))),
                    ],
                )
                .unwrap();
        }
        db.tables.insert("t".to_string(), table);
        save_database(&mut db, &path).unwrap();

        let loaded = open_database(&path, "deep".to_string()).unwrap();
        let t = loaded.get_table("t".to_string()).unwrap();
        assert_eq!(t.rowids().len(), 6_000);

        // Confirm the tree actually grew past 2 levels — i.e., the root's
        // leftmost child is itself an interior page, not a leaf.
        let pager = loaded.pager.as_ref().unwrap();
        let mut master = build_empty_master_table();
        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
        let t_root = master
            .rowids()
            .into_iter()
            .find_map(|r| match master.get_value("name", r) {
                Some(Value::Text(s)) if s == "t" => match master.get_value("rootpage", r) {
                    Some(Value::Integer(p)) => Some(p as u32),
                    _ => None,
                },
                _ => None,
            })
            .expect("t in sqlrite_master");
        let root_buf = pager.read_page(t_root).unwrap();
        assert_eq!(root_buf[0], PageType::InteriorNode as u8);
        let root_payload: &[u8; PAYLOAD_PER_PAGE] =
            (&root_buf[PAGE_HEADER_SIZE..]).try_into().unwrap();
        let root_interior = InteriorPage::from_bytes(root_payload);
        let child = root_interior.leftmost_child().unwrap();
        let child_buf = pager.read_page(child).unwrap();
        assert_eq!(
            child_buf[0],
            PageType::InteriorNode as u8,
            "expected 3-level tree: root's leftmost child should also be InteriorNode",
        );

        cleanup(&path);
    }
}