#[allow(dead_code)]
pub mod allocator;
#[allow(dead_code)]
pub mod cell;
pub mod file;
#[allow(dead_code)]
pub mod freelist;
#[allow(dead_code)]
pub mod fts_cell;
pub mod header;
#[allow(dead_code)]
pub mod hnsw_cell;
#[allow(dead_code)]
pub mod index_cell;
#[allow(dead_code)]
pub mod interior_page;
pub mod overflow;
pub mod page;
pub mod pager;
#[allow(dead_code)]
pub mod table_page;
#[allow(dead_code)]
pub mod varint;
#[allow(dead_code)]
pub mod wal;
use std::collections::{BTreeMap, HashMap};
use std::path::Path;
use std::sync::{Arc, Mutex};
use crate::sql::dialect::SqlriteDialect;
use sqlparser::parser::Parser;
use crate::error::{Result, SQLRiteError};
use crate::sql::db::database::Database;
use crate::sql::db::secondary_index::{IndexOrigin, SecondaryIndex};
use crate::sql::db::table::{Column, DataType, Row, Table, Value};
use crate::sql::hnsw::DistanceMetric;
use crate::sql::pager::cell::Cell;
use crate::sql::pager::header::DbHeader;
use crate::sql::pager::index_cell::IndexCell;
use crate::sql::pager::interior_page::{InteriorCell, InteriorPage};
use crate::sql::pager::overflow::{
OVERFLOW_THRESHOLD, OverflowRef, PagedEntry, read_overflow_chain, write_overflow_chain,
};
use crate::sql::pager::page::{PAGE_HEADER_SIZE, PAGE_SIZE, PAYLOAD_PER_PAGE, PageType};
use crate::sql::pager::pager::Pager;
use crate::sql::pager::table_page::TablePage;
use crate::sql::parser::create::CreateQuery;
pub use crate::sql::pager::pager::AccessMode;
/// Name of the schema catalog table. `save_database_with_mode` rejects any
/// user table carrying this name as reserved.
pub const MASTER_TABLE_NAME: &str = "sqlrite_master";
/// Opens the database file at `path` with `AccessMode::ReadWrite`.
///
/// Thin wrapper over [`open_database_with_mode`]; `db_name` becomes the name
/// of the returned in-memory [`Database`].
pub fn open_database(path: &Path, db_name: String) -> Result<Database> {
open_database_with_mode(path, db_name, AccessMode::ReadWrite)
}
/// Opens the database file at `path` with `AccessMode::ReadOnly`.
///
/// Thin wrapper over [`open_database_with_mode`]; `db_name` becomes the name
/// of the returned in-memory [`Database`].
pub fn open_database_read_only(path: &Path, db_name: String) -> Result<Database> {
open_database_with_mode(path, db_name, AccessMode::ReadOnly)
}
/// Opens the database file at `path` in the given access `mode` and rebuilds
/// the in-memory [`Database`] from it.
///
/// The catalog table is loaded from the header's `schema_root_page`. Every
/// `"table"` row is re-parsed from its stored `CREATE TABLE` SQL and its rows
/// are loaded from the recorded root page; `"index"` rows are collected and
/// attached afterwards so that every table already exists when an index is
/// bound to it. Finally the MVCC commits recovered by the pager are replayed
/// and the pager is stored on the returned database.
///
/// # Errors
/// Returns `SQLRiteError::Internal` when a catalog row has an unknown type,
/// carries SQL for a differently named table, or stores a `rootpage` that does
/// not fit in a `u32`. Errors from the pager, the SQL parser and the index
/// loaders are propagated unchanged.
pub fn open_database_with_mode(path: &Path, db_name: String, mode: AccessMode) -> Result<Database> {
    let pager = Pager::open_with_mode(path, mode)?;
    let mut master = build_empty_master_table();
    load_table_rows(&pager, &mut master, pager.header().schema_root_page)?;
    let mut db = Database::new(db_name);
    let mut index_rows: Vec<IndexCatalogRow> = Vec::new();
    for rowid in master.rowids() {
        let ty = take_text(&master, "type", rowid)?;
        let name = take_text(&master, "name", rowid)?;
        let sql = take_text(&master, "sql", rowid)?;
        let raw_rootpage = take_integer(&master, "rootpage", rowid)?;
        // A plain `as u32` would silently wrap a negative or oversized value
        // from a damaged catalog and point at an unrelated page.
        let rootpage = u32::try_from(raw_rootpage).map_err(|_| {
            SQLRiteError::Internal(format!(
                "sqlrite_master row '{name}' has out-of-range rootpage {raw_rootpage}"
            ))
        })?;
        let last_rowid = take_integer(&master, "last_rowid", rowid)?;
        match ty.as_str() {
            "table" => {
                let (parsed_name, columns) = parse_create_sql(&sql)?;
                if parsed_name != name {
                    return Err(SQLRiteError::Internal(format!(
                        "sqlrite_master row '{name}' carries SQL for '{parsed_name}' — corrupt catalog?"
                    )));
                }
                let mut table = build_empty_table(&name, columns, last_rowid);
                // Root page 0 means no pages were recorded for this table.
                if rootpage != 0 {
                    load_table_rows(&pager, &mut table, rootpage)?;
                }
                // Loading rows may have changed `last_rowid`; never let it end
                // up below the value persisted in the catalog.
                if last_rowid > table.last_rowid {
                    table.last_rowid = last_rowid;
                }
                db.tables.insert(name, table);
            }
            "index" => {
                // Deferred: the owning table may not have been loaded yet.
                index_rows.push(IndexCatalogRow {
                    name,
                    sql,
                    rootpage,
                });
            }
            other => {
                return Err(SQLRiteError::Internal(format!(
                    "sqlrite_master row '{name}' has unknown type '{other}'"
                )));
            }
        }
    }
    for row in index_rows {
        if create_index_sql_uses_hnsw(&row.sql) {
            rebuild_hnsw_index(&mut db, &pager, &row)?;
        } else if create_index_sql_uses_fts(&row.sql) {
            rebuild_fts_index(&mut db, &pager, &row)?;
        } else {
            attach_index(&mut db, &pager, row)?;
        }
    }
    replay_mvcc_into_db(&mut db, &pager)?;
    db.source_path = Some(path.to_path_buf());
    db.pager = Some(pager);
    Ok(db)
}
/// Replays the MVCC commit batches recovered by `pager` into `db.mv_store`
/// and advances `db.mvcc_clock`.
///
/// Every record of every batch is pushed as a committed row version stamped
/// with the batch's `commit_ts`. The clock is then shown the larger of the
/// pager's clock high-water mark and the highest replayed `commit_ts`, unless
/// that value is zero.
///
/// # Errors
/// Returns `SQLRiteError::Internal` if the store rejects a version.
fn replay_mvcc_into_db(db: &mut Database, pager: &Pager) -> Result<()> {
    use crate::mvcc::RowVersion;

    let mut high_water = pager.clock_high_water();
    for batch in pager.recovered_mvcc_commits() {
        high_water = high_water.max(batch.commit_ts);
        for rec in &batch.records {
            let version = RowVersion::committed(batch.commit_ts, rec.payload.clone());
            if let Err(e) = db.mv_store.push_committed(rec.row.clone(), version) {
                return Err(SQLRiteError::Internal(format!(
                    "WAL MVCC replay: push_committed failed for {}/{}: {e}",
                    rec.row.table, rec.row.rowid,
                )));
            }
        }
    }
    if high_water > 0 {
        db.mvcc_clock.observe(high_water);
    }
    Ok(())
}
/// An `"index"` row read from the catalog table, held until all tables have
/// been loaded so the index can be attached to its table.
struct IndexCatalogRow {
// Index name as stored in the catalog's `name` column.
name: String,
// The `CREATE INDEX` statement stored in the catalog's `sql` column.
sql: String,
// Root page of the persisted index; loaders treat 0 as "nothing persisted".
rootpage: u32,
}
/// Persists `db` to `path` without compaction.
///
/// Thin wrapper over `save_database_with_mode` with `compact = false`.
pub fn save_database(db: &mut Database, path: &Path) -> Result<()> {
save_database_with_mode(db, path, false)
}
/// Persists `db` to `path` in compacting mode.
///
/// Thin wrapper over `save_database_with_mode` with `compact = true`, which
/// ignores the old file's freelist and previous page assignments.
pub fn vacuum_database(db: &mut Database, path: &Path) -> Result<()> {
save_database_with_mode(db, path, true)
}
/// Stages every table and index of `db` into the pager for `path` and commits
/// a new header.
///
/// With `compact == false` the previous file layout is consulted: the old
/// freelist seeds the allocator, each table/index is offered the pages it
/// occupied before, and old pages that end up unused are put on the new
/// freelist. With `compact == true` none of that is read and the allocator
/// starts with an empty freelist.
///
/// When `path` equals `db.source_path` the pager held by `db` is taken for the
/// duration of the save and stored back only on success; if an error is
/// returned in between, `db.pager` is left as `None`.
///
/// # Errors
/// Returns `SQLRiteError::Internal` if a user table is named
/// [`MASTER_TABLE_NAME`]; errors from the pager, freelist and staging helpers
/// are propagated.
fn save_database_with_mode(db: &mut Database, path: &Path, compact: bool) -> Result<()> {
// Indexes flagged `needs_rebuild` are rebuilt from table rows before staging.
rebuild_dirty_hnsw_indexes(db);
rebuild_dirty_fts_indexes(db);
let same_path = db.source_path.as_deref() == Some(path);
// Reuse the database's own pager when saving in place; otherwise open the
// target file if it exists or create it.
let mut pager = if same_path {
match db.pager.take() {
Some(p) => p,
None if path.exists() => Pager::open(path)?,
None => Pager::create(path)?,
}
} else if path.exists() {
Pager::open(path)?
} else {
Pager::create(path)?
};
let old_header = pager.header();
// Every page number in 1..page_count of the previous file.
let old_live: std::collections::HashSet<u32> = (1..old_header.page_count).collect();
let (old_free_leaves, old_free_trunks) = if compact || old_header.freelist_head == 0 {
(Vec::new(), Vec::new())
} else {
crate::sql::pager::freelist::read_freelist(&pager, old_header.freelist_head)?
};
let old_rootpages = if compact {
HashMap::new()
} else {
read_old_rootpages(&pager, old_header.schema_root_page)?
};
// Pages each catalog object occupied before, keyed by (kind, name), so the
// allocator can be asked to hand the same pages back to the same object.
let old_preferred_pages: HashMap<(String, String), Vec<u32>> = if compact {
HashMap::new()
} else {
let mut map: HashMap<(String, String), Vec<u32>> = HashMap::new();
for ((kind, name), &root) in &old_rootpages {
// NOTE(review): the third argument presumably asks the walker to follow
// overflow chains, which only table trees would have — confirm against
// `collect_pages_for_btree`.
let follow = kind == "table";
let pages = collect_pages_for_btree(&pager, root, follow)?;
map.insert((kind.clone(), name.clone()), pages);
}
map
};
let old_master_pages: Vec<u32> = if compact || old_header.schema_root_page == 0 {
Vec::new()
} else {
collect_pages_for_btree(
&pager,
old_header.schema_root_page,
true,
)?
};
// All reads of the old layout are done; drop anything previously staged.
pager.clear_staged();
use std::collections::VecDeque;
let initial_freelist: VecDeque<u32> = if compact {
VecDeque::new()
} else {
crate::sql::pager::freelist::freelist_to_deque(old_free_leaves.clone())
};
// NOTE(review): the second argument is presumably the first allocatable page
// number — confirm against `PageAllocator::new`.
let mut alloc = crate::sql::pager::allocator::PageAllocator::new(initial_freelist, 1);
let mut master_rows: Vec<CatalogEntry> = Vec::new();
// Tables are staged in sorted name order.
let mut table_names: Vec<&String> = db.tables.keys().collect();
table_names.sort();
for name in table_names {
if name == MASTER_TABLE_NAME {
return Err(SQLRiteError::Internal(format!(
"user table cannot be named '{MASTER_TABLE_NAME}' (reserved)"
)));
}
if !compact {
if let Some(prev) = old_preferred_pages.get(&("table".to_string(), name.to_string())) {
alloc.set_preferred(prev.clone());
}
}
let table = &db.tables[name];
let rootpage = stage_table_btree(&mut pager, table, &mut alloc)?;
alloc.finish_preferred();
master_rows.push(CatalogEntry {
kind: "table".into(),
name: name.clone(),
sql: table_to_create_sql(table),
rootpage,
last_rowid: table.last_rowid,
});
}
// Secondary indexes, sorted by (table name, index name).
let mut index_entries: Vec<(&Table, &SecondaryIndex)> = Vec::new();
for table in db.tables.values() {
for idx in &table.secondary_indexes {
index_entries.push((table, idx));
}
}
index_entries
.sort_by(|(ta, ia), (tb, ib)| ta.tb_name.cmp(&tb.tb_name).then(ia.name.cmp(&ib.name)));
for (_table, idx) in index_entries {
if !compact {
if let Some(prev) =
old_preferred_pages.get(&("index".to_string(), idx.name.to_string()))
{
alloc.set_preferred(prev.clone());
}
}
let rootpage = stage_index_btree(&mut pager, idx, &mut alloc)?;
alloc.finish_preferred();
master_rows.push(CatalogEntry {
kind: "index".into(),
name: idx.name.clone(),
sql: idx.synthesized_sql(),
rootpage,
last_rowid: 0,
});
}
// HNSW indexes, same ordering; they are also recorded with kind "index".
let mut hnsw_entries: Vec<(&Table, &crate::sql::db::table::HnswIndexEntry)> = Vec::new();
for table in db.tables.values() {
for entry in &table.hnsw_indexes {
hnsw_entries.push((table, entry));
}
}
hnsw_entries
.sort_by(|(ta, ea), (tb, eb)| ta.tb_name.cmp(&tb.tb_name).then(ea.name.cmp(&eb.name)));
for (table, entry) in hnsw_entries {
if !compact {
if let Some(prev) =
old_preferred_pages.get(&("index".to_string(), entry.name.to_string()))
{
alloc.set_preferred(prev.clone());
}
}
let rootpage = stage_hnsw_btree(&mut pager, &entry.index, &mut alloc)?;
alloc.finish_preferred();
master_rows.push(CatalogEntry {
kind: "index".into(),
name: entry.name.clone(),
sql: synthesize_hnsw_create_index_sql(
&entry.name,
&table.tb_name,
&entry.column_name,
entry.metric,
),
rootpage,
last_rowid: 0,
});
}
// FTS indexes, same ordering; also recorded with kind "index".
let mut fts_entries: Vec<(&Table, &crate::sql::db::table::FtsIndexEntry)> = Vec::new();
for table in db.tables.values() {
for entry in &table.fts_indexes {
fts_entries.push((table, entry));
}
}
fts_entries
.sort_by(|(ta, ea), (tb, eb)| ta.tb_name.cmp(&tb.tb_name).then(ea.name.cmp(&eb.name)));
// Remembered for the format-version decision below.
let any_fts = !fts_entries.is_empty();
for (table, entry) in fts_entries {
if !compact {
if let Some(prev) =
old_preferred_pages.get(&("index".to_string(), entry.name.to_string()))
{
alloc.set_preferred(prev.clone());
}
}
let rootpage = stage_fts_btree(&mut pager, &entry.index, &mut alloc)?;
alloc.finish_preferred();
master_rows.push(CatalogEntry {
kind: "index".into(),
name: entry.name.clone(),
sql: format!(
"CREATE INDEX {} ON {} USING fts ({})",
entry.name, table.tb_name, entry.column_name
),
rootpage,
last_rowid: 0,
});
}
// Build the catalog table itself; rows get rowids 1..=N in staging order.
let mut master = build_empty_master_table();
for (i, entry) in master_rows.into_iter().enumerate() {
let rowid = (i as i64) + 1;
master.restore_row(
rowid,
vec![
Some(Value::Text(entry.kind)),
Some(Value::Text(entry.name)),
Some(Value::Text(entry.sql)),
Some(Value::Integer(entry.rootpage as i64)),
Some(Value::Integer(entry.last_rowid)),
],
)?;
}
if !compact && !old_master_pages.is_empty() {
alloc.set_preferred(old_master_pages.clone());
}
let master_root = stage_table_btree(&mut pager, &master, &mut alloc)?;
alloc.finish_preferred();
if !compact {
// Any page of the old file that this save did not allocate becomes free.
let used = alloc.used().clone();
let mut newly_freed: Vec<u32> = old_live
.iter()
.copied()
.filter(|p| !used.contains(p))
.collect();
// `old_free_trunks` is only referenced here; it is not otherwise consumed.
let _ = &old_free_trunks; alloc.add_to_freelist(newly_freed.drain(..));
}
let new_free_pages = alloc.drain_freelist();
let new_freelist_head =
crate::sql::pager::freelist::stage_freelist(&mut pager, new_free_pages)?;
use crate::sql::pager::header::{FORMAT_VERSION_V5, FORMAT_VERSION_V6};
// A non-empty freelist selects V6; otherwise an FTS index raises the version
// to at least V5; otherwise the old file's version is kept.
let format_version = if new_freelist_head != 0 {
FORMAT_VERSION_V6
} else if any_fts {
std::cmp::max(FORMAT_VERSION_V5, old_header.format_version)
} else {
old_header.format_version
};
pager.commit(DbHeader {
page_count: alloc.high_water(),
schema_root_page: master_root,
format_version,
freelist_head: new_freelist_head,
})?;
// Hand the pager back only when it belongs to this database's own file.
if same_path {
db.pager = Some(pager);
}
Ok(())
}
/// One row to be written into the catalog table during a save.
struct CatalogEntry {
// `kind` is "table" or "index"; `name` is the table or index name.
kind: String, name: String,
// SQL text from which the object is re-created when the file is opened.
sql: String,
// Root page returned by the staging function for this object.
rootpage: u32,
// The table's `last_rowid`; index entries store 0.
last_rowid: i64,
}
fn build_empty_master_table() -> Table {
let columns = vec![
Column::new("type".into(), "text".into(), false, true, false),
Column::new("name".into(), "text".into(), true, true, true),
Column::new("sql".into(), "text".into(), false, true, false),
Column::new("rootpage".into(), "integer".into(), false, true, false),
Column::new("last_rowid".into(), "integer".into(), false, true, false),
];
build_empty_table(MASTER_TABLE_NAME, columns, 0)
}
/// Reads column `col` of row `rowid` from `table` and requires a text value.
///
/// # Errors
/// Returns `SQLRiteError::Internal` when the cell is missing or is not text;
/// the message shows what was found instead.
fn take_text(table: &Table, col: &str, rowid: i64) -> Result<String> {
    let other = table.get_value(col, rowid);
    if let Some(Value::Text(text)) = other {
        return Ok(text);
    }
    Err(SQLRiteError::Internal(format!(
        "sqlrite_master column '{col}' at rowid {rowid}: expected Text, got {other:?}"
    )))
}
/// Reads column `col` of row `rowid` from `table` and requires an integer.
///
/// # Errors
/// Returns `SQLRiteError::Internal` when the cell is missing or is not an
/// integer; the message shows what was found instead.
fn take_integer(table: &Table, col: &str, rowid: i64) -> Result<i64> {
    let other = table.get_value(col, rowid);
    if let Some(Value::Integer(number)) = other {
        return Ok(number);
    }
    Err(SQLRiteError::Internal(format!(
        "sqlrite_master column '{col}' at rowid {rowid}: expected Integer, got {other:?}"
    )))
}
/// Renders a `CREATE TABLE` statement describing `table`'s columns.
///
/// Each column contributes `name TYPE`, followed by `PRIMARY KEY` for a
/// primary-key column or, otherwise, `UNIQUE` and/or `NOT NULL` as flagged,
/// followed by a `DEFAULT` clause when the column has a default. Columns whose
/// datatype is `None` or `Invalid` are written as `TEXT`.
fn table_to_create_sql(table: &Table) -> String {
    let parts: Vec<String> = table
        .columns
        .iter()
        .map(|c| {
            let ty = match &c.datatype {
                DataType::Vector(dim) => format!("VECTOR({dim})"),
                DataType::Integer => String::from("INTEGER"),
                DataType::Real => String::from("REAL"),
                DataType::Bool => String::from("BOOLEAN"),
                DataType::Json => String::from("JSON"),
                DataType::Text | DataType::None | DataType::Invalid => String::from("TEXT"),
            };
            let mut piece = format!("{} {}", c.column_name, ty);
            if c.is_pk {
                // UNIQUE / NOT NULL are not spelled out for the primary key.
                piece += " PRIMARY KEY";
            } else {
                if c.is_unique {
                    piece += " UNIQUE";
                }
                if c.not_null {
                    piece += " NOT NULL";
                }
            }
            if let Some(default) = c.default.as_ref() {
                piece += " DEFAULT ";
                piece += &render_default_literal(default);
            }
            piece
        })
        .collect();
    format!("CREATE TABLE {} ({});", table.tb_name, parts.join(", "))
}
/// Renders a column default as SQL literal text.
///
/// Text is wrapped in single quotes with embedded single quotes doubled;
/// booleans become `TRUE` / `FALSE`; null becomes `NULL`; vectors use the
/// value's own display string.
fn render_default_literal(value: &Value) -> String {
    match value {
        Value::Null => "NULL".to_string(),
        Value::Bool(true) => "TRUE".to_string(),
        Value::Bool(false) => "FALSE".to_string(),
        Value::Integer(i) => i.to_string(),
        Value::Real(f) => f.to_string(),
        Value::Text(s) => format!("'{}'", s.replace('\'', "''")),
        Value::Vector(_) => value.to_display_string(),
    }
}
/// Parses a stored `CREATE TABLE` statement into the table name and its
/// column definitions.
///
/// Only the last statement produced by the parser is considered.
///
/// # Errors
/// Returns the parser's error when `sql` does not parse,
/// `SQLRiteError::Internal` when it contains no statement, and whatever
/// `CreateQuery::new` reports for an unsupported statement.
fn parse_create_sql(sql: &str) -> Result<(String, Vec<Column>)> {
    let dialect = SqlriteDialect::new();
    let mut statements = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
    let Some(stmt) = statements.pop() else {
        return Err(SQLRiteError::Internal(
            "sqlrite_master row held an empty SQL string".to_string(),
        ));
    };
    let create = CreateQuery::new(&stmt)?;
    let mut columns: Vec<Column> = Vec::new();
    for pc in create.columns {
        columns.push(Column::with_default(
            pc.name,
            pc.datatype,
            pc.is_pk,
            pc.not_null,
            pc.is_unique,
            pc.default,
        ));
    }
    Ok((create.table_name, columns))
}
/// Creates a table named `name` with the given `columns`, no rows, and
/// `last_rowid` as its row-id counter.
///
/// An empty column store is created per column (JSON columns share the text
/// representation; `None`/`Invalid` columns get `Row::None`). Every
/// primary-key or unique column of type integer or text also gets an
/// automatically named unique secondary index; a failure to construct such an
/// index is ignored. The `primary_key` field is the name of the first
/// primary-key column, or the string `"-1"` when there is none.
fn build_empty_table(name: &str, columns: Vec<Column>, last_rowid: i64) -> Table {
    let rows: Arc<Mutex<HashMap<String, Row>>> = Arc::new(Mutex::new(HashMap::new()));
    let mut secondary_indexes: Vec<SecondaryIndex> = Vec::new();
    {
        let mut storage = rows.lock().expect("rows mutex poisoned");
        for col in &columns {
            let empty = match col.datatype {
                DataType::Text | DataType::Json => Row::Text(BTreeMap::new()),
                DataType::Integer => Row::Integer(BTreeMap::new()),
                DataType::Real => Row::Real(BTreeMap::new()),
                DataType::Bool => Row::Bool(BTreeMap::new()),
                DataType::Vector(_) => Row::Vector(BTreeMap::new()),
                DataType::None | DataType::Invalid => Row::None,
            };
            storage.insert(col.column_name.clone(), empty);

            let constrained = col.is_pk || col.is_unique;
            let indexable = matches!(col.datatype, DataType::Integer | DataType::Text);
            if !(constrained && indexable) {
                continue;
            }
            let auto_index = SecondaryIndex::new(
                SecondaryIndex::auto_name(name, &col.column_name),
                name.to_string(),
                col.column_name.clone(),
                &col.datatype,
                true,
                IndexOrigin::Auto,
            );
            // Best effort: a column whose index cannot be built simply has none.
            secondary_indexes.extend(auto_index.ok());
        }
    }
    let primary_key = match columns.iter().find(|c| c.is_pk) {
        Some(pk) => pk.column_name.clone(),
        None => "-1".to_string(),
    };
    Table {
        tb_name: name.to_string(),
        columns,
        rows,
        secondary_indexes,
        hnsw_indexes: Vec::new(),
        fts_indexes: Vec::new(),
        last_rowid,
        primary_key,
    }
}
/// Attaches the secondary index described by catalog `row` to its table and
/// fills it from the pages rooted at `row.rootpage`.
///
/// If the table already carries an index with the same name (for example one
/// created automatically with the table), that index is removed and replaced
/// by a fresh, empty index with the same name, table, column, uniqueness and
/// origin. Otherwise a new explicit index is built from the parsed SQL. In
/// both cases the index is constructed exactly once before loading.
///
/// # Errors
/// Returns `SQLRiteError::Internal` when the referenced table or column does
/// not exist; errors from SQL parsing, index construction and page loading are
/// propagated.
fn attach_index(db: &mut Database, pager: &Pager, row: IndexCatalogRow) -> Result<()> {
    let (table_name, column_name, is_unique) = parse_create_index_sql(&row.sql)?;
    let table = db.get_table_mut(table_name.clone()).map_err(|_| {
        SQLRiteError::Internal(format!(
            "index '{}' references unknown table '{table_name}' (sqlrite_master out of sync?)",
            row.name
        ))
    })?;
    let datatype = table
        .columns
        .iter()
        .find(|c| c.column_name == column_name)
        .map(|c| clone_datatype(&c.datatype))
        .ok_or_else(|| {
            SQLRiteError::Internal(format!(
                "index '{}' references unknown column '{column_name}' on '{table_name}'",
                row.name
            ))
        })?;

    // Decide the identity of the index first, then build it a single time.
    let existing_slot = table
        .secondary_indexes
        .iter()
        .position(|i| i.name == row.name);
    let (idx_name, idx_table, idx_column, unique, origin) = match existing_slot {
        Some(slot) => {
            // Keep the existing index's identity but drop its contents; the
            // persisted pages are the source of truth.
            let existing = table.secondary_indexes.remove(slot);
            (
                existing.name,
                existing.table_name,
                existing.column_name,
                existing.is_unique,
                existing.origin,
            )
        }
        None => (
            row.name.clone(),
            table_name.clone(),
            column_name.clone(),
            is_unique,
            IndexOrigin::Explicit,
        ),
    };
    let mut idx = SecondaryIndex::new(idx_name, idx_table, idx_column, &datatype, unique, origin)?;
    load_index_rows(pager, &mut idx, row.rootpage)?;
    table.secondary_indexes.push(idx);
    Ok(())
}
/// Inserts every persisted `(value, rowid)` entry of an index tree into `idx`.
///
/// Does nothing when `root_page` is 0. Otherwise starts at the leaf returned
/// by `find_leftmost_leaf` and follows each leaf's next-leaf pointer (bytes
/// 1..5 of the page, little-endian) until that pointer is 0.
///
/// # Errors
/// Returns `SQLRiteError::Internal` when a leaf page is missing or is not
/// tagged `TableLeaf`; decoding and insertion errors are propagated.
fn load_index_rows(pager: &Pager, idx: &mut SecondaryIndex, root_page: u32) -> Result<()> {
    if root_page == 0 {
        return Ok(());
    }
    let mut current = find_leftmost_leaf(pager, root_page)?;
    while current != 0 {
        let Some(page_buf) = pager.read_page(current) else {
            return Err(SQLRiteError::Internal(format!(
                "missing index leaf page {current}"
            )));
        };
        let tag = page_buf[0];
        if tag != PageType::TableLeaf as u8 {
            return Err(SQLRiteError::Internal(format!(
                "page {current} tagged {} but expected TableLeaf (index)",
                tag
            )));
        }
        let next_leaf =
            u32::from_le_bytes([page_buf[1], page_buf[2], page_buf[3], page_buf[4]]);
        let payload = <&[u8; PAYLOAD_PER_PAGE]>::try_from(&page_buf[PAGE_HEADER_SIZE..])
            .map_err(|_| SQLRiteError::Internal("index leaf payload size".to_string()))?;
        let leaf = TablePage::from_bytes(payload);
        for slot in 0..leaf.slot_count() {
            let offset = leaf.slot_offset_raw(slot)?;
            let (entry, _) = IndexCell::decode(leaf.as_bytes(), offset)?;
            idx.insert(&entry.value, entry.rowid)?;
        }
        current = next_leaf;
    }
    Ok(())
}
/// Extracts `(table name, indexed column name, unique flag)` from a stored
/// `CREATE INDEX` statement.
///
/// Only the last parsed statement is considered. For a compound identifier
/// the final part is used as the column name.
///
/// # Errors
/// Returns `SQLRiteError::Internal` when the statement is not a
/// `CREATE INDEX` or the column is not a plain identifier, and
/// `SQLRiteError::NotImplemented` unless exactly one column is indexed.
fn parse_create_index_sql(sql: &str) -> Result<(String, String, bool)> {
    use sqlparser::ast::{CreateIndex, Expr, Statement};

    let dialect = SqlriteDialect::new();
    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
    let (table_name, columns, unique) = match ast.pop() {
        Some(Statement::CreateIndex(CreateIndex {
            table_name,
            columns,
            unique,
            ..
        })) => (table_name, columns, unique),
        _ => {
            return Err(SQLRiteError::Internal(format!(
                "sqlrite_master index row's SQL isn't a CREATE INDEX: {sql}"
            )));
        }
    };
    let [only] = columns.as_slice() else {
        return Err(SQLRiteError::NotImplemented(
            "multi-column indexes aren't supported yet".to_string(),
        ));
    };
    let col = match &only.column.expr {
        Expr::Identifier(ident) => ident.value.clone(),
        Expr::CompoundIdentifier(parts) => match parts.last() {
            Some(part) => part.value.clone(),
            None => String::new(),
        },
        other => {
            return Err(SQLRiteError::Internal(format!(
                "unsupported indexed column expression: {other:?}"
            )));
        }
    };
    Ok((table_name.to_string(), col, unique))
}
/// Reports whether `sql` is a `CREATE INDEX` statement whose `USING` clause
/// names `hnsw` (ASCII case-insensitive).
///
/// Returns `false` when `sql` fails to parse or its last statement is not a
/// `CREATE INDEX`.
fn create_index_sql_uses_hnsw(sql: &str) -> bool {
    use sqlparser::ast::{CreateIndex, IndexType, Statement};

    let dialect = SqlriteDialect::new();
    let last_stmt = Parser::parse_sql(&dialect, sql)
        .ok()
        .and_then(|mut stmts| stmts.pop());
    match last_stmt {
        Some(Statement::CreateIndex(CreateIndex {
            using: Some(IndexType::Custom(ident)),
            ..
        })) => ident.value.eq_ignore_ascii_case("hnsw"),
        _ => false,
    }
}
/// Reports whether `sql` is a `CREATE INDEX` statement whose `USING` clause
/// names `fts` (ASCII case-insensitive).
///
/// Returns `false` when `sql` fails to parse or its last statement is not a
/// `CREATE INDEX`.
fn create_index_sql_uses_fts(sql: &str) -> bool {
    use sqlparser::ast::{CreateIndex, IndexType, Statement};

    let dialect = SqlriteDialect::new();
    let last_stmt = Parser::parse_sql(&dialect, sql)
        .ok()
        .and_then(|mut stmts| stmts.pop());
    match last_stmt {
        Some(Statement::CreateIndex(CreateIndex {
            using: Some(IndexType::Custom(ident)),
            ..
        })) => ident.value.eq_ignore_ascii_case("fts"),
        _ => false,
    }
}
/// Restores the FTS index described by catalog `row` onto its table.
///
/// When `row.rootpage` is 0 the stored `CREATE INDEX` statement is executed
/// again through `execute_create_index`. Otherwise the persisted doc-lengths
/// and postings are loaded from the index pages and attached to the table as
/// an entry that does not need a rebuild.
///
/// # Errors
/// Returns `SQLRiteError::Internal` when the stored SQL is not a
/// `CREATE INDEX` or the referenced table is unknown; parser, loader and
/// executor errors are propagated.
fn rebuild_fts_index(db: &mut Database, pager: &Pager, row: &IndexCatalogRow) -> Result<()> {
    use crate::sql::db::table::FtsIndexEntry;
    use crate::sql::executor::execute_create_index;
    use crate::sql::fts::PostingList;
    use sqlparser::ast::Statement;

    let dialect = SqlriteDialect::new();
    let mut ast = Parser::parse_sql(&dialect, &row.sql).map_err(SQLRiteError::from)?;
    let stmt = match ast.pop() {
        Some(stmt @ Statement::CreateIndex(_)) => stmt,
        _ => {
            return Err(SQLRiteError::Internal(format!(
                "sqlrite_master FTS row's SQL isn't a CREATE INDEX: {}",
                row.sql
            )));
        }
    };

    if row.rootpage != 0 {
        // Persisted form available: load it instead of re-indexing the rows.
        let (doc_lengths, postings) = load_fts_postings(pager, row.rootpage)?;
        let index = PostingList::from_persisted_postings(doc_lengths, postings);
        let (tbl_name, col_name) = parse_fts_create_index_sql(&row.sql)?;
        let Ok(table_mut) = db.get_table_mut(tbl_name.clone()) else {
            return Err(SQLRiteError::Internal(format!(
                "FTS index '{}' references unknown table '{tbl_name}'",
                row.name
            )));
        };
        table_mut.fts_indexes.push(FtsIndexEntry {
            name: row.name.clone(),
            column_name: col_name,
            index,
            needs_rebuild: false,
        });
        return Ok(());
    }

    // Nothing persisted: recreate the index by executing the statement.
    execute_create_index(&stmt, db)?;
    Ok(())
}
/// Extracts `(table name, indexed column name)` from a stored FTS
/// `CREATE INDEX` statement.
///
/// Only the last parsed statement is considered. For a compound identifier
/// the final part is used as the column name.
///
/// # Errors
/// Returns `SQLRiteError::Internal` when the statement is not a
/// `CREATE INDEX` or the column is not a plain identifier, and
/// `SQLRiteError::NotImplemented` unless exactly one column is indexed.
fn parse_fts_create_index_sql(sql: &str) -> Result<(String, String)> {
    use sqlparser::ast::{CreateIndex, Expr, Statement};

    let dialect = SqlriteDialect::new();
    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
    let (table_name, columns) = match ast.pop() {
        Some(Statement::CreateIndex(CreateIndex {
            table_name,
            columns,
            ..
        })) => (table_name, columns),
        _ => {
            return Err(SQLRiteError::Internal(format!(
                "sqlrite_master FTS row's SQL isn't a CREATE INDEX: {sql}"
            )));
        }
    };
    let [only] = columns.as_slice() else {
        return Err(SQLRiteError::NotImplemented(
            "multi-column FTS indexes aren't supported yet".to_string(),
        ));
    };
    let col = match &only.column.expr {
        Expr::Identifier(ident) => ident.value.clone(),
        Expr::CompoundIdentifier(parts) => match parts.last() {
            Some(part) => part.value.clone(),
            None => String::new(),
        },
        other => {
            return Err(SQLRiteError::Internal(format!(
                "FTS CREATE INDEX has unexpected column expr: {other:?}"
            )));
        }
    };
    Ok((table_name.to_string(), col))
}
/// Restores the HNSW index described by catalog `row` onto its table.
///
/// When `row.rootpage` is 0 the stored `CREATE INDEX` statement is executed
/// again through `execute_create_index`. Otherwise the persisted nodes are
/// loaded and handed to `HnswIndex::from_persisted_nodes` together with the
/// metric parsed from the SQL, and the result is attached to the table as an
/// entry that does not need a rebuild.
///
/// # Errors
/// Returns `SQLRiteError::Internal` when the stored SQL is not a
/// `CREATE INDEX` or the referenced table is unknown; parser, loader and
/// executor errors are propagated.
fn rebuild_hnsw_index(db: &mut Database, pager: &Pager, row: &IndexCatalogRow) -> Result<()> {
    use crate::sql::db::table::HnswIndexEntry;
    use crate::sql::executor::execute_create_index;
    use crate::sql::hnsw::HnswIndex;
    use sqlparser::ast::Statement;

    let dialect = SqlriteDialect::new();
    let mut ast = Parser::parse_sql(&dialect, &row.sql).map_err(SQLRiteError::from)?;
    let stmt = match ast.pop() {
        Some(stmt @ Statement::CreateIndex(_)) => stmt,
        _ => {
            return Err(SQLRiteError::Internal(format!(
                "sqlrite_master HNSW row's SQL isn't a CREATE INDEX: {}",
                row.sql
            )));
        }
    };

    if row.rootpage != 0 {
        // Persisted graph available: load it instead of re-inserting vectors.
        let (tbl_name, col_name, metric) = parse_hnsw_create_index_sql(&row.sql)?;
        let nodes = load_hnsw_nodes(pager, row.rootpage)?;
        let index = HnswIndex::from_persisted_nodes(metric, 0xC0FFEE, nodes);
        let Ok(table_mut) = db.get_table_mut(tbl_name.clone()) else {
            return Err(SQLRiteError::Internal(format!(
                "HNSW index '{}' references unknown table '{tbl_name}'",
                row.name
            )));
        };
        table_mut.hnsw_indexes.push(HnswIndexEntry {
            name: row.name.clone(),
            column_name: col_name,
            metric,
            index,
            needs_rebuild: false,
        });
        return Ok(());
    }

    // Nothing persisted: recreate the index by executing the statement.
    execute_create_index(&stmt, db)?;
    Ok(())
}
/// Reads every persisted HNSW node cell of the tree rooted at `root_page`.
///
/// Starts at the leaf returned by `find_leftmost_leaf` and follows each
/// leaf's next-leaf pointer (bytes 1..5 of the page, little-endian) until
/// that pointer is 0. Each result element is a cell's `(node_id, layers)`.
///
/// # Errors
/// Returns `SQLRiteError::Internal` when a leaf page is missing or is not
/// tagged `TableLeaf`; decoding errors are propagated.
fn load_hnsw_nodes(pager: &Pager, root_page: u32) -> Result<Vec<(i64, Vec<Vec<i64>>)>> {
    use crate::sql::pager::hnsw_cell::HnswNodeCell;

    let mut nodes: Vec<(i64, Vec<Vec<i64>>)> = Vec::new();
    let mut current = find_leftmost_leaf(pager, root_page)?;
    while current != 0 {
        let Some(page_buf) = pager.read_page(current) else {
            return Err(SQLRiteError::Internal(format!(
                "missing HNSW leaf page {current}"
            )));
        };
        let tag = page_buf[0];
        if tag != PageType::TableLeaf as u8 {
            return Err(SQLRiteError::Internal(format!(
                "page {current} tagged {} but expected TableLeaf (HNSW)",
                tag
            )));
        }
        let next_leaf =
            u32::from_le_bytes([page_buf[1], page_buf[2], page_buf[3], page_buf[4]]);
        let payload = <&[u8; PAYLOAD_PER_PAGE]>::try_from(&page_buf[PAGE_HEADER_SIZE..])
            .map_err(|_| SQLRiteError::Internal("HNSW leaf payload size".to_string()))?;
        let leaf = TablePage::from_bytes(payload);
        for slot in 0..leaf.slot_count() {
            let offset = leaf.slot_offset_raw(slot)?;
            let (node, _) = HnswNodeCell::decode(leaf.as_bytes(), offset)?;
            nodes.push((node.node_id, node.layers));
        }
        current = next_leaf;
    }
    Ok(nodes)
}
/// Extracts `(table name, indexed column name, distance metric)` from a
/// stored HNSW `CREATE INDEX` statement.
///
/// The metric defaults to `DistanceMetric::L2`. It is overridden by a
/// `WITH (metric = '<name>')` option whose value is a single- or
/// double-quoted string; other `WITH` options are ignored. If several such
/// options are present the last one wins.
///
/// # Errors
/// Returns `SQLRiteError::Internal` when the statement is not a
/// `CREATE INDEX`, the column is not a plain identifier, or the metric name is
/// unknown, and `SQLRiteError::NotImplemented` unless exactly one column is
/// indexed.
fn parse_hnsw_create_index_sql(sql: &str) -> Result<(String, String, DistanceMetric)> {
    use crate::sql::hnsw::DistanceMetric;
    use sqlparser::ast::{BinaryOperator, CreateIndex, Expr, Statement, Value as AstValue};

    let dialect = SqlriteDialect::new();
    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
    let (table_name, columns, with) = match ast.pop() {
        Some(Statement::CreateIndex(CreateIndex {
            table_name,
            columns,
            with,
            ..
        })) => (table_name, columns, with),
        _ => {
            return Err(SQLRiteError::Internal(format!(
                "sqlrite_master HNSW row's SQL isn't a CREATE INDEX: {sql}"
            )));
        }
    };
    let [only] = columns.as_slice() else {
        return Err(SQLRiteError::NotImplemented(
            "multi-column HNSW indexes aren't supported yet".to_string(),
        ));
    };
    let col = match &only.column.expr {
        Expr::Identifier(ident) => ident.value.clone(),
        Expr::CompoundIdentifier(parts) => match parts.last() {
            Some(part) => part.value.clone(),
            None => String::new(),
        },
        other => {
            return Err(SQLRiteError::Internal(format!(
                "unsupported HNSW indexed column expression: {other:?}"
            )));
        }
    };

    let mut metric = DistanceMetric::L2;
    for opt in &with {
        // Only `metric = <quoted string>` is interpreted; anything else is skipped.
        let Expr::BinaryOp {
            left,
            op: BinaryOperator::Eq,
            right,
        } = opt
        else {
            continue;
        };
        let (Expr::Identifier(key), Expr::Value(v)) = (left.as_ref(), right.as_ref()) else {
            continue;
        };
        if !key.value.eq_ignore_ascii_case("metric") {
            continue;
        }
        if let AstValue::SingleQuotedString(s) | AstValue::DoubleQuotedString(s) = &v.value {
            metric = DistanceMetric::from_sql_name(s).ok_or_else(|| {
                SQLRiteError::Internal(format!(
                    "sqlrite_master HNSW row carries unknown metric '{s}'"
                ))
            })?;
        }
    }
    Ok((table_name.to_string(), col, metric))
}
/// Rebuilds every HNSW index flagged `needs_rebuild` from its table's current
/// vector column and clears the flag.
///
/// Vectors are inserted into a fresh index in ascending row-id order. If the
/// indexed column is not stored as a vector column the rebuilt index is empty.
///
/// # Panics
/// Panics if a table's row mutex is poisoned.
fn rebuild_dirty_hnsw_indexes(db: &mut Database) {
    use crate::sql::hnsw::HnswIndex;

    for table in db.tables.values_mut() {
        // Snapshot identifying data first so the entries can be mutated below.
        let pending: Vec<(String, String, DistanceMetric)> = table
            .hnsw_indexes
            .iter()
            .filter(|e| e.needs_rebuild)
            .map(|e| (e.name.clone(), e.column_name.clone(), e.metric))
            .collect();

        for (idx_name, col_name, metric) in pending {
            // Copy the vectors out while holding the row lock as briefly as possible.
            let mut vectors: Vec<(i64, Vec<f32>)> = {
                let row_data = table.rows.lock().expect("rows mutex poisoned");
                match row_data.get(&col_name) {
                    Some(Row::Vector(map)) => map.iter().map(|(id, v)| (*id, v.clone())).collect(),
                    _ => Vec::new(),
                }
            };
            // Lookup table used by the index to fetch a neighbour's vector by id.
            let snapshot: std::collections::HashMap<i64, Vec<f32>> =
                vectors.iter().cloned().collect();
            vectors.sort_by_key(|(id, _)| *id);

            let mut rebuilt = HnswIndex::new(metric, 0xC0FFEE);
            for (id, v) in &vectors {
                rebuilt.insert(*id, v, |q| snapshot.get(&q).cloned().unwrap_or_default());
            }

            let target = table.hnsw_indexes.iter_mut().find(|e| e.name == idx_name);
            if let Some(entry) = target {
                entry.index = rebuilt;
                entry.needs_rebuild = false;
            }
        }
    }
}
/// Produces the `CREATE INDEX … USING hnsw` statement stored in the catalog
/// for an HNSW index.
///
/// The `WITH (metric = '…')` clause is emitted only for metrics other than
/// `DistanceMetric::L2`.
fn synthesize_hnsw_create_index_sql(
    index_name: &str,
    table_name: &str,
    column_name: &str,
    metric: DistanceMetric,
) -> String {
    match metric {
        DistanceMetric::L2 => {
            format!("CREATE INDEX {index_name} ON {table_name} USING hnsw ({column_name})")
        }
        _ => format!(
            "CREATE INDEX {index_name} ON {table_name} USING hnsw ({column_name}) WITH (metric = '{}')",
            metric.sql_name()
        ),
    }
}
/// Rebuilds every FTS index flagged `needs_rebuild` from its table's current
/// text column and clears the flag.
///
/// Cells whose stored text is exactly `"Null"` are skipped. Documents are
/// inserted into a fresh posting list in ascending row-id order. If the
/// indexed column is not stored as a text column the rebuilt index is empty.
///
/// # Panics
/// Panics if a table's row mutex is poisoned.
fn rebuild_dirty_fts_indexes(db: &mut Database) {
    use crate::sql::fts::PostingList;

    for table in db.tables.values_mut() {
        // Snapshot identifying data first so the entries can be mutated below.
        let pending: Vec<(String, String)> = table
            .fts_indexes
            .iter()
            .filter(|e| e.needs_rebuild)
            .map(|e| (e.name.clone(), e.column_name.clone()))
            .collect();

        for (idx_name, col_name) in pending {
            // Copy the documents out while holding the row lock as briefly as possible.
            let mut docs: Vec<(i64, String)> = {
                let row_data = table.rows.lock().expect("rows mutex poisoned");
                match row_data.get(&col_name) {
                    Some(Row::Text(map)) => map
                        .iter()
                        .filter(|(_, v)| v.as_str() != "Null")
                        .map(|(id, v)| (*id, v.clone()))
                        .collect(),
                    _ => Vec::new(),
                }
            };
            docs.sort_by_key(|(id, _)| *id);

            let mut rebuilt = PostingList::new();
            for (id, text) in &docs {
                rebuilt.insert(*id, text);
            }

            let target = table.fts_indexes.iter_mut().find(|e| e.name == idx_name);
            if let Some(entry) = target {
                entry.index = rebuilt;
                entry.needs_rebuild = false;
            }
        }
    }
}
/// Returns a copy of `dt`, built variant by variant.
fn clone_datatype(dt: &DataType) -> DataType {
    match *dt {
        DataType::Vector(dim) => DataType::Vector(dim),
        DataType::Json => DataType::Json,
        DataType::Bool => DataType::Bool,
        DataType::Real => DataType::Real,
        DataType::Text => DataType::Text,
        DataType::Integer => DataType::Integer,
        DataType::Invalid => DataType::Invalid,
        DataType::None => DataType::None,
    }
}
/// Stages a secondary index as a B-tree and returns its root page number.
///
/// The leaves are staged first; interior levels are then stacked on top until
/// a single page remains. A tree that fits in one leaf has that leaf as root.
fn stage_index_btree(
    pager: &mut Pager,
    idx: &SecondaryIndex,
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
) -> Result<u32> {
    let mut level: Vec<(u32, i64)> = stage_index_leaves(pager, idx, alloc)?;
    // With a single leaf the loop body never runs and that leaf is the root.
    while level.len() > 1 {
        level = stage_interior_level(pager, &level, alloc)?;
    }
    let (root_page, _) = level[0];
    Ok(root_page)
}
/// Stages the leaf pages of a secondary index and returns, for each leaf in
/// chain order, its page number and the largest row id stored in it.
///
/// Entries are written in ascending row-id order. When an entry does not fit
/// in the current leaf, a new page is allocated, the current leaf is emitted
/// with a link to it, and filling continues on the new page. The final leaf
/// is emitted with next-leaf pointer 0. An index without entries still
/// produces one empty leaf, reported with `i64::MIN` as its largest row id.
///
/// # Errors
/// Returns `SQLRiteError::Internal` when a single entry is larger than an
/// empty page; encoding and insertion errors are propagated.
fn stage_index_leaves(
    pager: &mut Pager,
    idx: &SecondaryIndex,
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
) -> Result<Vec<(u32, i64)>> {
    let mut leaves: Vec<(u32, i64)> = Vec::new();
    let mut current_leaf = TablePage::empty();
    let mut current_leaf_page = alloc.allocate();
    let mut current_max_rowid: Option<i64> = None;
    let mut entries: Vec<(Value, i64)> = idx.iter_entries().collect();
    entries.sort_by_key(|(_, r)| *r);
    for (value, rowid) in entries {
        let cell = IndexCell::new(rowid, value);
        let entry_bytes = cell.encode()?;
        if !current_leaf.would_fit(entry_bytes.len()) {
            // Current leaf is full: link it to a freshly allocated successor.
            let next_leaf_page_num = alloc.allocate();
            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
            current_leaf = TablePage::empty();
            current_leaf_page = next_leaf_page_num;
            // An entry that does not fit even in an empty page can never be stored.
            if !current_leaf.would_fit(entry_bytes.len()) {
                return Err(SQLRiteError::Internal(format!(
                    "index entry of {} bytes exceeds empty-page capacity {}",
                    entry_bytes.len(),
                    current_leaf.free_space()
                )));
            }
        }
        current_leaf.insert_entry(rowid, &entry_bytes)?;
        current_max_rowid = Some(rowid);
    }
    // The last leaf terminates the chain with next-leaf pointer 0.
    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
    Ok(leaves)
}
/// Stages an HNSW index as a B-tree and returns its root page number.
///
/// The leaves are staged first; interior levels are then stacked on top until
/// a single page remains. A tree that fits in one leaf has that leaf as root.
fn stage_hnsw_btree(
    pager: &mut Pager,
    idx: &crate::sql::hnsw::HnswIndex,
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
) -> Result<u32> {
    let mut level: Vec<(u32, i64)> = stage_hnsw_leaves(pager, idx, alloc)?;
    // With a single leaf the loop body never runs and that leaf is the root.
    while level.len() > 1 {
        level = stage_interior_level(pager, &level, alloc)?;
    }
    let (root_page, _) = level[0];
    Ok(root_page)
}
/// Stages an FTS posting list as a B-tree and returns its root page number.
///
/// The leaves are staged first; interior levels are then stacked on top until
/// a single page remains. A tree that fits in one leaf has that leaf as root.
fn stage_fts_btree(
    pager: &mut Pager,
    idx: &crate::sql::fts::PostingList,
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
) -> Result<u32> {
    let mut level: Vec<(u32, i64)> = stage_fts_leaves(pager, idx, alloc)?;
    // With a single leaf the loop body never runs and that leaf is the root.
    while level.len() > 1 {
        level = stage_interior_level(pager, &level, alloc)?;
    }
    let (root_page, _) = level[0];
    Ok(root_page)
}
/// Stages the leaf pages of an FTS index and returns, for each leaf in chain
/// order, its page number and the largest cell id stored in it.
///
/// Cell 1 is the doc-lengths sidecar; posting cells follow with ids 2, 3, …
/// in the order yielded by `serialize_postings`. When a cell does not fit in
/// the current leaf, a new page is allocated, the current leaf is emitted
/// with a link to it, and filling continues on the new page. The final leaf
/// is emitted with next-leaf pointer 0.
///
/// # Errors
/// Returns `SQLRiteError::Internal` when a single cell is larger than an
/// empty page; encoding and insertion errors are propagated.
fn stage_fts_leaves(
    pager: &mut Pager,
    idx: &crate::sql::fts::PostingList,
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
) -> Result<Vec<(u32, i64)>> {
    use crate::sql::pager::fts_cell::FtsPostingCell;
    let mut leaves: Vec<(u32, i64)> = Vec::new();
    let mut current_leaf = TablePage::empty();
    let mut current_leaf_page = alloc.allocate();
    let mut current_max_rowid: Option<i64> = None;
    let mut cell_id: i64 = 1;
    let mut cells: Vec<FtsPostingCell> = Vec::new();
    // The sidecar always comes first so it always carries cell id 1.
    cells.push(FtsPostingCell::doc_lengths(
        cell_id,
        idx.serialize_doc_lengths(),
    ));
    for (term, entries) in idx.serialize_postings() {
        cell_id += 1;
        cells.push(FtsPostingCell::posting(cell_id, term, entries));
    }
    for cell in cells {
        let entry_bytes = cell.encode()?;
        if !current_leaf.would_fit(entry_bytes.len()) {
            // Current leaf is full: link it to a freshly allocated successor.
            let next_leaf_page_num = alloc.allocate();
            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
            current_leaf = TablePage::empty();
            current_leaf_page = next_leaf_page_num;
            // A cell that does not fit even in an empty page can never be stored.
            if !current_leaf.would_fit(entry_bytes.len()) {
                return Err(SQLRiteError::Internal(format!(
                    "FTS posting cell {} of {} bytes exceeds empty-page capacity {} \
                     (term too long or too many postings; overflow chaining is Phase 8.1)",
                    cell.cell_id,
                    entry_bytes.len(),
                    current_leaf.free_space()
                )));
            }
        }
        current_leaf.insert_entry(cell.cell_id, &entry_bytes)?;
        current_max_rowid = Some(cell.cell_id);
    }
    // The last leaf terminates the chain with next-leaf pointer 0.
    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
    Ok(leaves)
}
// Entry list of one FTS cell: `(i64, u32)` pairs, used for both the
// doc-lengths sidecar and a term's postings.
// NOTE(review): presumably (row id, length or term frequency) — confirm
// against `FtsPostingCell`.
type FtsEntries = Vec<(i64, u32)>;
// All posting cells of an FTS index: one `(term, entries)` pair per term.
type FtsPostings = Vec<(String, FtsEntries)>;
/// Reads the persisted FTS cells of the tree rooted at `root_page` and returns
/// the doc-lengths sidecar together with all posting lists.
///
/// Starts at the leaf returned by `find_leftmost_leaf` and follows each
/// leaf's next-leaf pointer (bytes 1..5 of the page, little-endian) until
/// that pointer is 0.
///
/// # Errors
/// Returns `SQLRiteError::Internal` when a leaf page is missing or is not
/// tagged `TableLeaf`, when more than one doc-lengths sidecar cell is found,
/// or when none is found; decoding errors are propagated.
fn load_fts_postings(pager: &Pager, root_page: u32) -> Result<(FtsEntries, FtsPostings)> {
    use crate::sql::pager::fts_cell::FtsPostingCell;

    // `None` until the single doc-lengths sidecar cell has been seen.
    let mut sidecar: Option<Vec<(i64, u32)>> = None;
    let mut postings: Vec<(String, Vec<(i64, u32)>)> = Vec::new();
    let mut current = find_leftmost_leaf(pager, root_page)?;
    while current != 0 {
        let Some(page_buf) = pager.read_page(current) else {
            return Err(SQLRiteError::Internal(format!(
                "missing FTS leaf page {current}"
            )));
        };
        let tag = page_buf[0];
        if tag != PageType::TableLeaf as u8 {
            return Err(SQLRiteError::Internal(format!(
                "page {current} tagged {} but expected TableLeaf (FTS)",
                tag
            )));
        }
        let next_leaf =
            u32::from_le_bytes([page_buf[1], page_buf[2], page_buf[3], page_buf[4]]);
        let payload = <&[u8; PAYLOAD_PER_PAGE]>::try_from(&page_buf[PAGE_HEADER_SIZE..])
            .map_err(|_| SQLRiteError::Internal("FTS leaf payload size".to_string()))?;
        let leaf = TablePage::from_bytes(payload);
        for slot in 0..leaf.slot_count() {
            let offset = leaf.slot_offset_raw(slot)?;
            let (cell, _) = FtsPostingCell::decode(leaf.as_bytes(), offset)?;
            if !cell.is_doc_lengths() {
                postings.push((cell.term, cell.entries));
                continue;
            }
            if sidecar.is_some() {
                return Err(SQLRiteError::Internal(
                    "FTS index has more than one doc-lengths sidecar cell".to_string(),
                ));
            }
            sidecar = Some(cell.entries);
        }
        current = next_leaf;
    }
    match sidecar {
        Some(doc_lengths) => Ok((doc_lengths, postings)),
        None => Err(SQLRiteError::Internal(
            "FTS index missing doc-lengths sidecar cell — corrupt or truncated tree".to_string(),
        )),
    }
}
/// Packs every serialized HNSW node into a chain of staged leaf pages.
///
/// Nodes are taken in the order `idx.serialize_nodes()` yields them and
/// encoded as `HnswNodeCell`s. When a cell no longer fits the page being
/// filled, that page is staged with a pointer to a freshly allocated
/// successor and filling continues there. The last page is staged with a
/// next-leaf pointer of `0`.
///
/// Returns one `(page_number, last_node_id_on_page)` pair per leaf, in chain
/// order. A page that received no cells is recorded with `i64::MIN`.
///
/// # Errors
///
/// Returns `SQLRiteError::Internal` if a single cell does not fit even an
/// empty page; encode and insert errors propagate.
fn stage_hnsw_leaves(
    pager: &mut Pager,
    idx: &crate::sql::hnsw::HnswIndex,
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
) -> Result<Vec<(u32, i64)>> {
    use crate::sql::pager::hnsw_cell::HnswNodeCell;

    let mut sealed: Vec<(u32, i64)> = Vec::new();
    let mut page = TablePage::empty();
    let mut page_num = alloc.allocate();
    let mut last_id: Option<i64> = None;

    for (node_id, layers) in idx.serialize_nodes() {
        let entry_bytes = HnswNodeCell::new(node_id, layers).encode()?;
        let needed = entry_bytes.len();
        if !page.would_fit(needed) {
            // Seal the full page, linking it to the page that takes over.
            let successor = alloc.allocate();
            emit_leaf(pager, page_num, &page, successor);
            sealed.push((page_num, last_id.unwrap_or(i64::MIN)));
            page = TablePage::empty();
            page_num = successor;
            if !page.would_fit(needed) {
                return Err(SQLRiteError::Internal(format!(
                    "HNSW node {node_id} cell of {} bytes exceeds empty-page capacity {}",
                    needed,
                    page.free_space()
                )));
            }
        }
        page.insert_entry(node_id, &entry_bytes)?;
        last_id = Some(node_id);
    }

    // The final (possibly empty) page terminates the chain.
    emit_leaf(pager, page_num, &page, 0);
    sealed.push((page_num, last_id.unwrap_or(i64::MIN)));
    Ok(sealed)
}
/// Restores every row stored under `root_page` into `table`.
///
/// Walks the leaf chain from the leftmost leaf, following the next-leaf
/// pointer in header bytes 1..5 until it reads `0`. Entries stored locally
/// are used as-is; overflow entries are reassembled with
/// `read_overflow_chain` and decoded as a `Cell` before being handed to
/// `Table::restore_row`.
///
/// # Errors
///
/// Returns `SQLRiteError::Internal` when a leaf page is missing or a page in
/// the chain is not tagged `TableLeaf`; entry, overflow, decode and restore
/// errors propagate.
fn load_table_rows(pager: &Pager, table: &mut Table, root_page: u32) -> Result<()> {
    let leaf_tag = PageType::TableLeaf as u8;
    let mut current = find_leftmost_leaf(pager, root_page)?;
    while current != 0 {
        let raw = match pager.read_page(current) {
            Some(bytes) => bytes,
            None => {
                return Err(SQLRiteError::Internal(format!(
                    "missing leaf page {current}"
                )));
            }
        };
        let tag = raw[0];
        if tag != leaf_tag {
            return Err(SQLRiteError::Internal(format!(
                "page {current} tagged {} but expected TableLeaf",
                tag
            )));
        }

        let mut link = [0u8; 4];
        link.copy_from_slice(&raw[1..5]);
        let following = u32::from_le_bytes(link);

        let payload: &[u8; PAYLOAD_PER_PAGE] = (&raw[PAGE_HEADER_SIZE..])
            .try_into()
            .map_err(|_| SQLRiteError::Internal("leaf payload slice size".to_string()))?;
        let leaf = TablePage::from_bytes(payload);

        for slot in 0..leaf.slot_count() {
            let cell = match leaf.entry_at(slot)? {
                PagedEntry::Local(inline) => inline,
                PagedEntry::Overflow(pointer) => {
                    // The row body lives in a separate page chain; stitch it back together.
                    let body = read_overflow_chain(
                        pager,
                        pointer.first_overflow_page,
                        pointer.total_body_len,
                    )?;
                    Cell::decode(&body, 0)?.0
                }
            };
            table.restore_row(cell.rowid, cell.values)?;
        }
        current = following;
    }
    Ok(())
}
/// Lists every page that belongs to the B-tree rooted at `root_page`.
///
/// The tree is walked depth-first with an explicit stack. Interior pages
/// contribute all of their child pointers plus the rightmost child. When
/// `follow_overflow` is true, each leaf entry that is an overflow reference
/// also contributes every page of its overflow chain (linked through header
/// bytes 1..5, terminated by `0`). A `root_page` of `0` yields an empty list.
///
/// Each page is visited at most once. A page reached a second time means the
/// file contains a pointer cycle or two structures sharing a page; this is
/// reported as an error instead of being followed, because following it
/// would never terminate.
///
/// # Errors
///
/// Returns `SQLRiteError::Internal` when a page is missing, carries an
/// unexpected type tag, or is reached more than once. Cell and entry decode
/// errors propagate.
fn collect_pages_for_btree(
    pager: &Pager,
    root_page: u32,
    follow_overflow: bool,
) -> Result<Vec<u32>> {
    if root_page == 0 {
        return Ok(Vec::new());
    }
    let mut pages: Vec<u32> = Vec::new();
    // Guards against cyclic or shared pointers in a damaged file.
    let mut seen: std::collections::HashSet<u32> = std::collections::HashSet::new();
    let mut stack: Vec<u32> = vec![root_page];
    while let Some(p) = stack.pop() {
        if !seen.insert(p) {
            return Err(SQLRiteError::Internal(format!(
                "collect_pages: page {p} reached more than once (rooted at {root_page})"
            )));
        }
        let buf = pager.read_page(p).ok_or_else(|| {
            SQLRiteError::Internal(format!(
                "collect_pages: missing page {p} (rooted at {root_page})"
            ))
        })?;
        pages.push(p);
        match buf[0] {
            t if t == PageType::InteriorNode as u8 => {
                let payload: &[u8; PAYLOAD_PER_PAGE] =
                    (&buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
                        SQLRiteError::Internal("interior payload slice size".to_string())
                    })?;
                let interior = InteriorPage::from_bytes(payload);
                for slot in 0..interior.slot_count() {
                    let cell = interior.cell_at(slot)?;
                    stack.push(cell.child_page);
                }
                stack.push(interior.rightmost_child());
            }
            t if t == PageType::TableLeaf as u8 => {
                if follow_overflow {
                    let payload: &[u8; PAYLOAD_PER_PAGE] =
                        (&buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
                            SQLRiteError::Internal("leaf payload slice size".to_string())
                        })?;
                    let leaf = TablePage::from_bytes(payload);
                    for slot in 0..leaf.slot_count() {
                        match leaf.entry_at(slot)? {
                            PagedEntry::Local(_) => {}
                            PagedEntry::Overflow(r) => {
                                let mut cur = r.first_overflow_page;
                                while cur != 0 {
                                    // A chain that loops back on itself would otherwise spin forever.
                                    if !seen.insert(cur) {
                                        return Err(SQLRiteError::Internal(format!(
                                            "collect_pages: overflow page {cur} reached more than once (rooted at {root_page})"
                                        )));
                                    }
                                    pages.push(cur);
                                    let ob = pager.read_page(cur).ok_or_else(|| {
                                        SQLRiteError::Internal(format!(
                                            "collect_pages: missing overflow page {cur}"
                                        ))
                                    })?;
                                    if ob[0] != PageType::Overflow as u8 {
                                        return Err(SQLRiteError::Internal(format!(
                                            "collect_pages: page {cur} expected Overflow, got tag {}",
                                            ob[0]
                                        )));
                                    }
                                    cur = u32::from_le_bytes(ob[1..5].try_into().unwrap());
                                }
                            }
                        }
                    }
                }
            }
            other => {
                return Err(SQLRiteError::Internal(format!(
                    "collect_pages: unexpected page type {other} at page {p}"
                )));
            }
        }
    }
    Ok(pages)
}
fn read_old_rootpages(pager: &Pager, schema_root: u32) -> Result<HashMap<(String, String), u32>> {
let mut out: HashMap<(String, String), u32> = HashMap::new();
if schema_root == 0 {
return Ok(out);
}
let mut master = build_empty_master_table();
load_table_rows(pager, &mut master, schema_root)?;
for rowid in master.rowids() {
let kind = take_text(&master, "type", rowid)?;
let name = take_text(&master, "name", rowid)?;
let rootpage = take_integer(&master, "rootpage", rowid)? as u32;
out.insert((kind, name), rootpage);
}
Ok(out)
}
/// Descends from `root_page` along leftmost children until a leaf is reached.
///
/// Returns the page number of the first page tagged `TableLeaf`. If
/// `root_page` is itself a leaf, it is returned unchanged.
///
/// # Errors
///
/// Returns `SQLRiteError::Internal` when a page on the path is missing or is
/// neither a `TableLeaf` nor an `InteriorNode`; `leftmost_child` errors
/// propagate.
fn find_leftmost_leaf(pager: &Pager, root_page: u32) -> Result<u32> {
    let leaf_tag = PageType::TableLeaf as u8;
    let interior_tag = PageType::InteriorNode as u8;

    let mut current = root_page;
    loop {
        let raw = match pager.read_page(current) {
            Some(bytes) => bytes,
            None => {
                return Err(SQLRiteError::Internal(format!(
                    "missing page {current} during tree descent"
                )));
            }
        };

        let other = raw[0];
        if other == leaf_tag {
            return Ok(current);
        }
        if other != interior_tag {
            return Err(SQLRiteError::Internal(format!(
                "unexpected page type {other} during tree descent at page {current}"
            )));
        }

        // Interior page: step into its leftmost child and keep going.
        let payload: &[u8; PAYLOAD_PER_PAGE] = (&raw[PAGE_HEADER_SIZE..])
            .try_into()
            .map_err(|_| SQLRiteError::Internal("interior payload slice size".to_string()))?;
        current = InteriorPage::from_bytes(payload).leftmost_child()?;
    }
}
/// Stages a complete B-tree for `table` and returns its root page number.
///
/// Leaves are staged first; interior levels are then stacked on top, one
/// level per iteration, until a single page remains. A table that fits in
/// one leaf therefore has that leaf as its root.
fn stage_table_btree(
    pager: &mut Pager,
    table: &Table,
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
) -> Result<u32> {
    let mut level: Vec<(u32, i64)> = stage_leaves(pager, table, alloc)?;
    // With exactly one page in the level the loop is skipped and that page is the root.
    while level.len() > 1 {
        level = stage_interior_level(pager, &level, alloc)?;
    }
    let (root_page, _) = level[0];
    Ok(root_page)
}
/// Packs every row of `table` into a chain of staged leaf pages.
///
/// Rows are visited in the order `table.rowids()` returns them. Each row's
/// entry bytes come from `build_row_entry`, which is called before any
/// successor leaf page is allocated for that row. When an entry does not fit
/// the page being filled, that page is staged pointing at a freshly
/// allocated successor. The last page is staged with a next-leaf pointer of
/// `0`.
///
/// Returns one `(page_number, last_rowid_on_page)` pair per leaf, in chain
/// order. A page that received no rows is recorded with `i64::MIN`.
///
/// # Errors
///
/// Returns `SQLRiteError::Internal` if an entry does not fit even an empty
/// page; entry-building and insert errors propagate.
fn stage_leaves(
    pager: &mut Pager,
    table: &Table,
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
) -> Result<Vec<(u32, i64)>> {
    let mut sealed: Vec<(u32, i64)> = Vec::new();
    let mut page = TablePage::empty();
    let mut page_num = alloc.allocate();
    let mut last_rowid: Option<i64> = None;

    for rowid in table.rowids() {
        let entry_bytes = build_row_entry(pager, table, rowid, alloc)?;
        let needed = entry_bytes.len();
        if !page.would_fit(needed) {
            // Seal the full page, linking it to the page that takes over.
            let successor = alloc.allocate();
            emit_leaf(pager, page_num, &page, successor);
            sealed.push((page_num, last_rowid.unwrap_or(i64::MIN)));
            page = TablePage::empty();
            page_num = successor;
            if !page.would_fit(needed) {
                return Err(SQLRiteError::Internal(format!(
                    "entry of {} bytes exceeds empty-page capacity {}",
                    needed,
                    page.free_space()
                )));
            }
        }
        page.insert_entry(rowid, &entry_bytes)?;
        last_rowid = Some(rowid);
    }

    // The final (possibly empty) page terminates the chain.
    emit_leaf(pager, page_num, &page, 0);
    sealed.push((page_num, last_rowid.unwrap_or(i64::MIN)));
    Ok(sealed)
}
/// Produces the bytes to store in a leaf slot for the row `rowid`.
///
/// The row is encoded as a `Cell`. If the encoding is at most
/// `OVERFLOW_THRESHOLD` bytes it is returned directly. Otherwise the encoded
/// body is written to an overflow chain and the encoding of an `OverflowRef`
/// pointing at that chain is returned instead.
fn build_row_entry(
    pager: &mut Pager,
    table: &Table,
    rowid: i64,
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
) -> Result<Vec<u8>> {
    let encoded = Cell::new(rowid, table.extract_row(rowid)).encode()?;
    if encoded.len() <= OVERFLOW_THRESHOLD {
        return Ok(encoded);
    }

    // Too large to keep inline: spill the body and keep only a pointer to it.
    let first_overflow_page = write_overflow_chain(pager, &encoded, alloc)?;
    let pointer = OverflowRef {
        rowid,
        total_body_len: encoded.len() as u64,
        first_overflow_page,
    };
    Ok(pointer.encode())
}
/// Builds one level of interior pages above `children`.
///
/// `children` holds `(page_number, max_rowid)` pairs in key order. Each
/// interior page starts with one child as its rightmost pointer. While
/// another divider cell still fits, the current rightmost child is demoted
/// to a divider cell and the next child becomes the new rightmost pointer.
/// Every finished page is staged and reported as
/// `(interior_page_number, max_rowid_of_its_last_child)`.
///
/// # Errors
///
/// Propagates errors from `InteriorPage::insert_divider`.
fn stage_interior_level(
    pager: &mut Pager,
    children: &[(u32, i64)],
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
) -> Result<Vec<(u32, i64)>> {
    let mut parents: Vec<(u32, i64)> = Vec::new();
    let mut pending = children.iter().copied().peekable();

    while let Some((first_page, first_max)) = pending.next() {
        let parent_page_num = alloc.allocate();
        let (mut tail_page, mut tail_max) = (first_page, first_max);
        let mut interior = InteriorPage::empty(tail_page);

        while let Some(&(next_page, next_max)) = pending.peek() {
            // Measure the divider the current tail would become before committing to it.
            let divider_len = InteriorCell {
                divider_rowid: tail_max,
                child_page: tail_page,
            }
            .encode()
            .len();
            if !interior.would_fit(divider_len) {
                break;
            }
            interior.insert_divider(tail_max, tail_page)?;
            interior.set_rightmost_child(next_page);
            tail_page = next_page;
            tail_max = next_max;
            pending.next();
        }

        emit_interior(pager, parent_page_num, &interior);
        parents.push((parent_page_num, tail_max));
    }
    Ok(parents)
}
/// Wraps `leaf`'s payload in a page header and stages it as page `page_num`.
///
/// Header bytes written here: byte 0 is the `TableLeaf` tag, bytes 1..5 the
/// little-endian `next_leaf` page number, bytes 5..7 a zero `u16`. The leaf
/// payload fills the page from `PAGE_HEADER_SIZE` onward.
fn emit_leaf(pager: &mut Pager, page_num: u32, leaf: &TablePage, next_leaf: u32) {
    let link = next_leaf.to_le_bytes();
    let zero_field = 0u16.to_le_bytes();

    let mut page = [0u8; PAGE_SIZE];
    page[0] = PageType::TableLeaf as u8;
    page[1..5].copy_from_slice(&link);
    page[5..7].copy_from_slice(&zero_field);
    page[PAGE_HEADER_SIZE..].copy_from_slice(leaf.as_bytes());

    pager.stage_page(page_num, page);
}
/// Wraps `interior`'s payload in a page header and stages it as page `page_num`.
///
/// Header bytes written here: byte 0 is the `InteriorNode` tag, bytes 1..5 a
/// zero `u32`, bytes 5..7 a zero `u16`. The interior payload fills the page
/// from `PAGE_HEADER_SIZE` onward.
fn emit_interior(pager: &mut Pager, page_num: u32, interior: &InteriorPage) {
    let zero_link = 0u32.to_le_bytes();
    let zero_field = 0u16.to_le_bytes();

    let mut page = [0u8; PAGE_SIZE];
    page[0] = PageType::InteriorNode as u8;
    page[1..5].copy_from_slice(&zero_link);
    page[5..7].copy_from_slice(&zero_field);
    page[PAGE_HEADER_SIZE..].copy_from_slice(interior.as_bytes());

    pager.stage_page(page_num, page);
}
#[cfg(test)]
mod tests {
use super::*;
use crate::sql::pager::freelist::MIN_PAGES_FOR_AUTO_VACUUM;
use crate::sql::process_command;
/// Builds the in-memory fixture shared by several tests: a `users` table with
/// two rows and a `notes` table with one row.
fn seed_db() -> Database {
    let mut db = Database::new(String::from("test"));
    let statements = [
        "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, age INTEGER);",
        "INSERT INTO users (name, age) VALUES ('alice', 30);",
        "INSERT INTO users (name, age) VALUES ('bob', 25);",
        "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
        "INSERT INTO notes (body) VALUES ('hello');",
    ];
    for sql in statements {
        process_command(sql, &mut db).unwrap();
    }
    db
}
/// Returns a path inside the system temp directory of the form
/// `sqlrite-<pid>-<nanos>-<name>.sqlrite`.
///
/// `<nanos>` is the time since the Unix epoch in nanoseconds, or `0` if the
/// system clock reports a time before the epoch.
fn tmp_path(name: &str) -> std::path::PathBuf {
    use std::time::{SystemTime, UNIX_EPOCH};

    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    let pid = std::process::id();
    std::env::temp_dir().join(format!("sqlrite-{pid}-{nanos}-{name}.sqlrite"))
}
/// Best-effort removal of the database file at `path` and its `-wal`
/// companion (the same path with `-wal` appended). Removal errors are ignored.
fn cleanup(path: &std::path::Path) {
    let mut wal_name = path.as_os_str().to_os_string();
    wal_name.push("-wal");
    let wal_path = std::path::PathBuf::from(wal_name);

    let _ = std::fs::remove_file(path);
    let _ = std::fs::remove_file(wal_path);
}
/// Saves the seeded database and checks that both tables, the `users`
/// columns and all rows are present after reopening.
#[test]
fn round_trip_preserves_schema_and_data() {
    let path = tmp_path("roundtrip");
    let mut db = seed_db();
    save_database(&mut db, &path).expect("save");

    let loaded = open_database(&path, String::from("test")).expect("open");
    assert_eq!(loaded.tables.len(), 2);

    let users = loaded.get_table(String::from("users")).expect("users table");
    assert_eq!(users.columns.len(), 3);
    let rowids = users.rowids();
    assert_eq!(rowids.len(), 2);

    let mut names: Vec<String> = Vec::new();
    for r in rowids.iter() {
        if let Some(Value::Text(s)) = users.get_value("name", *r) {
            names.push(s);
        }
    }
    assert!(names.contains(&"alice".to_string()));
    assert!(names.contains(&"bob".to_string()));

    let notes = loaded.get_table(String::from("notes")).expect("notes table");
    assert_eq!(notes.rowids().len(), 1);
    cleanup(&path);
}
/// Saves a table with a `VECTOR(3)` column and checks that the column type
/// and both stored vectors come back unchanged after reopening.
#[test]
fn round_trip_preserves_vector_column() {
    let path = tmp_path("vec_roundtrip");
    {
        let mut db = Database::new(String::from("test"));
        for sql in [
            "CREATE TABLE docs (id INTEGER PRIMARY KEY, embedding VECTOR(3));",
            "INSERT INTO docs (embedding) VALUES ([0.1, 0.2, 0.3]);",
            "INSERT INTO docs (embedding) VALUES ([1.5, -2.0, 3.5]);",
        ] {
            process_command(sql, &mut db).unwrap();
        }
        save_database(&mut db, &path).expect("save");
    }

    let loaded = open_database(&path, String::from("test")).expect("open");
    let docs = loaded.get_table(String::from("docs")).expect("docs table");
    let embedding_col = docs
        .columns
        .iter()
        .find(|c| c.column_name == "embedding")
        .expect("embedding column");
    assert!(
        matches!(embedding_col.datatype, DataType::Vector(3)),
        "expected DataType::Vector(3) after round-trip, got {:?}",
        embedding_col.datatype
    );

    let mut rows: Vec<Vec<f32>> = Vec::new();
    for r in docs.rowids().iter() {
        if let Some(Value::Vector(v)) = docs.get_value("embedding", *r) {
            rows.push(v);
        }
    }
    // Order by first component so the comparison does not depend on rowid order.
    rows.sort_by(|a, b| a[0].partial_cmp(&b[0]).unwrap());
    assert_eq!(rows.len(), 2);
    assert_eq!(rows[0], vec![0.1f32, 0.2, 0.3]);
    assert_eq!(rows[1], vec![1.5f32, -2.0, 3.5]);
    cleanup(&path);
}
/// Saves a table with a `JSON` column and checks that the column type
/// survives a reopen and that `json_extract` can still query the stored value.
#[test]
fn round_trip_preserves_json_column() {
    let path = tmp_path("json_roundtrip");
    {
        let mut db = Database::new(String::from("test"));
        for sql in [
            "CREATE TABLE docs (id INTEGER PRIMARY KEY, payload JSON);",
            r#"INSERT INTO docs (payload) VALUES ('{"name": "alice", "tags": ["rust","sql"]}');"#,
        ] {
            process_command(sql, &mut db).unwrap();
        }
        save_database(&mut db, &path).expect("save");
    }

    let mut loaded = open_database(&path, String::from("test")).expect("open");
    let docs = loaded.get_table(String::from("docs")).expect("docs");
    let payload_col = docs
        .columns
        .iter()
        .find(|c| c.column_name == "payload")
        .unwrap();
    assert!(
        matches!(payload_col.datatype, DataType::Json),
        "expected DataType::Json, got {:?}",
        payload_col.datatype
    );

    let query = r#"SELECT id FROM docs WHERE json_extract(payload, '$.name') = 'alice';"#;
    let resp = process_command(query, &mut loaded).expect("select via json_extract after reopen");
    assert!(resp.contains("1 row returned"), "got: {resp}");
    cleanup(&path);
}
/// Saves a table with an HNSW index over five vectors and checks that the
/// index is attached, fully populated and not flagged for rebuild after
/// reopening, and that a nearest-neighbour query still runs.
#[test]
fn round_trip_rebuilds_hnsw_index_from_create_sql() {
    let path = tmp_path("hnsw_roundtrip");
    {
        let mut db = Database::new(String::from("test"));
        process_command(
            "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
            &mut db,
        )
        .unwrap();
        let vectors = [
            "[1.0, 0.0]",
            "[2.0, 0.0]",
            "[0.0, 3.0]",
            "[1.0, 4.0]",
            "[10.0, 10.0]",
        ];
        for v in vectors {
            let insert = format!("INSERT INTO docs (e) VALUES ({v});");
            process_command(&insert, &mut db).unwrap();
        }
        process_command("CREATE INDEX ix_e ON docs USING hnsw (e);", &mut db).unwrap();
        save_database(&mut db, &path).expect("save");
    }

    let mut loaded = open_database(&path, String::from("test")).expect("open");
    {
        let table = loaded.get_table(String::from("docs")).expect("docs");
        assert_eq!(table.hnsw_indexes.len(), 1, "HNSW index should reattach");
        let entry = &table.hnsw_indexes[0];
        assert_eq!(entry.name, "ix_e");
        assert_eq!(entry.column_name, "e");
        assert_eq!(entry.index.len(), 5, "loaded graph should hold all 5 rows");
        assert!(
            !entry.needs_rebuild,
            "fresh load should not be marked dirty"
        );
    }

    let resp = process_command(
        "SELECT id FROM docs ORDER BY vec_distance_l2(e, [1.0, 0.0]) ASC LIMIT 3;",
        &mut loaded,
    )
    .unwrap();
    assert!(resp.contains("3 rows returned"), "got: {resp}");
    cleanup(&path);
}
/// Creates an HNSW index with `metric = 'cosine'`, saves, reopens, and checks
/// that both the index entry and the graph report the cosine metric.
#[test]
fn round_trip_preserves_hnsw_cosine_metric() {
    use crate::sql::hnsw::DistanceMetric;

    let path = tmp_path("hnsw_metric_roundtrip");
    {
        let mut db = Database::new(String::from("test"));
        process_command(
            "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
            &mut db,
        )
        .unwrap();
        for v in ["[1.0, 0.0]", "[0.0, 1.0]", "[0.7071, 0.7071]"] {
            let insert = format!("INSERT INTO docs (e) VALUES ({v});");
            process_command(&insert, &mut db).unwrap();
        }
        process_command(
            "CREATE INDEX ix_cos ON docs USING hnsw (e) WITH (metric = 'cosine');",
            &mut db,
        )
        .unwrap();
        save_database(&mut db, &path).expect("save");
    }

    let mut loaded = open_database(&path, String::from("test")).expect("open");
    {
        let table = loaded.get_table(String::from("docs")).expect("docs");
        assert_eq!(table.hnsw_indexes.len(), 1);
        let entry = &table.hnsw_indexes[0];
        assert_eq!(
            entry.metric,
            DistanceMetric::Cosine,
            "metric should round-trip through CREATE INDEX SQL"
        );
        assert_eq!(entry.index.distance, DistanceMetric::Cosine);
    }

    let resp = process_command(
        "SELECT id FROM docs ORDER BY vec_distance_cosine(e, [1.0, 0.0]) ASC LIMIT 1;",
        &mut loaded,
    )
    .unwrap();
    assert!(resp.contains("1 row returned"), "got: {resp}");
    cleanup(&path);
}
/// Saves a table with an FTS index over five documents and checks that the
/// index is attached, holds all five rows and answers `fts_match` after
/// reopening.
#[test]
fn round_trip_rebuilds_fts_index_from_create_sql() {
    let path = tmp_path("fts_roundtrip");
    {
        let mut db = Database::new(String::from("test"));
        process_command(
            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
            &mut db,
        )
        .unwrap();
        let corpus = [
            "rust embedded database",
            "rust web framework",
            "go embedded systems",
            "python web framework",
            "rust rust embedded power",
        ];
        for body in corpus {
            let insert = format!("INSERT INTO docs (body) VALUES ('{body}');");
            process_command(&insert, &mut db).unwrap();
        }
        process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
        save_database(&mut db, &path).expect("save");
    }

    let mut loaded = open_database(&path, String::from("test")).expect("open");
    {
        let table = loaded.get_table(String::from("docs")).expect("docs");
        assert_eq!(table.fts_indexes.len(), 1, "FTS index should reattach");
        let entry = &table.fts_indexes[0];
        assert_eq!(entry.name, "ix_body");
        assert_eq!(entry.column_name, "body");
        assert_eq!(
            entry.index.len(),
            5,
            "rebuilt posting list should hold all 5 rows"
        );
        assert!(!entry.needs_rebuild);
    }

    let resp = process_command(
        "SELECT id FROM docs WHERE fts_match(body, 'rust');",
        &mut loaded,
    )
    .unwrap();
    assert!(resp.contains("3 rows returned"), "got: {resp}");
    cleanup(&path);
}
/// Deletes one of two documents containing "rust" after the FTS index is
/// built, saves, reopens, and checks that only the surviving document matches.
#[test]
fn delete_then_save_then_reopen_excludes_deleted_node_from_fts() {
    let path = tmp_path("fts_delete_rebuild");
    let mut db = Database::new(String::from("test"));
    process_command(
        "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
        &mut db,
    )
    .unwrap();
    let corpus = [
        "rust embedded",
        "rust framework",
        "go embedded",
        "python web",
    ];
    for body in corpus {
        let insert = format!("INSERT INTO docs (body) VALUES ('{body}');");
        process_command(&insert, &mut db).unwrap();
    }
    for sql in [
        "CREATE INDEX ix_body ON docs USING fts (body);",
        "DELETE FROM docs WHERE id = 1;",
    ] {
        process_command(sql, &mut db).unwrap();
    }
    save_database(&mut db, &path).expect("save");
    drop(db);

    let mut loaded = open_database(&path, String::from("test")).expect("open");
    let resp = process_command(
        "SELECT id FROM docs WHERE fts_match(body, 'rust');",
        &mut loaded,
    )
    .unwrap();
    assert!(resp.contains("1 row returned"), "got: {resp}");
    cleanup(&path);
}
/// Saves a database with an FTS index, then reads the catalog directly
/// through a `Pager` and checks that the `ix_body` row records a non-zero
/// root page.
#[test]
fn fts_roundtrip_uses_persistence_path_not_replay() {
    let path = tmp_path("fts_persistence_path");
    {
        let mut db = Database::new(String::from("test"));
        for sql in [
            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
            "INSERT INTO docs (body) VALUES ('rust embedded database');",
            "CREATE INDEX ix_body ON docs USING fts (body);",
        ] {
            process_command(sql, &mut db).unwrap();
        }
        save_database(&mut db, &path).expect("save");
    }

    let pager = Pager::open(&path).expect("open pager");
    let mut master = build_empty_master_table();
    load_table_rows(&pager, &mut master, pager.header().schema_root_page).unwrap();

    // Scan every catalog row; if several were named `ix_body`, the last one wins.
    let found_rootpage: Option<u32> = master
        .rowids()
        .into_iter()
        .filter(|&rowid| take_text(&master, "name", rowid).unwrap() == "ix_body")
        .map(|rowid| take_integer(&master, "rootpage", rowid).unwrap() as u32)
        .last();

    let rootpage = found_rootpage.expect("ix_body row in sqlrite_master");
    assert!(
        rootpage != 0,
        "Phase 8c FTS save should set rootpage != 0; got {rootpage}"
    );
    cleanup(&path);
}
/// Saves a database that has no FTS index and checks that the header's
/// format version is `FORMAT_VERSION_V4`.
#[test]
fn save_without_fts_keeps_format_v4() {
    use crate::sql::pager::header::FORMAT_VERSION_V4;

    let path = tmp_path("fts_no_bump");
    let mut db = Database::new(String::from("test"));
    for sql in [
        "CREATE TABLE t (id INTEGER PRIMARY KEY, n INTEGER);",
        "INSERT INTO t (n) VALUES (1);",
    ] {
        process_command(sql, &mut db).unwrap();
    }
    save_database(&mut db, &path).unwrap();
    drop(db);

    let pager = Pager::open(&path).expect("open");
    let on_disk_version = pager.header().format_version;
    assert_eq!(
        on_disk_version,
        FORMAT_VERSION_V4,
        "no-FTS save should keep v4"
    );
    cleanup(&path);
}
/// Saves a database that has an FTS index and checks that the header's
/// format version is `FORMAT_VERSION_V5`.
#[test]
fn save_with_fts_bumps_to_v5() {
    use crate::sql::pager::header::FORMAT_VERSION_V5;

    let path = tmp_path("fts_bump_v5");
    let mut db = Database::new(String::from("test"));
    for sql in [
        "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
        "INSERT INTO docs (body) VALUES ('hello');",
        "CREATE INDEX ix_body ON docs USING fts (body);",
    ] {
        process_command(sql, &mut db).unwrap();
    }
    save_database(&mut db, &path).unwrap();
    drop(db);

    let pager = Pager::open(&path).expect("open");
    let on_disk_version = pager.header().format_version;
    assert_eq!(
        on_disk_version,
        FORMAT_VERSION_V5,
        "FTS save should promote to v5"
    );
    cleanup(&path);
}
/// Indexes three documents, one of which is punctuation only, saves,
/// reopens, and checks that the index reports three documents and that
/// "embedded" matches the two documents containing it.
#[test]
fn fts_persistence_handles_empty_and_zero_token_docs() {
    let path = tmp_path("fts_edges");
    {
        let mut db = Database::new(String::from("test"));
        for sql in [
            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
            "CREATE INDEX ix_body ON docs USING fts (body);",
            "INSERT INTO docs (body) VALUES ('rust embedded');",
            "INSERT INTO docs (body) VALUES ('!!!---???');",
            "INSERT INTO docs (body) VALUES ('go embedded');",
        ] {
            process_command(sql, &mut db).unwrap();
        }
        save_database(&mut db, &path).unwrap();
    }

    let loaded = open_database(&path, String::from("test")).expect("open");
    let table = loaded.get_table(String::from("docs")).unwrap();
    let entry = &table.fts_indexes[0];
    assert_eq!(entry.index.len(), 3);

    let params = crate::sql::fts::Bm25Params::default();
    let res = entry.index.query("embedded", &params);
    assert_eq!(res.len(), 2);
    cleanup(&path);
}
/// Indexes 500 single-term documents, saves, reopens, and spot-checks that
/// five of the terms each match exactly the row they were inserted as.
#[test]
fn fts_persistence_round_trips_large_corpus() {
    use std::collections::BTreeSet;

    let path = tmp_path("fts_large_corpus");
    let mut expected_terms: BTreeSet<String> = BTreeSet::new();
    {
        let mut db = Database::new(String::from("test"));
        for sql in [
            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
            "CREATE INDEX ix_body ON docs USING fts (body);",
        ] {
            process_command(sql, &mut db).unwrap();
        }
        for i in 0..500 {
            let term = format!("term{i:04}");
            let insert = format!("INSERT INTO docs (body) VALUES ('{term}');");
            process_command(&insert, &mut db).unwrap();
            expected_terms.insert(term);
        }
        save_database(&mut db, &path).unwrap();
    }

    let loaded = open_database(&path, String::from("test")).expect("open");
    let table = loaded.get_table(String::from("docs")).unwrap();
    let entry = &table.fts_indexes[0];
    assert_eq!(entry.index.len(), 500);

    let params = crate::sql::fts::Bm25Params::default();
    for i in [0_i64, 137, 248, 391, 499] {
        let term = format!("term{i:04}");
        let res = entry.index.query(&term, &params);
        assert_eq!(res.len(), 1, "term {term} should match exactly 1 row");
        // The term inserted at loop index `i` is expected at id `i + 1`.
        assert_eq!(res[0].0, i + 1);
    }
    cleanup(&path);
}
/// Deletes a row after the HNSW index is built and checks that the index is
/// flagged dirty before the save, clean after it, and that the reopened
/// graph and row storage both hold only the three surviving rows.
#[test]
fn delete_then_save_then_reopen_excludes_deleted_node_from_hnsw() {
    let path = tmp_path("hnsw_delete_rebuild");
    let mut db = Database::new(String::from("test"));
    process_command(
        "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
        &mut db,
    )
    .unwrap();
    for v in ["[1.0, 0.0]", "[2.0, 0.0]", "[3.0, 0.0]", "[4.0, 0.0]"] {
        let insert = format!("INSERT INTO docs (e) VALUES ({v});");
        process_command(&insert, &mut db).unwrap();
    }
    for sql in [
        "CREATE INDEX ix_e ON docs USING hnsw (e);",
        "DELETE FROM docs WHERE id = 1;",
    ] {
        process_command(sql, &mut db).unwrap();
    }

    assert!(
        db.tables["docs"].hnsw_indexes[0].needs_rebuild,
        "DELETE should mark dirty"
    );
    save_database(&mut db, &path).expect("save");
    assert!(
        !db.tables["docs"].hnsw_indexes[0].needs_rebuild,
        "save should clear dirty"
    );
    drop(db);

    let loaded = open_database(&path, String::from("test")).expect("open");
    let docs = loaded.get_table(String::from("docs")).expect("docs");
    assert!(
        !docs.rowids().contains(&1),
        "deleted row 1 should not be in row storage"
    );
    assert_eq!(docs.rowids().len(), 3, "should have 3 surviving rows");
    assert_eq!(
        docs.hnsw_indexes[0].index.len(),
        3,
        "HNSW graph should have shed the deleted node"
    );
    cleanup(&path);
}
/// Saves the seeded database, reopens it, inserts one more user, saves
/// again, and checks that a second reopen sees three users.
#[test]
fn round_trip_survives_writes_after_load() {
    let path = tmp_path("after_load");
    {
        let mut seeded = seed_db();
        save_database(&mut seeded, &path).unwrap();
    }
    {
        let mut reopened = open_database(&path, String::from("test")).unwrap();
        process_command(
            "INSERT INTO users (name, age) VALUES ('carol', 40);",
            &mut reopened,
        )
        .unwrap();
        save_database(&mut reopened, &path).unwrap();
    }

    let db2 = open_database(&path, String::from("test")).unwrap();
    let users = db2.get_table(String::from("users")).unwrap();
    assert_eq!(users.rowids().len(), 3);
    cleanup(&path);
}
/// Writes arbitrary bytes to a file and checks that `open_database` returns
/// an error for it.
#[test]
fn open_rejects_garbage_file() {
    let path = tmp_path("bad");
    let junk: &[u8] = b"not a sqlrite database, just bytes";
    std::fs::write(&path, junk).unwrap();

    let result = open_database(&path, String::from("x"));
    assert!(result.is_err());
    cleanup(&path);
}
/// Inserts 200 rows of a little over 200 bytes each, saves, reopens, and
/// checks that all 200 rows are loaded.
#[test]
fn many_small_rows_spread_across_leaves() {
    let path = tmp_path("many_rows");
    let mut db = Database::new(String::from("big"));
    process_command(
        "CREATE TABLE things (id INTEGER PRIMARY KEY, data TEXT);",
        &mut db,
    )
    .unwrap();

    // The filler is identical for every row; only the `row-{i}` prefix varies.
    let body = "x".repeat(200);
    for i in 0..200 {
        let q = format!("INSERT INTO things (data) VALUES ('row-{i}-{body}');");
        process_command(&q, &mut db).unwrap();
    }
    save_database(&mut db, &path).unwrap();

    let loaded = open_database(&path, String::from("big")).unwrap();
    let things = loaded.get_table(String::from("things")).unwrap();
    assert_eq!(things.rowids().len(), 200);
    cleanup(&path);
}
/// Stores a single row with a 10 000-character text value, saves, reopens,
/// and checks that the value comes back with its full length.
#[test]
fn huge_row_goes_through_overflow() {
    let path = tmp_path("overflow_row");
    let mut db = Database::new(String::from("big"));
    process_command(
        "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
        &mut db,
    )
    .unwrap();
    let body = "A".repeat(10_000);
    let insert = format!("INSERT INTO docs (body) VALUES ('{body}');");
    process_command(&insert, &mut db).unwrap();
    save_database(&mut db, &path).unwrap();

    let loaded = open_database(&path, String::from("big")).unwrap();
    let docs = loaded.get_table(String::from("docs")).unwrap();
    let rowids = docs.rowids();
    assert_eq!(rowids.len(), 1);
    match docs.get_value("body", rowids[0]) {
        Some(Value::Text(s)) => assert_eq!(s.len(), 10_000),
        other => panic!("expected Text, got {other:?}"),
    }
    cleanup(&path);
}
/// Synthesizes `CREATE TABLE` SQL from an in-memory table, parses it back,
/// and checks the table name, the column count and each column's constraint.
#[test]
fn create_sql_synthesis_round_trips() {
    let mut db = Database::new(String::from("x"));
    process_command(
        "CREATE TABLE t (id INTEGER PRIMARY KEY, tag TEXT UNIQUE, note TEXT NOT NULL);",
        &mut db,
    )
    .unwrap();

    let t = db.get_table(String::from("t")).unwrap();
    let sql = table_to_create_sql(t);
    let (name, cols) = parse_create_sql(&sql).unwrap();

    assert_eq!(name, "t");
    assert_eq!(cols.len(), 3);
    assert!(cols[0].is_pk);
    assert!(cols[1].is_unique);
    assert!(cols[2].not_null);
}
/// Checks that the catalog table does not show up in `Database::tables`
/// after a save and reopen.
#[test]
fn sqlrite_master_is_not_exposed_as_a_user_table() {
    let path = tmp_path("no_master");
    {
        let mut seeded = seed_db();
        save_database(&mut seeded, &path).unwrap();
    }
    let loaded = open_database(&path, String::from("x")).unwrap();
    let exposes_master = loaded.tables.contains_key(MASTER_TABLE_NAME);
    assert!(!exposes_master);
    cleanup(&path);
}
/// Saves a 200-row table, reopens it, looks up its root page in the catalog
/// and checks that the root page is tagged as an interior node.
#[test]
fn multi_leaf_table_produces_an_interior_root() {
    let path = tmp_path("multi_leaf_interior");
    let mut db = Database::new(String::from("big"));
    process_command(
        "CREATE TABLE things (id INTEGER PRIMARY KEY, data TEXT);",
        &mut db,
    )
    .unwrap();
    let body = "x".repeat(200);
    for i in 0..200 {
        let q = format!("INSERT INTO things (data) VALUES ('row-{i}-{body}');");
        process_command(&q, &mut db).unwrap();
    }
    save_database(&mut db, &path).unwrap();

    let loaded = open_database(&path, String::from("big")).unwrap();
    let things = loaded.get_table(String::from("things")).unwrap();
    assert_eq!(things.rowids().len(), 200);

    let pager = loaded
        .pager
        .as_ref()
        .expect("loaded DB should have a pager");
    let mut master = build_empty_master_table();
    load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();

    // First catalog row named `things` whose rootpage is an integer.
    let things_root = master
        .rowids()
        .into_iter()
        .find_map(|r| {
            let is_things =
                matches!(master.get_value("name", r), Some(Value::Text(s)) if s == "things");
            if !is_things {
                return None;
            }
            match master.get_value("rootpage", r) {
                Some(Value::Integer(p)) => Some(p as u32),
                _ => None,
            }
        })
        .expect("things should appear in sqlrite_master");

    let root_buf = pager.read_page(things_root).unwrap();
    assert_eq!(
        root_buf[0],
        PageType::InteriorNode as u8,
        "expected a multi-leaf table to have an interior root, got tag {}",
        root_buf[0]
    );
    cleanup(&path);
}
/// Creates an explicit (non-unique) index on `users.tag`, saves, reopens,
/// and checks that the index is restored with the right column, uniqueness
/// flag and lookup results.
///
/// Rows are inserted for `i` in `1..=5` and tagged by the parity of `i`, so
/// "odd" covers three rows (1, 3, 5) and "even" covers two (2, 4).
#[test]
fn explicit_index_persists_across_save_and_open() {
    let path = tmp_path("idx_persist");
    let mut db = Database::new("idx".to_string());
    process_command(
        "CREATE TABLE users (id INTEGER PRIMARY KEY, tag TEXT);",
        &mut db,
    )
    .unwrap();
    for i in 1..=5 {
        // Label each row with the actual parity of `i`.
        let tag = if i % 2 == 0 { "even" } else { "odd" };
        process_command(
            &format!("INSERT INTO users (tag) VALUES ('{tag}');"),
            &mut db,
        )
        .unwrap();
    }
    process_command("CREATE INDEX users_tag_idx ON users (tag);", &mut db).unwrap();
    save_database(&mut db, &path).unwrap();

    let loaded = open_database(&path, "idx".to_string()).unwrap();
    let users = loaded.get_table("users".to_string()).unwrap();
    let idx = users
        .index_by_name("users_tag_idx")
        .expect("explicit index should survive save/open");
    assert_eq!(idx.column_name, "tag");
    assert!(!idx.is_unique);

    let even_rowids = idx.lookup(&Value::Text("even".into()));
    let odd_rowids = idx.lookup(&Value::Text("odd".into()));
    assert_eq!(even_rowids.len(), 2);
    assert_eq!(odd_rowids.len(), 3);
    cleanup(&path);
}
/// Checks that the index looked up under `SecondaryIndex::auto_name("users",
/// "email")` for a `UNIQUE` column is present, unique and populated after a
/// save and reopen.
#[test]
fn auto_indexes_for_unique_columns_survive_save_open() {
    let path = tmp_path("auto_idx_persist");
    let mut db = Database::new(String::from("a"));
    for sql in [
        "CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL UNIQUE);",
        "INSERT INTO users (email) VALUES ('a@x');",
        "INSERT INTO users (email) VALUES ('b@x');",
    ] {
        process_command(sql, &mut db).unwrap();
    }
    save_database(&mut db, &path).unwrap();

    let loaded = open_database(&path, String::from("a")).unwrap();
    let users = loaded.get_table(String::from("users")).unwrap();
    let auto_name = SecondaryIndex::auto_name("users", "email");
    let idx = users
        .index_by_name(&auto_name)
        .expect("auto index should be restored");
    assert!(idx.is_unique);

    let hits_a = idx.lookup(&Value::Text("a@x".into()));
    assert_eq!(hits_a.len(), 1);
    let hits_b = idx.lookup(&Value::Text("b@x".into()));
    assert_eq!(hits_b.len(), 1);
    cleanup(&path);
}
/// Builds a 5 000-entry secondary index on a database with `source_path`
/// set, drops the handle, reopens the file, and checks that lookups resolve
/// and that the index root page is an interior node with a `TableLeaf` at
/// its leftmost edge.
#[test]
fn secondary_index_with_interior_level_round_trips() {
    let path = tmp_path("sqlr1_wide_index");
    let mut db = Database::new(String::from("idx"));
    db.source_path = Some(path.clone());
    for sql in [
        "CREATE TABLE bloat (id INTEGER PRIMARY KEY, payload TEXT);",
        "BEGIN;",
    ] {
        process_command(sql, &mut db).unwrap();
    }
    for i in 0..5000 {
        let insert = format!("INSERT INTO bloat (payload) VALUES ('p-{i:08}');");
        process_command(&insert, &mut db).unwrap();
    }
    for sql in ["COMMIT;", "CREATE INDEX idx_p ON bloat (payload);"] {
        process_command(sql, &mut db).unwrap();
    }
    drop(db);

    let loaded = open_database(&path, String::from("idx")).unwrap();
    let bloat = loaded.get_table(String::from("bloat")).unwrap();
    let idx = bloat
        .index_by_name("idx_p")
        .expect("idx_p should survive close/reopen");
    assert!(!idx.is_unique);

    // Probe the first, a middle and the last inserted payload.
    for (probe_i, expected_rowid) in [(0i64, 1i64), (2500, 2501), (4999, 5000)] {
        let value = Value::Text(format!("p-{probe_i:08}"));
        let hits = idx.lookup(&value);
        assert_eq!(
            hits,
            vec![expected_rowid],
            "lookup({value:?}) should yield rowid {expected_rowid}",
        );
    }

    let pager = loaded.pager.as_ref().unwrap();
    let mut master = build_empty_master_table();
    load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();

    // First catalog row of type `index` named `idx_p` whose rootpage is an integer.
    let idx_root = master
        .rowids()
        .into_iter()
        .find_map(|r| {
            let is_target = matches!(
                (master.get_value("name", r), master.get_value("type", r)),
                (Some(Value::Text(name)), Some(Value::Text(kind)))
                    if name == "idx_p" && kind == "index"
            );
            if !is_target {
                return None;
            }
            match master.get_value("rootpage", r) {
                Some(Value::Integer(p)) => Some(p as u32),
                _ => None,
            }
        })
        .expect("idx_p should appear in sqlrite_master");

    let root_buf = pager.read_page(idx_root).unwrap();
    assert_eq!(
        root_buf[0],
        PageType::InteriorNode as u8,
        "5 000-entry index must have an interior root — without one this test wouldn't cover SQLR-1",
    );

    let leaf = find_leftmost_leaf(pager, idx_root).unwrap();
    let leaf_buf = pager.read_page(leaf).unwrap();
    assert_eq!(leaf_buf[0], PageType::TableLeaf as u8);
    cleanup(&path);
}
/// Creates a 5000-row table, builds `idx_p`, drops it, builds it again, and
/// then reopens the file to confirm the recreated index still resolves a
/// lookup for a mid-range key.
#[test]
fn drop_then_recreate_wide_index_does_not_panic() {
    let db_file = tmp_path("sqlr1_drop_recreate");
    let mut db = Database::new("idx".to_string());
    db.source_path = Some(db_file.clone());

    process_command(
        "CREATE TABLE bloat (id INTEGER PRIMARY KEY, payload TEXT);",
        &mut db,
    )
    .unwrap();

    // Bulk load inside one transaction.
    process_command("BEGIN;", &mut db).unwrap();
    for i in 0..5000 {
        let insert = format!("INSERT INTO bloat (payload) VALUES ('p-{i:08}');");
        process_command(&insert, &mut db).unwrap();
    }
    process_command("COMMIT;", &mut db).unwrap();

    // Create, drop, and create the same index again.
    for ddl in [
        "CREATE INDEX idx_p ON bloat (payload);",
        "DROP INDEX idx_p;",
        "CREATE INDEX idx_p ON bloat (payload);",
    ] {
        process_command(ddl, &mut db).unwrap();
    }
    drop(db);

    let reopened = open_database(&db_file, "idx".to_string()).unwrap();
    let table = reopened.get_table("bloat".to_string()).unwrap();
    let recreated = table
        .index_by_name("idx_p")
        .expect("idx_p should survive drop+recreate+reopen");
    let probe = Value::Text("p-00002500".into());
    assert_eq!(
        recreated.lookup(&probe),
        vec![2501],
        "post-recycle lookup must still resolve correctly",
    );
    cleanup(&db_file);
}
/// Saves a 6000-row table whose rows each carry a 900+ byte text value, reopens
/// it, and checks that all rows come back and that the on-disk tree is at
/// least three levels deep (the root and its leftmost child are both
/// `InteriorNode` pages).
#[test]
fn deep_tree_round_trips() {
    use crate::sql::db::table::Column as TableColumn;
    let path = tmp_path("deep_tree");
    let mut db = Database::new("deep".to_string());
    let columns = vec![
        TableColumn::new("id".into(), "integer".into(), true, true, true),
        TableColumn::new("s".into(), "text".into(), false, true, false),
    ];
    let mut table = build_empty_table("t", columns, 0);

    // The padding is identical for every row, so build it once instead of
    // re-allocating 900 bytes on each of the 6000 iterations.
    let body = "q".repeat(900);
    for i in 1..=6_000i64 {
        table
            .restore_row(
                i,
                vec![
                    Some(Value::Integer(i)),
                    Some(Value::Text(format!("r-{i}-{body}"))),
                ],
            )
            .unwrap();
    }
    db.tables.insert("t".to_string(), table);
    save_database(&mut db, &path).unwrap();

    let loaded = open_database(&path, "deep".to_string()).unwrap();
    let t = loaded.get_table("t".to_string()).unwrap();
    assert_eq!(t.rowids().len(), 6_000);

    // Re-read the schema catalog from the pager to find the root page of `t`.
    let pager = loaded.pager.as_ref().unwrap();
    let mut master = build_empty_master_table();
    load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
    let t_root = master
        .rowids()
        .into_iter()
        .find_map(|r| match master.get_value("name", r) {
            Some(Value::Text(s)) if s == "t" => match master.get_value("rootpage", r) {
                // Checked conversion: an out-of-range rootpage should fail
                // loudly rather than wrap to an unrelated page number.
                Some(Value::Integer(p)) => {
                    Some(u32::try_from(p).expect("rootpage of t must fit in u32"))
                }
                _ => None,
            },
            _ => None,
        })
        .expect("t in sqlrite_master");

    // Level 1: the root must be an interior page.
    let root_buf = pager.read_page(t_root).unwrap();
    assert_eq!(root_buf[0], PageType::InteriorNode as u8);

    // Level 2: decode the root payload (everything after the page header) and
    // follow its leftmost child, which must be an interior page as well.
    let root_payload: &[u8; PAYLOAD_PER_PAGE] =
        (&root_buf[PAGE_HEADER_SIZE..]).try_into().unwrap();
    let root_interior = InteriorPage::from_bytes(root_payload);
    let child = root_interior.leftmost_child().unwrap();
    let child_buf = pager.read_page(child).unwrap();
    assert_eq!(
        child_buf[0],
        PageType::InteriorNode as u8,
        "expected 3-level tree: root's leftmost child should also be InteriorNode",
    );
    cleanup(&path);
}
/// Renames `users` to `members`, saves, reopens, and checks that the old name
/// is gone, the new name exists with both rows, and the auto-indexes are
/// present under names derived from the new table name.
#[test]
fn alter_rename_table_survives_save_and_reopen() {
    let db_file = tmp_path("alter_rename_table_roundtrip");
    let mut db = seed_db();
    save_database(&mut db, &db_file).expect("save");

    process_command("ALTER TABLE users RENAME TO members;", &mut db).expect("rename");
    save_database(&mut db, &db_file).expect("save after rename");

    let reopened = open_database(&db_file, "t".to_string()).expect("reopen");
    let old_name_present = reopened.contains_table("users".to_string());
    let new_name_present = reopened.contains_table("members".to_string());
    assert!(!old_name_present);
    assert!(new_name_present);

    let members = reopened.get_table("members".to_string()).unwrap();
    assert_eq!(members.rowids().len(), 2, "rows should survive");

    // Both auto-indexes must be reachable under the renamed table's prefix.
    for auto_index in [
        "sqlrite_autoindex_members_id",
        "sqlrite_autoindex_members_name",
    ] {
        assert!(members.index_by_name(auto_index).is_some());
    }
    cleanup(&db_file);
}
/// Renames column `name` to `full_name`, saves, reopens, and checks that the
/// column is only visible under its new name and that the `alice` row can be
/// found through it.
#[test]
fn alter_rename_column_survives_save_and_reopen() {
    let db_file = tmp_path("alter_rename_col_roundtrip");
    let mut db = seed_db();
    save_database(&mut db, &db_file).expect("save");

    let rename_sql = "ALTER TABLE users RENAME COLUMN name TO full_name;";
    process_command(rename_sql, &mut db).expect("rename column");
    save_database(&mut db, &db_file).expect("save after rename");

    let reopened = open_database(&db_file, "t".to_string()).expect("reopen");
    let users = reopened.get_table("users".to_string()).unwrap();
    assert!(users.contains_column("full_name".to_string()));
    assert!(!users.contains_column("name".to_string()));

    // Locate alice by scanning rowids for the expected value under the new
    // column name, then read the same cell back by rowid.
    let alice = Some(Value::Text("alice".to_string()));
    let alice_rowid = users
        .rowids()
        .into_iter()
        .find(|candidate| users.get_value("full_name", *candidate) == alice)
        .expect("alice row should be findable under renamed column");
    assert_eq!(users.get_value("full_name", alice_rowid), alice);
    cleanup(&db_file);
}
/// Adds a `status` column with `DEFAULT 'active'`, saves, reopens, and checks
/// that every existing row reads back the default and that the column
/// metadata still records it.
#[test]
fn alter_add_column_with_default_survives_save_and_reopen() {
    let db_file = tmp_path("alter_add_default_roundtrip");
    let mut db = seed_db();
    save_database(&mut db, &db_file).expect("save");

    let add_sql = "ALTER TABLE users ADD COLUMN status TEXT DEFAULT 'active';";
    process_command(add_sql, &mut db).expect("add column");
    save_database(&mut db, &db_file).expect("save after add");

    let reopened = open_database(&db_file, "t".to_string()).expect("reopen");
    let users = reopened.get_table("users".to_string()).unwrap();
    assert!(users.contains_column("status".to_string()));

    // Expected value for both the per-row cells and the column's default.
    let active = Some(Value::Text("active".to_string()));
    for rowid in users.rowids() {
        let stored = users.get_value("status", rowid);
        assert_eq!(
            stored, active,
            "backfilled default should round-trip for rowid {rowid}"
        );
    }

    let status_col = users
        .columns
        .iter()
        .find(|col| col.column_name == "status")
        .unwrap();
    assert_eq!(status_col.default, active);
    cleanup(&db_file);
}
/// Drops column `age` from `users`, saves, reopens, and checks that `age` is
/// gone while `name` is still present.
#[test]
fn alter_drop_column_survives_save_and_reopen() {
    let db_file = tmp_path("alter_drop_col_roundtrip");
    let mut db = seed_db();
    save_database(&mut db, &db_file).expect("save");

    process_command("ALTER TABLE users DROP COLUMN age;", &mut db).expect("drop column");
    save_database(&mut db, &db_file).expect("save after drop");

    let reopened = open_database(&db_file, "t".to_string()).expect("reopen");
    let users = reopened.get_table("users".to_string()).unwrap();
    let has_age = users.contains_column("age".to_string());
    let has_name = users.contains_column("name".to_string());
    assert!(!has_age);
    assert!(has_name);
    cleanup(&db_file);
}
/// Confirms both seeded tables are on disk, then drops `users`, saves, and
/// reopens to check that only `notes` remains.
#[test]
fn drop_table_survives_save_and_reopen() {
    let db_file = tmp_path("drop_table_roundtrip");
    let mut db = seed_db();
    save_database(&mut db, &db_file).expect("save");

    // Baseline: a separate handle sees both tables before the drop. The
    // handle is scoped so it is gone before the file is rewritten.
    {
        let baseline = open_database(&db_file, "t".to_string()).expect("open");
        for table in ["users", "notes"] {
            assert!(baseline.contains_table(table.to_string()));
        }
    }

    process_command("DROP TABLE users;", &mut db).expect("drop users");
    save_database(&mut db, &db_file).expect("save after drop");

    let reopened = open_database(&db_file, "t".to_string()).expect("reopen");
    let users_present = reopened.contains_table("users".to_string());
    let notes_present = reopened.contains_table("notes".to_string());
    assert!(
        !users_present,
        "dropped table should not resurface on reopen"
    );
    assert!(notes_present, "untouched table should survive");
    cleanup(&db_file);
}
/// Creates and saves an explicit index, drops it, saves again, and reopens to
/// check that the explicit index is gone while the primary-key auto-index is
/// still present.
#[test]
fn drop_index_survives_save_and_reopen() {
    let db_file = tmp_path("drop_index_roundtrip");
    let mut db = Database::new("t".to_string());

    for setup in [
        "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
        "CREATE INDEX notes_body_idx ON notes (body);",
    ] {
        process_command(setup, &mut db).unwrap();
    }
    save_database(&mut db, &db_file).expect("save");

    process_command("DROP INDEX notes_body_idx;", &mut db).unwrap();
    save_database(&mut db, &db_file).expect("save after drop");

    let reopened = open_database(&db_file, "t".to_string()).expect("reopen");
    let notes = reopened.get_table("notes".to_string()).unwrap();
    let dropped = notes.index_by_name("notes_body_idx");
    assert!(
        dropped.is_none(),
        "dropped index should not resurface on reopen"
    );
    let auto_index = notes.index_by_name("sqlrite_autoindex_notes_id");
    assert!(auto_index.is_some());
    cleanup(&db_file);
}
/// Checks that column `DEFAULT` clauses survive save/reopen in two ways: the
/// reopened column metadata carries the defaults, and an INSERT that omits
/// those columns on the reopened database fills them in.
#[test]
fn default_clause_survives_save_and_reopen() {
    let db_file = tmp_path("default_roundtrip");
    let mut db = Database::new("t".to_string());
    process_command(
        "CREATE TABLE users (id INTEGER PRIMARY KEY, status TEXT DEFAULT 'active', score INTEGER DEFAULT 0);",
        &mut db,
    )
    .unwrap();
    save_database(&mut db, &db_file).expect("save");

    let mut loaded = open_database(&db_file, "t".to_string()).expect("open");

    // Part 1: defaults recorded on the reopened column definitions.
    let users = loaded.get_table("users".to_string()).expect("users table");
    let column = |wanted: &str| users.columns.iter().find(|c| c.column_name == wanted);
    let status_col = column("status").expect("status column");
    assert_eq!(
        status_col.default,
        Some(Value::Text("active".to_string())),
        "DEFAULT 'active' should round-trip"
    );
    let score_col = column("score").expect("score column");
    assert_eq!(
        score_col.default,
        Some(Value::Integer(0)),
        "DEFAULT 0 should round-trip"
    );

    // Part 2: the defaults are applied to a row inserted after reopen.
    process_command("INSERT INTO users (id) VALUES (1);", &mut loaded).unwrap();
    let users = loaded.get_table("users".to_string()).unwrap();
    let status = users.get_value("status", 1);
    let score = users.get_value("score", 1);
    assert_eq!(status, Some(Value::Text("active".to_string())));
    assert_eq!(score, Some(Value::Integer(0)));
    cleanup(&db_file);
}
/// After `DROP TABLE`, `page_count` must stay put and `freelist_head` must be
/// non-zero; creating a comparable table afterwards may grow the file by at
/// most two pages.
#[test]
fn drop_table_freelist_persists_pages_for_reuse() {
    // Header accessors; they take the database by reference so the handle
    // stays free for the mutating commands in between.
    let page_count = |d: &Database| d.pager.as_ref().unwrap().header().page_count;
    let freelist_head = |d: &Database| d.pager.as_ref().unwrap().header().freelist_head;

    let db_file = tmp_path("freelist_reuse");
    let mut db = seed_db();
    db.source_path = Some(db_file.clone());
    save_database(&mut db, &db_file).expect("save");
    let pages_two_tables = page_count(&db);

    process_command("DROP TABLE users;", &mut db).expect("drop users");
    let pages_after_drop = page_count(&db);
    assert_eq!(
        pages_after_drop, pages_two_tables,
        "page_count should not shrink on drop — the freed pages persist on the freelist"
    );
    let head_after_drop = freelist_head(&db);
    assert!(
        head_after_drop != 0,
        "freelist_head must be non-zero after drop"
    );

    process_command(
        "CREATE TABLE accounts (id INTEGER PRIMARY KEY, label TEXT NOT NULL UNIQUE);",
        &mut db,
    )
    .expect("create accounts");
    for insert in [
        "INSERT INTO accounts (label) VALUES ('a');",
        "INSERT INTO accounts (label) VALUES ('b');",
    ] {
        process_command(insert, &mut db).unwrap();
    }

    let pages_after_create = page_count(&db);
    assert!(
        pages_after_create <= pages_two_tables + 2,
        "creating a similar-sized table after a drop should mostly draw from the \
         freelist, not extend the file (got {pages_after_create} > {pages_two_tables} + 2)"
    );
    cleanup(&db_file);
}
/// Drops a populated table and then runs `VACUUM`, checking that the page
/// count decreases, the freelist head is reset to 0, and the file on disk
/// gets smaller.
#[test]
fn drop_then_vacuum_shrinks_file() {
    let page_count = |d: &Database| d.pager.as_ref().unwrap().header().page_count;
    let freelist_head = |d: &Database| d.pager.as_ref().unwrap().header().freelist_head;

    let db_file = tmp_path("vacuum_shrinks");
    let file_len = || std::fs::metadata(&db_file).unwrap().len();

    let mut db = seed_db();
    db.source_path = Some(db_file.clone());
    for i in 0..20 {
        let insert = format!("INSERT INTO users (name, age) VALUES ('user{i}', {i});");
        process_command(&insert, &mut db).unwrap();
    }
    save_database(&mut db, &db_file).expect("save");
    process_command("DROP TABLE users;", &mut db).expect("drop");

    // Snapshot before compaction.
    let size_before_vacuum = file_len();
    let pages_before_vacuum = page_count(&db);
    let head_before = freelist_head(&db);
    assert!(head_before != 0, "drop should populate the freelist");

    process_command("VACUUM;", &mut db).expect("vacuum");

    // Snapshot after compaction.
    let size_after = file_len();
    let pages_after = page_count(&db);
    let head_after = freelist_head(&db);
    assert!(
        pages_after < pages_before_vacuum,
        "VACUUM must reduce page_count: was {pages_before_vacuum}, now {pages_after}"
    );
    assert_eq!(head_after, 0, "VACUUM must clear the freelist");
    assert!(
        size_after < size_before_vacuum,
        "VACUUM must shrink the file on disk: was {size_before_vacuum} bytes, now {size_after}"
    );
    cleanup(&db_file);
}
/// Runs `VACUUM` on the seeded database, closes it, and reopens the file to
/// check that both tables and the two `users` rows are still there.
#[test]
fn vacuum_round_trips_data() {
    let db_file = tmp_path("vacuum_round_trip");
    let mut db = seed_db();
    db.source_path = Some(db_file.clone());
    save_database(&mut db, &db_file).expect("save");
    process_command("VACUUM;", &mut db).expect("vacuum");
    drop(db);

    let reopened = open_database(&db_file, "t".to_string()).expect("reopen after vacuum");
    for table in ["users", "notes"] {
        assert!(reopened.contains_table(table.to_string()));
    }
    let user_rows = reopened.get_table("users".to_string()).unwrap().rowids();
    assert_eq!(user_rows.len(), 2);
    cleanup(&db_file);
}
/// Tracks the header's `format_version` across three steps: a plain save
/// stays at the baseline, a drop promotes it to V6, and a following `VACUUM`
/// leaves it at V6.
#[test]
fn freelist_format_version_promotion() {
    use crate::sql::pager::header::{FORMAT_VERSION_BASELINE, FORMAT_VERSION_V6};

    let format_version = |d: &Database| d.pager.as_ref().unwrap().header().format_version;

    let db_file = tmp_path("v6_promotion");
    let mut db = seed_db();
    db.source_path = Some(db_file.clone());

    save_database(&mut db, &db_file).expect("save");
    let v_after_save = format_version(&db);
    assert_eq!(
        v_after_save, FORMAT_VERSION_BASELINE,
        "fresh DB without drops should stay at the baseline version"
    );

    process_command("DROP TABLE users;", &mut db).expect("drop");
    let v_after_drop = format_version(&db);
    assert_eq!(
        v_after_drop, FORMAT_VERSION_V6,
        "first save with a non-empty freelist must promote to V6"
    );

    process_command("VACUUM;", &mut db).expect("vacuum");
    let v_after_vacuum = format_version(&db);
    assert_eq!(
        v_after_vacuum, FORMAT_VERSION_V6,
        "VACUUM must not downgrade — V6 is a strict superset"
    );
    cleanup(&db_file);
}
/// Populates the freelist with a drop, closes the database, reopens it, and
/// checks that the freelist head is still set and that a new table may grow
/// the file by at most two pages over the pre-drop page count.
#[test]
fn freelist_round_trip_through_reopen() {
    let page_count = |d: &Database| d.pager.as_ref().unwrap().header().page_count;
    let freelist_head = |d: &Database| d.pager.as_ref().unwrap().header().freelist_head;

    let db_file = tmp_path("freelist_reopen");

    // First session: record the two-table page count, then drop `users`. The
    // handle goes out of scope at the end of the block.
    let pages_two_tables = {
        let mut first = seed_db();
        first.source_path = Some(db_file.clone());
        save_database(&mut first, &db_file).expect("save");
        let baseline = page_count(&first);
        process_command("DROP TABLE users;", &mut first).expect("drop");
        let head = freelist_head(&first);
        assert!(head != 0, "drop must populate the freelist");
        baseline
    };

    // Second session: the freelist must have been persisted.
    let mut db = open_database(&db_file, "t".to_string()).expect("reopen");
    assert!(
        freelist_head(&db) != 0,
        "freelist_head must survive close/reopen"
    );

    process_command(
        "CREATE TABLE accounts (id INTEGER PRIMARY KEY, label TEXT NOT NULL UNIQUE);",
        &mut db,
    )
    .expect("create accounts");
    process_command("INSERT INTO accounts (label) VALUES ('reopened');", &mut db).unwrap();

    let pages_after_create = page_count(&db);
    assert!(
        pages_after_create <= pages_two_tables + 2,
        "post-reopen create should reuse freelist (got {pages_after_create} > \
         {pages_two_tables} + 2 — file extended instead of reusing)"
    );
    cleanup(&db_file);
}
/// `VACUUM` issued between `BEGIN` and `ROLLBACK` must fail with the
/// in-transaction error message.
#[test]
fn vacuum_inside_transaction_is_rejected() {
    let db_file = tmp_path("vacuum_txn");
    let mut db = seed_db();
    db.source_path = Some(db_file.clone());
    save_database(&mut db, &db_file).expect("save");

    process_command("BEGIN;", &mut db).expect("begin");
    let err = process_command("VACUUM;", &mut db).unwrap_err();
    let rendered = err.to_string();
    assert!(
        rendered.contains("VACUUM cannot run inside a transaction"),
        "expected in-transaction rejection, got: {err}"
    );

    // Close the transaction before removing the file.
    process_command("ROLLBACK;", &mut db).unwrap();
    cleanup(&db_file);
}
/// `VACUUM` on a database with no backing file must succeed and report
/// either "no-op" or "in-memory" (case-insensitive) in its status text.
#[test]
fn vacuum_on_in_memory_database_is_noop() {
    let mut db = Database::new("mem".to_string());
    process_command("CREATE TABLE t (id INTEGER PRIMARY KEY);", &mut db).unwrap();

    let out = process_command("VACUUM;", &mut db).expect("vacuum no-op");
    let lowered = out.to_lowercase();
    let mentions_noop = ["no-op", "in-memory"]
        .iter()
        .any(|needle| lowered.contains(needle));
    assert!(
        mentions_noop,
        "expected no-op message for in-memory VACUUM, got: {out}"
    );
}
/// Dropping one table must leave the root pages of unrelated tables
/// byte-for-byte untouched.
///
/// Three tables are created and populated; the root pages of `accounts` and
/// `users` are snapshotted, `notes` is dropped, and the same page numbers are
/// read again and compared with the snapshots.
#[test]
fn unchanged_table_pages_skip_diff_after_unrelated_drop() {
    let path = tmp_path("diff_after_drop");
    let mut db = Database::new("t".to_string());
    db.source_path = Some(path.clone());

    for ddl in [
        "CREATE TABLE accounts (id INTEGER PRIMARY KEY, label TEXT);",
        "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
        "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);",
    ] {
        process_command(ddl, &mut db).unwrap();
    }
    for i in 0..5 {
        process_command(
            &format!("INSERT INTO accounts (label) VALUES ('a{i}');"),
            &mut db,
        )
        .unwrap();
        process_command(
            &format!("INSERT INTO notes (body) VALUES ('n{i}');"),
            &mut db,
        )
        .unwrap();
        process_command(
            &format!("INSERT INTO users (name) VALUES ('u{i}');"),
            &mut db,
        )
        .unwrap();
    }
    save_database(&mut db, &path).expect("baseline save");

    // Read the catalog once and answer both root-page lookups from it,
    // instead of re-reading it for every table.
    let pager = db.pager.as_ref().unwrap();
    let old_roots = read_old_rootpages(pager, pager.header().schema_root_page).unwrap();
    let root_of = |table: &str| {
        old_roots
            .get(&("table".to_string(), table.to_string()))
            .copied()
            .unwrap_or_else(|| panic!("no root page recorded for table `{table}`"))
    };
    let acc_root = root_of("accounts");
    let users_root = root_of("users");

    // Copy the page bytes out so the pager borrow ends before the drop.
    let acc_bytes_before: Vec<u8> = pager.read_page(acc_root).unwrap().to_vec();
    let users_bytes_before: Vec<u8> = pager.read_page(users_root).unwrap().to_vec();

    process_command("DROP TABLE notes;", &mut db).expect("drop notes");

    let pager = db.pager.as_ref().unwrap();
    let acc_after = pager.read_page(acc_root).unwrap();
    let users_after = pager.read_page(users_root).unwrap();
    assert_eq!(
        &acc_after[..],
        &acc_bytes_before[..],
        "accounts root page must not be rewritten when an unrelated table is dropped"
    );
    assert_eq!(
        &users_after[..],
        &users_bytes_before[..],
        "users root page must not be rewritten when an unrelated table is dropped"
    );
    cleanup(&path);
}
/// Shared fixture for the auto-VACUUM tests.
///
/// Returns a database named `av` whose `source_path` is `path`, containing a
/// one-row `keep` table and a `bloat` table loaded with 5000 rows inside a
/// single transaction.
fn auto_vacuum_setup(path: &std::path::Path) -> Database {
    let mut fixture = Database::new("av".to_string());
    fixture.source_path = Some(path.to_path_buf());

    // Small table that the tests leave in place.
    process_command(
        "CREATE TABLE keep (id INTEGER PRIMARY KEY, n INTEGER);",
        &mut fixture,
    )
    .unwrap();
    process_command("INSERT INTO keep (n) VALUES (1);", &mut fixture).unwrap();

    // Large table that the tests drop or alter.
    process_command(
        "CREATE TABLE bloat (id INTEGER PRIMARY KEY, payload TEXT);",
        &mut fixture,
    )
    .unwrap();
    process_command("BEGIN;", &mut fixture).unwrap();
    for i in 0..5000 {
        let insert = format!("INSERT INTO bloat (payload) VALUES ('p-{i:08}');");
        process_command(&insert, &mut fixture).unwrap();
    }
    process_command("COMMIT;", &mut fixture).unwrap();

    fixture
}
/// With the default threshold (0.25) in place, dropping `bloat` must shrink
/// `page_count`, reset the freelist head to 0, and shrink the file on disk.
#[test]
fn auto_vacuum_default_threshold_triggers_on_drop_table() {
    let page_count = |d: &Database| d.pager.as_ref().unwrap().header().page_count;
    let freelist_head = |d: &Database| d.pager.as_ref().unwrap().header().freelist_head;
    // Best-effort checkpoint: the result is deliberately ignored.
    // NOTE(review): presumably this makes the main file size comparable
    // before/after — confirm against `Pager::checkpoint`.
    let checkpoint = |d: &mut Database| {
        if let Some(p) = d.pager.as_mut() {
            let _ = p.checkpoint();
        }
    };

    let db_file = tmp_path("av_default_drop_table");
    let file_len = || std::fs::metadata(&db_file).unwrap().len();

    let mut db = auto_vacuum_setup(&db_file);
    assert_eq!(db.auto_vacuum_threshold(), Some(0.25));

    checkpoint(&mut db);
    let pages_before = page_count(&db);
    let size_before = file_len();
    assert!(
        pages_before >= MIN_PAGES_FOR_AUTO_VACUUM,
        "setup should produce >= MIN_PAGES_FOR_AUTO_VACUUM ({MIN_PAGES_FOR_AUTO_VACUUM}) \
         pages so the floor doesn't suppress the trigger; got {pages_before}"
    );

    process_command("DROP TABLE bloat;", &mut db).expect("drop");
    let pages_after = page_count(&db);
    let head_after = freelist_head(&db);
    checkpoint(&mut db);
    let size_after = file_len();

    assert!(
        pages_after < pages_before,
        "auto-VACUUM must reduce page_count: was {pages_before}, now {pages_after}"
    );
    assert_eq!(head_after, 0, "auto-VACUUM must clear the freelist");
    assert!(
        size_after < size_before,
        "auto-VACUUM must shrink the file on disk: was {size_before}, now {size_after}"
    );
    cleanup(&db_file);
}
/// With the threshold set to `None`, dropping `bloat` must leave `page_count`
/// unchanged while still populating the freelist.
#[test]
fn auto_vacuum_disabled_keeps_file_at_hwm() {
    let page_count = |d: &Database| d.pager.as_ref().unwrap().header().page_count;
    let freelist_head = |d: &Database| d.pager.as_ref().unwrap().header().freelist_head;

    let db_file = tmp_path("av_disabled");
    let mut db = auto_vacuum_setup(&db_file);
    db.set_auto_vacuum_threshold(None).expect("disable");
    assert_eq!(db.auto_vacuum_threshold(), None);

    let pages_before = page_count(&db);
    process_command("DROP TABLE bloat;", &mut db).expect("drop");
    let pages_after = page_count(&db);
    let head_after = freelist_head(&db);

    assert_eq!(
        pages_after, pages_before,
        "with auto-VACUUM disabled, drop must keep page_count at the HWM"
    );
    assert!(
        head_after != 0,
        "drop must still populate the freelist (manual VACUUM would be needed to reclaim)"
    );
    cleanup(&db_file);
}
/// `DROP INDEX` must be able to fire auto-VACUUM.
///
/// The freelist is first filled by dropping `bloat` with the trigger
/// disabled; the threshold is then re-armed at 0.25 and a freshly created
/// index is dropped, which must shrink `page_count` and clear the freelist.
#[test]
fn auto_vacuum_triggers_on_drop_index() {
    let page_count = |d: &Database| d.pager.as_ref().unwrap().header().page_count;
    let freelist_head = |d: &Database| d.pager.as_ref().unwrap().header().freelist_head;

    let db_file = tmp_path("av_drop_index");
    let mut db = auto_vacuum_setup(&db_file);

    // Phase 1: build up free pages without compaction.
    db.set_auto_vacuum_threshold(None).expect("disable");
    process_command("DROP TABLE bloat;", &mut db).expect("drop bloat");
    let pages_after_bloat_drop = page_count(&db);
    let head_after_bloat_drop = freelist_head(&db);
    assert!(
        head_after_bloat_drop != 0,
        "bloat drop must populate the freelist (else later index drop won't trip the threshold)"
    );

    // Phase 2: re-arm and drop an index.
    process_command("CREATE INDEX idx_keep_n ON keep (n);", &mut db).expect("create idx");
    db.set_auto_vacuum_threshold(Some(0.25)).expect("re-arm");
    process_command("DROP INDEX idx_keep_n;", &mut db).expect("drop index");

    let pages_after = page_count(&db);
    let head_after = freelist_head(&db);
    assert!(
        pages_after < pages_after_bloat_drop,
        "DROP INDEX should fire auto-VACUUM and reduce page_count: \
         was {pages_after_bloat_drop}, now {pages_after}"
    );
    assert_eq!(
        head_after, 0,
        "auto-VACUUM after DROP INDEX must clear the freelist"
    );
    cleanup(&db_file);
}
/// `ALTER TABLE ... DROP COLUMN` on the large table must reduce `page_count`
/// and leave the freelist head at 0.
#[test]
fn auto_vacuum_triggers_on_alter_drop_column() {
    let page_count = |d: &Database| d.pager.as_ref().unwrap().header().page_count;

    let db_file = tmp_path("av_alter_drop_col");
    let mut db = auto_vacuum_setup(&db_file);

    let pages_before = page_count(&db);
    process_command("ALTER TABLE bloat DROP COLUMN payload;", &mut db).expect("alter drop");
    let pages_after = page_count(&db);

    assert!(
        pages_after < pages_before,
        "ALTER TABLE DROP COLUMN should fire auto-VACUUM and reduce page_count: \
         was {pages_before}, now {pages_after}"
    );
    let head_after = db.pager.as_ref().unwrap().header().freelist_head;
    assert_eq!(head_after, 0);
    cleanup(&db_file);
}
/// With the threshold raised to 0.99, dropping `bloat` must not change
/// `page_count`, though the freelist must still be populated.
#[test]
fn auto_vacuum_skips_below_threshold() {
    let page_count = |d: &Database| d.pager.as_ref().unwrap().header().page_count;

    let db_file = tmp_path("av_below_threshold");
    let mut db = auto_vacuum_setup(&db_file);
    db.set_auto_vacuum_threshold(Some(0.99)).expect("set");

    let pages_before = page_count(&db);
    process_command("DROP TABLE bloat;", &mut db).expect("drop");
    let pages_after = page_count(&db);

    assert_eq!(
        pages_after, pages_before,
        "freelist ratio after a single drop is far below 0.99 — \
         page_count must stay at the HWM"
    );
    let head_after = db.pager.as_ref().unwrap().header().freelist_head;
    assert!(head_after != 0, "drop must still populate the freelist");
    cleanup(&db_file);
}
/// A `DROP TABLE` issued between `BEGIN` and `ROLLBACK` must leave
/// `page_count` unchanged while the transaction is still open.
#[test]
fn auto_vacuum_skips_inside_transaction() {
    let page_count = |d: &Database| d.pager.as_ref().unwrap().header().page_count;

    let db_file = tmp_path("av_in_txn");
    let mut db = auto_vacuum_setup(&db_file);
    let pages_before = page_count(&db);

    process_command("BEGIN;", &mut db).expect("begin");
    process_command("DROP TABLE bloat;", &mut db).expect("drop in txn");
    let pages_mid = page_count(&db);
    assert_eq!(
        pages_mid, pages_before,
        "auto-VACUUM must not fire mid-transaction"
    );

    process_command("ROLLBACK;", &mut db).expect("rollback");
    cleanup(&db_file);
}
/// On a database smaller than `MIN_PAGES_FOR_AUTO_VACUUM` pages, a drop must
/// not change `page_count`, though the freelist must still be populated.
#[test]
fn auto_vacuum_skips_under_min_pages_floor() {
    let page_count = |d: &Database| d.pager.as_ref().unwrap().header().page_count;

    let db_file = tmp_path("av_under_floor");
    let mut db = seed_db();
    db.source_path = Some(db_file.clone());
    save_database(&mut db, &db_file).expect("save");

    // Guard: the seeded database has to be below the floor for this test to
    // exercise the floor at all.
    let pages_before = page_count(&db);
    assert!(
        pages_before < MIN_PAGES_FOR_AUTO_VACUUM,
        "test setup is too large: floor would not apply (got {pages_before} pages, \
         floor is {MIN_PAGES_FOR_AUTO_VACUUM})"
    );

    process_command("DROP TABLE users;", &mut db).expect("drop");
    let pages_after = page_count(&db);
    assert_eq!(
        pages_after, pages_before,
        "below MIN_PAGES_FOR_AUTO_VACUUM, drop must not trigger compaction"
    );
    let head_after = db.pager.as_ref().unwrap().header().freelist_head;
    assert!(
        head_after != 0,
        "drop must still populate the freelist normally"
    );
    cleanup(&db_file);
}
/// `set_auto_vacuum_threshold` must reject values outside `0.0..=1.0` as well
/// as NaN and the infinities, leaving the default (0.25) untouched, and must
/// accept both boundaries and `None`.
#[test]
fn set_auto_vacuum_threshold_rejects_out_of_range() {
    let mut db = Database::new("t".to_string());

    // Rejected inputs: each must produce an error mentioning the setting.
    for bad in [-0.01_f32, 1.01, f32::NAN, f32::INFINITY, f32::NEG_INFINITY] {
        let err = db.set_auto_vacuum_threshold(Some(bad)).unwrap_err();
        let rendered = err.to_string();
        assert!(
            rendered.contains("auto_vacuum_threshold"),
            "expected a typed range error for {bad}, got: {err}"
        );
    }
    // None of the rejected calls may have changed the stored value.
    assert_eq!(db.auto_vacuum_threshold(), Some(0.25));

    // Accepted inputs: both boundaries, then "disabled".
    for accepted in [Some(0.0_f32), Some(1.0), None] {
        db.set_auto_vacuum_threshold(accepted).unwrap();
        assert_eq!(db.auto_vacuum_threshold(), accepted);
    }
}
/// `PRAGMA auto_vacuum = 0.5;` must update the threshold and return a status
/// containing "PRAGMA"; the bare `PRAGMA auto_vacuum;` form must report
/// "1 row".
#[test]
fn pragma_auto_vacuum_set_and_read_via_sql() {
    let mut db = Database::new("t".to_string());

    // Set form.
    let resp = process_command("PRAGMA auto_vacuum = 0.5;", &mut db).expect("set");
    let is_pragma_status = resp.contains("PRAGMA");
    assert!(
        is_pragma_status,
        "set form should produce a PRAGMA status, got: {resp}"
    );
    assert_eq!(db.auto_vacuum_threshold(), Some(0.5));

    // Read form.
    let resp = process_command("PRAGMA auto_vacuum;", &mut db).expect("read");
    let is_single_row = resp.contains("1 row");
    assert!(is_single_row, "expected a 1-row read, got: {resp}");
}
/// Every accepted spelling of OFF/NONE (bare or quoted, either case) must
/// clear the threshold on a fresh database.
#[test]
fn pragma_auto_vacuum_off_disables_trigger() {
    let spellings = ["OFF", "off", "NONE", "none", "'OFF'", "'NONE'"];
    for raw in spellings {
        // Fresh database per spelling so each starts from the default.
        let mut db = Database::new("t".to_string());
        assert_eq!(db.auto_vacuum_threshold(), Some(0.25));

        let stmt = format!("PRAGMA auto_vacuum = {raw};");
        if let Err(e) = process_command(&stmt, &mut db) {
            panic!("`{stmt}` should disable: {e}");
        }
        assert_eq!(
            db.auto_vacuum_threshold(),
            None,
            "`{stmt}` should clear the threshold"
        );
    }
}
/// Out-of-range numeric values passed through `PRAGMA auto_vacuum = ...` must
/// be rejected with an error mentioning `auto_vacuum_threshold`, and the
/// default must remain in place afterwards.
#[test]
fn pragma_auto_vacuum_rejects_out_of_range_via_sql() {
    let mut db = Database::new("t".to_string());
    let rejected = ["-0.01", "1.01", "1.5"];
    for bad in rejected {
        let stmt = format!("PRAGMA auto_vacuum = {bad};");
        let err = process_command(&stmt, &mut db).unwrap_err();
        let rendered = err.to_string();
        assert!(
            rendered.contains("auto_vacuum_threshold"),
            "expected range error for `{stmt}`, got: {err}"
        );
    }
    assert_eq!(db.auto_vacuum_threshold(), Some(0.25));
}
/// A non-numeric value other than OFF/NONE (here `WAL`) must be rejected with
/// an error that mentions "OFF/NONE", leaving the default threshold intact.
#[test]
fn pragma_auto_vacuum_rejects_unknown_strings_via_sql() {
    let mut db = Database::new("t".to_string());
    let err = process_command("PRAGMA auto_vacuum = WAL;", &mut db).unwrap_err();
    let rendered = err.to_string();
    assert!(
        rendered.contains("OFF/NONE"),
        "expected OFF/NONE-style error, got: {err}"
    );
    assert_eq!(db.auto_vacuum_threshold(), Some(0.25));
}
/// A pragma the engine does not handle (`synchronous`) must fail with
/// `SQLRiteError::NotImplemented`.
#[test]
fn pragma_unknown_returns_not_implemented() {
    let mut db = Database::new("t".to_string());
    let outcome = process_command("PRAGMA synchronous = NORMAL;", &mut db);
    match outcome.unwrap_err() {
        SQLRiteError::NotImplemented(_) => {}
        err => panic!("unknown pragma must surface NotImplemented, got: {err:?}"),
    }
}
/// End-to-end check that the threshold set through `PRAGMA auto_vacuum`
/// actually controls the trigger. Three independent scenarios, each on its
/// own file: OFF suppresses, 0.99 suppresses, and re-arming to 0.25 after an
/// OFF drop lets a later `DROP INDEX` compact the file.
#[test]
fn pragma_auto_vacuum_drives_real_trigger() {
    let page_count = |d: &Database| d.pager.as_ref().unwrap().header().page_count;
    let freelist_head = |d: &Database| d.pager.as_ref().unwrap().header().freelist_head;

    // Scenario 1: PRAGMA ... = OFF.
    {
        let db_file = tmp_path("av_pragma_off");
        let mut db = auto_vacuum_setup(&db_file);
        process_command("PRAGMA auto_vacuum = OFF;", &mut db).expect("disable via PRAGMA");
        assert_eq!(db.auto_vacuum_threshold(), None);

        let pages_before = page_count(&db);
        process_command("DROP TABLE bloat;", &mut db).expect("drop");
        let pages_after = page_count(&db);
        assert_eq!(
            pages_after, pages_before,
            "PRAGMA-driven OFF must keep page_count at the HWM"
        );
        cleanup(&db_file);
    }

    // Scenario 2: PRAGMA ... = 0.99.
    {
        let db_file = tmp_path("av_pragma_high");
        let mut db = auto_vacuum_setup(&db_file);
        process_command("PRAGMA auto_vacuum = 0.99;", &mut db).expect("set high");
        assert_eq!(db.auto_vacuum_threshold(), Some(0.99));

        let pages_before = page_count(&db);
        process_command("DROP TABLE bloat;", &mut db).expect("drop");
        let pages_after = page_count(&db);
        assert_eq!(
            pages_after, pages_before,
            "high PRAGMA threshold must suppress the trigger"
        );
        cleanup(&db_file);
    }

    // Scenario 3: OFF, drop, then re-arm to 0.25 and drop an index.
    {
        let db_file = tmp_path("av_pragma_rearm");
        let mut db = auto_vacuum_setup(&db_file);
        process_command("PRAGMA auto_vacuum = OFF;", &mut db).unwrap();
        process_command("DROP TABLE bloat;", &mut db).unwrap();
        let pages_after_off_drop = page_count(&db);
        assert!(freelist_head(&db) != 0);

        process_command("PRAGMA auto_vacuum = 0.25;", &mut db).expect("re-arm");
        process_command("CREATE INDEX idx_keep_n ON keep (n);", &mut db).unwrap();
        process_command("DROP INDEX idx_keep_n;", &mut db).expect("drop index");

        let pages_after_rearm = page_count(&db);
        assert!(
            pages_after_rearm < pages_after_off_drop,
            "re-armed PRAGMA must let auto-VACUUM fire: was {pages_after_off_drop}, \
             now {pages_after_rearm}"
        );
        assert_eq!(freelist_head(&db), 0);
        cleanup(&db_file);
    }
}
/// `VACUUM` with any modifier (`FULL`, or a table name) must be rejected with
/// an error mentioning "VACUUM modifiers".
#[test]
fn vacuum_modifiers_are_rejected() {
    let db_file = tmp_path("vacuum_modifiers");
    let mut db = seed_db();
    db.source_path = Some(db_file.clone());
    save_database(&mut db, &db_file).expect("save");

    let unsupported = ["VACUUM FULL;", "VACUUM users;"];
    for stmt in unsupported {
        let err = process_command(stmt, &mut db).unwrap_err();
        let rendered = err.to_string();
        assert!(
            rendered.contains("VACUUM modifiers"),
            "expected modifier rejection for `{stmt}`, got: {err}"
        );
    }
    cleanup(&db_file);
}
}