#![no_std]
#![cfg_attr(target_arch = "aarch64", allow(unsafe_code))]
extern crate alloc;
pub mod bloom;
pub mod halfvec;
pub mod persistent;
pub mod persistent_btree;
pub mod quantize;
pub mod row_locator;
pub mod segment;
pub use self::bloom::{BloomError, BloomFilter};
pub use self::row_locator::{RowLocator, RowLocatorError};
pub use self::segment::{
BRIN_SIDECAR_MAGIC, BrinSummary, OwnedSegment, SEGMENT_COMPRESS_ALGO_LZSS,
SEGMENT_COMPRESS_ALGO_NONE, SEGMENT_MAGIC, SEGMENT_MAGIC_V2, SEGMENT_PAGE_BYTES, SegmentError,
SegmentMeta, SegmentReader, derive_brin_summaries, encode_segment, wrap_v2_envelope,
wrap_v2_envelope_with_brin,
};
use alloc::collections::{BTreeMap, BTreeSet};
use alloc::format;
use alloc::string::String;
use alloc::sync::Arc;
use alloc::vec::Vec;
use core::fmt;
use self::persistent::PersistentVec;
use self::persistent_btree::PersistentBTreeMap;
/// Physical encoding of the elements of a vector column.
///
/// `F32` is the default; the `Display` impl yields the spellings `F32`,
/// `SQ8` and `HALF`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum VecEncoding {
    /// Plain `f32` elements (the default encoding).
    #[default]
    F32,
    /// Quantized encoding, displayed as `SQ8`.
    Sq8,
    /// Half-precision encoding, displayed as `HALF`.
    F16,
}

impl fmt::Display for VecEncoding {
    /// Writes the encoding's keyword verbatim (no padding or alignment is applied).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match *self {
            Self::F32 => "F32",
            Self::Sq8 => "SQ8",
            Self::F16 => "HALF",
        };
        f.write_str(keyword)
    }
}
/// Declared type of a table column (see `ColumnSchema::ty`).
///
/// The `Display` impl in this file renders every variant in SQL-style
/// spelling, e.g. `VARCHAR(n)`, `VECTOR(dim) USING SQ8`, `NUMERIC(p, s)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataType {
SmallInt,
Int, BigInt, Float, Text,
/// Parameterised text type; the parameter is displayed as `VARCHAR(n)`.
Varchar(u32),
/// Parameterised text type; the parameter is displayed as `CHAR(n)`.
Char(u32),
Bool,
/// Vector column with a fixed element count and a storage encoding.
Vector {
/// Number of elements per vector.
dim: u32,
/// How the elements are encoded.
encoding: VecEncoding,
},
/// Fixed-point numeric. `Table::insert` treats two numerics as compatible
/// when their `scale` matches, regardless of `precision`.
Numeric {
precision: u8,
scale: u8,
},
Date,
Timestamp,
Interval,
Json,
}
impl fmt::Display for DataType {
    /// Renders the type in SQL-style spelling.
    ///
    /// Parameterised types include their parameters; a `NUMERIC` with scale 0
    /// omits the scale, and an `F32` vector omits the `USING` clause.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Parameterised variants format and return immediately; every
        // remaining variant is a bare keyword written at the end.
        let keyword = match *self {
            Self::Varchar(n) => return write!(f, "VARCHAR({n})"),
            Self::Char(n) => return write!(f, "CHAR({n})"),
            Self::Vector { dim, encoding } => {
                let using = match encoding {
                    VecEncoding::F32 => "",
                    VecEncoding::Sq8 => " USING SQ8",
                    VecEncoding::F16 => " USING HALF",
                };
                return write!(f, "VECTOR({dim}){using}");
            }
            Self::Numeric {
                precision,
                scale: 0,
            } => return write!(f, "NUMERIC({precision})"),
            Self::Numeric { precision, scale } => {
                return write!(f, "NUMERIC({precision}, {scale})");
            }
            Self::SmallInt => "SMALLINT",
            Self::Int => "INT",
            Self::BigInt => "BIGINT",
            Self::Float => "FLOAT",
            Self::Text => "TEXT",
            Self::Bool => "BOOL",
            Self::Date => "DATE",
            Self::Timestamp => "TIMESTAMP",
            Self::Interval => "INTERVAL",
            Self::Json => "JSON",
        };
        f.write_str(keyword)
    }
}
/// A single cell value stored in a `Row`.
///
/// `Value::data_type` maps each non-null variant to its `DataType`;
/// `Null` has no type.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum Value {
SmallInt(i16),
Int(i32),
BigInt(i64),
Float(f64),
Text(String),
Bool(bool),
/// Vector stored as plain `f32` elements (`VecEncoding::F32`).
Vector(Vec<f32>),
/// Vector stored in the SQ8 encoding (`VecEncoding::Sq8`).
Sq8Vector(crate::quantize::Sq8Vector),
/// Vector stored in the half-precision encoding (`VecEncoding::F16`).
HalfVector(crate::halfvec::HalfVector),
/// Fixed-point number.
// NOTE(review): presumably the numeric value is `scaled / 10^scale` — confirm.
Numeric {
scaled: i128,
scale: u8,
},
// NOTE(review): the unit/epoch of the `i32` is not established in this chunk.
Date(i32),
// NOTE(review): the unit/epoch of the `i64` is not established in this chunk.
Timestamp(i64),
Interval {
months: i32,
micros: i64,
},
/// JSON carried as its textual form.
Json(String),
/// SQL NULL; `data_type()` returns `None` for it.
Null,
}
impl Value {
pub fn data_type(&self) -> Option<DataType> {
match self {
Self::SmallInt(_) => Some(DataType::SmallInt),
Self::Int(_) => Some(DataType::Int),
Self::BigInt(_) => Some(DataType::BigInt),
Self::Float(_) => Some(DataType::Float),
Self::Text(_) => Some(DataType::Text),
Self::Bool(_) => Some(DataType::Bool),
Self::Vector(v) => Some(DataType::Vector {
dim: u32::try_from(v.len()).expect("vector dim ≤ u32"),
encoding: VecEncoding::F32,
}),
Self::Sq8Vector(q) => Some(DataType::Vector {
dim: u32::try_from(q.bytes.len()).expect("vector dim ≤ u32"),
encoding: VecEncoding::Sq8,
}),
Self::HalfVector(h) => Some(DataType::Vector {
dim: u32::try_from(h.dim()).expect("vector dim ≤ u32"),
encoding: VecEncoding::F16,
}),
Self::Numeric { scale, .. } => Some(DataType::Numeric {
precision: 0,
scale: *scale,
}),
Self::Date(_) => Some(DataType::Date),
Self::Timestamp(_) => Some(DataType::Timestamp),
Self::Interval { .. } => Some(DataType::Interval),
Self::Json(_) => Some(DataType::Json),
Self::Null => None,
}
}
pub const fn is_null(&self) -> bool {
matches!(self, Self::Null)
}
}
/// One table row: an ordered list of cell values.
///
/// `Table::insert` requires one value per schema column.
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    /// Cell values in column order.
    pub values: Vec<Value>,
}

impl Row {
    /// Wraps an already-built list of values as a row.
    pub const fn new(values: Vec<Value>) -> Self {
        Row { values }
    }

    /// Number of cells in the row.
    pub fn len(&self) -> usize {
        self.values.as_slice().len()
    }

    /// Returns `true` when the row holds no cells.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
/// Definition of a single table column.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnSchema {
/// Column name; `TableSchema::column_position` looks columns up by it.
pub name: String,
/// Declared type that inserted values are checked against.
pub ty: DataType,
/// When `false`, `Table::insert` / `Table::update_row` reject `Value::Null`.
pub nullable: bool,
// NOTE(review): presumably the value used when an INSERT omits the column;
// nothing in this chunk reads it — confirm against the caller.
pub default: Option<Value>,
// NOTE(review): presumably paired with `Table::next_auto_value`; that method
// does not consult this flag itself — confirm against the caller.
pub auto_increment: bool,
}
/// Definition of a table: its name, ordered columns and constraints.
#[derive(Debug, Clone, PartialEq)]
pub struct TableSchema {
pub name: String,
/// Columns in positional order; rows carry one value per entry.
pub columns: Vec<ColumnSchema>,
// NOTE(review): presumably a byte budget for the hot (in-memory) tier;
// not read by any code visible in this chunk — confirm.
pub hot_tier_bytes: Option<u64>,
pub foreign_keys: Vec<ForeignKeyConstraint>,
}
/// A foreign-key constraint attached to a `TableSchema`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForeignKeyConstraint {
/// Optional constraint name.
pub name: Option<String>,
// NOTE(review): presumably positions into the owning table's `columns` — confirm.
pub local_columns: Vec<usize>,
/// Name of the referenced table.
pub parent_table: String,
// NOTE(review): presumably positions into the parent table's `columns` — confirm.
pub parent_columns: Vec<usize>,
/// Action selected for deletes on the parent side.
pub on_delete: FkAction,
/// Action selected for updates on the parent side.
pub on_update: FkAction,
}
/// Referential action of a foreign-key constraint.
///
/// Each variant has a stable one-byte tag (its discriminant) exposed through
/// [`FkAction::tag`] and decoded by [`FkAction::from_tag`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FkAction {
    Restrict = 0,
    Cascade = 1,
    SetNull = 2,
    SetDefault = 3,
    NoAction = 4,
}

impl FkAction {
    /// Returns the one-byte tag of this action (0 through 4).
    pub const fn tag(self) -> u8 {
        self as u8
    }

    /// Decodes a tag produced by [`FkAction::tag`]; unknown bytes yield `None`.
    pub const fn from_tag(b: u8) -> Option<Self> {
        match b {
            0 => Some(Self::Restrict),
            1 => Some(Self::Cascade),
            2 => Some(Self::SetNull),
            3 => Some(Self::SetDefault),
            4 => Some(Self::NoAction),
            _ => None,
        }
    }
}
impl TableSchema {
    /// Returns the position of the first column named exactly `name`
    /// (case-sensitive), or `None` when no column matches.
    pub fn column_position(&self, name: &str) -> Option<usize> {
        self.columns
            .iter()
            .enumerate()
            .find_map(|(pos, col)| (col.name == name).then_some(pos))
    }
}
/// Key type of the BTree indexes (`IndexKind::BTree`).
///
/// `IndexKey::from_value` folds all integer-like values into `Int`.
/// The derived ordering compares variants in declaration order first,
/// then by payload.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum IndexKey {
/// Integer key; also used for `SmallInt`, `Int`, `Date` and `Timestamp` values.
Int(i64),
Text(String),
Bool(bool),
}
impl IndexKey {
    /// Builds the index key for a cell value.
    ///
    /// Integer, date and timestamp values widen to `Int`; text and bool map
    /// to their own variants. Returns `None` for values that cannot be
    /// indexed: `Null`, floats, vectors, numerics, intervals and JSON.
    pub fn from_value(v: &Value) -> Option<Self> {
        let key = match v {
            Value::Null
            | Value::Float(_)
            | Value::Vector(_)
            | Value::Sq8Vector(_)
            | Value::HalfVector(_)
            | Value::Numeric { .. }
            | Value::Interval { .. }
            | Value::Json(_) => return None,
            Value::SmallInt(small) => Self::Int(i64::from(*small)),
            Value::Int(int) => Self::Int(i64::from(*int)),
            Value::Date(days) => Self::Int(i64::from(*days)),
            Value::BigInt(big) => Self::Int(*big),
            Value::Timestamp(ts) => Self::Int(*ts),
            Value::Text(text) => Self::Text(text.clone()),
            Value::Bool(flag) => Self::Bool(*flag),
        };
        Some(key)
    }
}
/// A secondary index attached to a `Table`.
#[derive(Debug, Clone)]
pub struct Index {
/// Index name; `Table` rejects duplicates with `StorageError::DuplicateIndex`.
pub name: String,
/// Position of the indexed column in the table schema.
pub column_position: usize,
/// The index structure itself (BTree map, NSW graph, or BRIN marker).
pub kind: IndexKind,
// NOTE(review): the next three fields are initialised empty by the
// constructors in this file and are not interpreted by any code visible in
// this chunk; presumably maintained by a higher layer — confirm.
pub included_columns: Vec<usize>,
pub partial_predicate: Option<String>,
pub expression: Option<String>,
}
/// Default `m` for NSW indexes.
// NOTE(review): not referenced in this chunk; presumably the value callers pass
// to `Table::add_nsw_index` when none is specified — confirm.
pub const NSW_DEFAULT_M: usize = 16;
/// Summary record for a freeze operation.
// NOTE(review): the producer of this struct is outside this chunk; the field
// notes below are inferred from names and types only — confirm.
#[derive(Debug, Clone)]
pub struct FreezeReport {
/// Identifier of the segment involved.
pub segment_id: u32,
/// Number of rows frozen.
pub frozen_rows: usize,
/// Bytes freed (presumably from the hot tier).
pub bytes_freed: u64,
/// Raw bytes of the segment.
pub segment_bytes: Vec<u8>,
}
/// A contiguous range of rows together with per-row payloads for freezing.
// NOTE(review): producer/consumer are outside this chunk; the meaning of the
// tuple members (`u64`, encoded bytes, index key) is not established here.
#[derive(Debug, Clone)]
pub struct FreezeSlice {
/// Range of row positions covered by this slice.
pub row_range: core::ops::Range<usize>,
pub rows: Vec<(u64, Vec<u8>, IndexKey)>,
}
/// Summary record for a compaction.
// NOTE(review): the producer of this struct is outside this chunk; the field
// notes below are inferred from names and types only — confirm.
#[derive(Debug, Clone)]
pub struct CompactReport {
/// Identifiers of the source segments.
pub sources: Vec<u32>,
/// Identifier of the merged segment, if one was produced.
pub merged_segment_id: Option<u32>,
/// Raw bytes of the merged segment.
pub merged_segment_bytes: Vec<u8>,
pub merged_rows: usize,
pub deleted_rows_pruned: usize,
pub bytes_reclaimed_estimate: u64,
}
/// The concrete structure backing an `Index`.
#[derive(Debug, Clone)]
pub enum IndexKind {
/// Ordered map from key to every locator (hot or cold) of rows with that key.
BTree(PersistentBTreeMap<IndexKey, Vec<RowLocator>>),
/// Layered neighbour graph over a vector column.
Nsw(NswGraph),
/// BRIN marker; only the indexed column's type is kept in memory here.
Brin {
column_type: DataType,
},
}
/// Layered neighbour graph used by NSW indexes.
#[derive(Debug, Clone)]
pub struct NswGraph {
/// Neighbour cap for layers above 0 (see `cap_for_layer`).
pub m: usize,
/// Neighbour cap for layer 0; initialised to `2 * m` (saturating).
pub m_max_0: usize,
/// Row index of the entry node, or `None` while no vector has been inserted.
pub entry: Option<usize>,
/// Level assigned to the entry node.
pub entry_level: u8,
/// Level of each node, indexed by row index.
pub levels: PersistentVec<u8>,
/// `layers[layer][row]` is the neighbour list (row indices as `u32`) of
/// `row` on that layer.
pub layers: Vec<PersistentVec<Vec<u32>>>,
}
impl NswGraph {
    /// Creates an empty graph with a single (empty) layer 0, no entry node,
    /// and a layer-0 cap of `2 * m` (saturating).
    fn new(m: usize) -> Self {
        let base_layer = PersistentVec::new();
        Self {
            m,
            m_max_0: m.saturating_mul(2),
            entry: None,
            entry_level: 0,
            levels: PersistentVec::new(),
            layers: alloc::vec![base_layer],
        }
    }

    /// Maximum neighbour count per node on `layer`: `m_max_0` on layer 0,
    /// `m` on every other layer.
    pub const fn cap_for_layer(&self, layer: u8) -> usize {
        match layer {
            0 => self.m_max_0,
            _ => self.m,
        }
    }
}
/// Deterministically assigns a graph level in `0..=7` to a row index.
///
/// The index is scrambled with a fixed multiply/xor-shift mixer; the level is
/// the number of consecutive all-zero low nibbles of the mixed value, capped
/// at 7. Row index 0 mixes to 0 and therefore always gets level 7.
pub fn nsw_assign_level(row_idx: usize) -> u8 {
    const MAX_LEVEL: u32 = 7;
    const BITS_PER_NIBBLE: u32 = 4;

    let mut mixed = (row_idx as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
    mixed = (mixed ^ (mixed >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    mixed = (mixed ^ (mixed >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    mixed ^= mixed >> 31;

    // Whole zero nibbles at the low end == trailing zero bits / 4. For a
    // mixed value of 0 this is 16, which the cap reduces to 7.
    let zero_nibbles = mixed.trailing_zeros() / BITS_PER_NIBBLE;
    u8::try_from(zero_nibbles.min(MAX_LEVEL)).expect("level is capped at 7")
}
impl Index {
fn new_btree(name: String, column_position: usize) -> Self {
Self {
name,
column_position,
kind: IndexKind::BTree(PersistentBTreeMap::new()),
included_columns: Vec::new(),
partial_predicate: None,
expression: None,
}
}
fn new_nsw(name: String, column_position: usize, m: usize) -> Self {
Self {
name,
column_position,
kind: IndexKind::Nsw(NswGraph::new(m)),
included_columns: Vec::new(),
partial_predicate: None,
expression: None,
}
}
fn new_brin(name: String, column_position: usize, column_type: DataType) -> Self {
Self {
name,
column_position,
kind: IndexKind::Brin { column_type },
included_columns: Vec::new(),
partial_predicate: None,
expression: None,
}
}
pub fn lookup_eq(&self, key: &IndexKey) -> &[RowLocator] {
match &self.kind {
IndexKind::BTree(m) => m.get(key).map_or(&[][..], Vec::as_slice),
IndexKind::Nsw(_) | IndexKind::Brin { .. } => &[][..],
}
}
pub const fn nsw(&self) -> Option<&NswGraph> {
match &self.kind {
IndexKind::Nsw(g) => Some(g),
IndexKind::BTree(_) | IndexKind::Brin { .. } => None,
}
}
pub const fn is_brin(&self) -> bool {
matches!(self.kind, IndexKind::Brin { .. })
}
}
/// An in-memory table: schema, hot rows, secondary indexes and tier counters.
#[derive(Debug, Clone)]
pub struct Table {
/// Column definitions and constraints.
schema: TableSchema,
/// Rows held in memory; `RowLocator::Hot(i)` refers to position `i` here.
rows: PersistentVec<Row>,
/// Secondary indexes, kept in sync by `insert` / `delete_rows` / `update_row`.
indices: Vec<Index>,
/// Running total of `row_body_encoded_len` over the rows in `rows`.
hot_bytes: u64,
/// Cached number of cold rows; set via `set_cold_row_count`.
cold_row_count: u64,
/// `true` once `mark_cold_row_count_stale` was called and the cache has not
/// been refreshed since.
cold_row_count_stale: bool,
}
impl Table {
/// Creates an empty table for `schema`: no rows, no indexes, all counters
/// zero and the cold-row count marked fresh.
pub fn new(schema: TableSchema) -> Self {
Self {
schema,
rows: PersistentVec::new(),
indices: Vec::new(),
hot_bytes: 0,
cold_row_count: 0,
cold_row_count_stale: false,
}
}
/// Running total of encoded row-body bytes for the rows currently in memory,
/// as maintained by `insert`, `delete_rows` and `update_row`.
#[must_use]
pub const fn hot_bytes(&self) -> u64 {
self.hot_bytes
}
/// Cached cold-row count last stored by `set_cold_row_count`.
/// Check `cold_row_count_stale` before trusting it.
#[must_use]
pub const fn cold_row_count(&self) -> u64 {
self.cold_row_count
}
/// Stores `n` as the cached cold-row count and clears the stale flag.
pub fn set_cold_row_count(&mut self, n: u64) {
self.cold_row_count = n;
self.cold_row_count_stale = false;
}
/// Flags the cached cold-row count as out of date (the cached number itself
/// is left unchanged).
pub fn mark_cold_row_count_stale(&mut self) {
self.cold_row_count_stale = true;
}
/// Returns `true` when the cached cold-row count has been marked stale and
/// not yet refreshed through `set_cold_row_count`.
#[must_use]
pub const fn cold_row_count_stale(&self) -> bool {
self.cold_row_count_stale
}
/// Counts cold locators per BTree index and returns the largest count.
///
/// Returns 0 when the table has no BTree index (NSW and BRIN indexes are
/// ignored) or when no BTree index holds a cold locator.
#[must_use]
pub fn count_cold_locators(&self) -> u64 {
    self.indices
        .iter()
        .filter_map(|idx| match &idx.kind {
            IndexKind::BTree(map) => Some(
                map.iter()
                    .map(|(_, locs)| locs.iter().filter(|loc| loc.is_cold()).count() as u64)
                    .sum::<u64>(),
            ),
            IndexKind::Nsw(_) | IndexKind::Brin { .. } => None,
        })
        .max()
        .unwrap_or(0)
}
/// Borrows the table's schema.
pub const fn schema(&self) -> &TableSchema {
&self.schema
}
/// Mutably borrows the table's schema.
///
/// Nothing here re-validates existing rows or indexes after a change; the
/// caller is responsible for keeping them consistent.
pub const fn schema_mut(&mut self) -> &mut TableSchema {
&mut self.schema
}
/// Borrows the in-memory rows.
pub const fn rows(&self) -> &PersistentVec<Row> {
&self.rows
}
/// Number of in-memory rows (cold rows are not included).
pub const fn row_count(&self) -> usize {
self.rows.len()
}
/// Mutably borrows the indexes as a slice: individual indexes can be edited,
/// but none can be added or removed through this handle.
pub fn indices_mut(&mut self) -> &mut [Index] {
&mut self.indices
}
/// Borrows the table's indexes in their current order.
pub fn indices(&self) -> &[Index] {
&self.indices
}
/// Computes the next auto-increment value for the column at `col_pos`.
///
/// Scans the in-memory rows for the largest integer stored in that column and
/// returns it plus one, or `1` when no integer value is present (an empty
/// table, or only NULLs).
///
/// Returns `None` when:
/// * `col_pos` is out of range,
/// * the column is not `SMALLINT` / `INT` / `BIGINT`, or
/// * the current maximum is already `i64::MAX`, so no larger value exists.
///   This previously overflowed: a panic in debug builds, a wrap to
///   `i64::MIN` in release builds.
///
/// Only rows held in memory are consulted. The result is not range-checked
/// against the column's own width (e.g. `SMALLINT`).
pub fn next_auto_value(&self, col_pos: usize) -> Option<i64> {
    let ty = self.schema.columns.get(col_pos)?.ty;
    if !matches!(ty, DataType::SmallInt | DataType::Int | DataType::BigInt) {
        return None;
    }
    let current_max = self
        .rows
        .iter()
        .filter_map(|row| match row.values.get(col_pos) {
            Some(Value::SmallInt(n)) => Some(i64::from(*n)),
            Some(Value::Int(n)) => Some(i64::from(*n)),
            Some(Value::BigInt(n)) => Some(*n),
            // NULLs and (unexpected) non-integer cells do not participate.
            _ => None,
        })
        .max();
    match current_max {
        None => Some(1),
        Some(max) => max.checked_add(1),
    }
}
/// Finds an index on the column at `column_position`.
///
/// The first BTree index on that column wins; if there is none, the first
/// NSW index on it is returned. BRIN indexes are never returned.
pub fn index_on(&self, column_position: usize) -> Option<&Index> {
    let mut first_nsw: Option<&Index> = None;
    for idx in &self.indices {
        if idx.column_position != column_position {
            continue;
        }
        match idx.kind {
            // A BTree match always takes precedence, wherever it sits.
            IndexKind::BTree(_) => return Some(idx),
            IndexKind::Nsw(_) => {
                if first_nsw.is_none() {
                    first_nsw = Some(idx);
                }
            }
            IndexKind::Brin { .. } => {}
        }
    }
    first_nsw
}
/// Appends `row` to the table after validating it against the schema, then
/// updates every BTree and NSW index and the hot-byte counter.
///
/// # Errors
///
/// * `StorageError::ArityMismatch` — the row's length differs from the column count.
/// * `StorageError::NullInNotNull` — a NULL was supplied for a non-nullable column.
/// * `StorageError::TypeMismatch` — a value's type is incompatible with its column.
///
/// A value is compatible when its type equals the column type, when it is
/// text going into a `VARCHAR`/`CHAR`/`JSON` column, JSON going into a `TEXT`
/// column, or a numeric whose scale matches the column's scale.
///
/// Nothing is modified when an error is returned.
pub fn insert(&mut self, row: Row) -> Result<(), StorageError> {
    let expected = self.schema.columns.len();
    let actual = row.len();
    if expected != actual {
        return Err(StorageError::ArityMismatch { expected, actual });
    }

    for (position, (col, val)) in self.schema.columns.iter().zip(&row.values).enumerate() {
        if val.is_null() {
            if col.nullable {
                continue;
            }
            return Err(StorageError::NullInNotNull {
                column: col.name.clone(),
            });
        }
        let actual = val.data_type().expect("non-null");
        let compatible = match (actual, col.ty) {
            (have, want) if have == want => true,
            (DataType::Text, DataType::Varchar(_) | DataType::Char(_) | DataType::Json)
            | (DataType::Json, DataType::Text) => true,
            (DataType::Numeric { scale: have, .. }, DataType::Numeric { scale: want, .. }) => {
                have == want
            }
            _ => false,
        };
        if !compatible {
            return Err(StorageError::TypeMismatch {
                column: col.name.clone(),
                expected: col.ty,
                actual,
                position,
            });
        }
    }

    // The row will land at the current end of the row vector.
    let new_row_idx = self.rows.len();
    for idx in &mut self.indices {
        let IndexKind::BTree(map) = &mut idx.kind else {
            continue;
        };
        let Some(key) = IndexKey::from_value(&row.values[idx.column_position]) else {
            continue;
        };
        let mut locators = map.get(&key).cloned().unwrap_or_default();
        locators.push(RowLocator::Hot(new_row_idx));
        map.insert_mut(key, locators);
    }

    let row_bytes = row_body_encoded_len(&row, &self.schema) as u64;
    self.hot_bytes = self.hot_bytes.saturating_add(row_bytes);
    self.rows.push_mut(row);

    // NSW insertion reads the row back from the table, so it runs after the push.
    for idx_pos in 0..self.indices.len() {
        if matches!(self.indices[idx_pos].kind, IndexKind::Nsw(_)) {
            nsw_insert_at(self, idx_pos, new_row_idx);
        }
    }
    Ok(())
}
pub fn add_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
if self.indices.iter().any(|i| i.name == name) {
return Err(StorageError::DuplicateIndex { name });
}
let column_position = self.schema.column_position(column_name).ok_or_else(|| {
StorageError::ColumnNotFound {
column: column_name.into(),
}
})?;
let mut idx = Index::new_btree(name, column_position);
if let IndexKind::BTree(map) = &mut idx.kind {
for (i, row) in self.rows.iter().enumerate() {
if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
let mut entries = map.get(&key).cloned().unwrap_or_default();
entries.push(RowLocator::Hot(i));
map.insert_mut(key, entries);
}
}
}
self.indices.push(idx);
Ok(())
}
/// Creates an NSW index called `name` on the vector column `column_name`
/// with neighbour parameter `m`, inserting every existing row into a fresh
/// graph.
///
/// # Errors
///
/// Forwards the errors of `add_nsw_index_inner`: duplicate index name,
/// unknown column, or a column that is not a vector.
pub fn add_nsw_index(
&mut self,
name: String,
column_name: &str,
m: usize,
) -> Result<(), StorageError> {
self.add_nsw_index_inner(name, column_name, m, None)
}
pub fn rebuild_nsw_index(
&mut self,
name: &str,
new_encoding: Option<VecEncoding>,
) -> Result<(), StorageError> {
let idx_pos = self
.indices
.iter()
.position(|i| i.name == name)
.ok_or_else(|| StorageError::IndexNotFound {
name: String::from(name),
})?;
let col_pos = self.indices[idx_pos].column_position;
let m = match &self.indices[idx_pos].kind {
IndexKind::Nsw(g) => g.m,
IndexKind::BTree(_) | IndexKind::Brin { .. } => {
return Err(StorageError::Unsupported(format!(
"ALTER INDEX REBUILD on non-NSW index {name:?} — only NSW indexes can rebuild"
)));
}
};
let col_name = self.schema.columns[col_pos].name.clone();
if let Some(target) = new_encoding {
let current = match self.schema.columns[col_pos].ty {
DataType::Vector { encoding, .. } => encoding,
ref other => {
return Err(StorageError::Unsupported(format!(
"ALTER INDEX REBUILD WITH (encoding=…) on non-vector column type {other:?}"
)));
}
};
if target != current {
let DataType::Vector { dim, .. } = self.schema.columns[col_pos].ty else {
unreachable!("checked above")
};
let n = self.rows.len();
for i in 0..n {
let row = self
.rows
.get_mut(i)
.expect("row index in bounds (we iterated up to len())");
let cell = core::mem::replace(&mut row.values[col_pos], Value::Null);
let recoded = recode_vector_cell(cell, target)?;
row.values[col_pos] = recoded;
}
self.schema.columns[col_pos].ty = DataType::Vector {
dim,
encoding: target,
};
}
}
self.indices.remove(idx_pos);
self.add_nsw_index_inner(String::from(name), &col_name, m, None)?;
Ok(())
}
/// Registers an NSW index called `name` on `column_name` using an
/// already-built `graph` as is; no rows are re-inserted into it.
///
/// # Errors
///
/// Forwards the errors of `add_nsw_index_inner`: duplicate index name,
/// unknown column, or a column that is not a vector.
pub fn restore_nsw_index(
&mut self,
name: String,
column_name: &str,
graph: NswGraph,
) -> Result<(), StorageError> {
self.add_nsw_index_inner(name, column_name, graph.m, Some(graph))
}
/// Registers a BTree index called `name` on `column_name` using an
/// already-populated `map` as is; the existing rows are not re-scanned.
///
/// # Errors
///
/// * `StorageError::DuplicateIndex` — an index with that name already exists.
/// * `StorageError::ColumnNotFound` — the table has no column `column_name`.
pub fn restore_btree_index(
    &mut self,
    name: String,
    column_name: &str,
    map: PersistentBTreeMap<IndexKey, Vec<RowLocator>>,
) -> Result<(), StorageError> {
    let name_taken = self.indices.iter().any(|existing| existing.name == name);
    if name_taken {
        return Err(StorageError::DuplicateIndex { name });
    }
    match self.schema.column_position(column_name) {
        Some(column_position) => {
            self.indices.push(Index {
                name,
                column_position,
                kind: IndexKind::BTree(map),
                included_columns: Vec::new(),
                partial_predicate: None,
                expression: None,
            });
            Ok(())
        }
        None => Err(StorageError::ColumnNotFound {
            column: column_name.into(),
        }),
    }
}
/// Registers a BRIN index called `name` on `column_name`, recording the
/// caller-supplied `column_type` (it is not checked against the schema).
///
/// # Errors
///
/// * `StorageError::DuplicateIndex` — an index with that name already exists.
/// * `StorageError::ColumnNotFound` — the table has no column `column_name`.
pub fn restore_brin_index(
    &mut self,
    name: String,
    column_name: &str,
    column_type: DataType,
) -> Result<(), StorageError> {
    let name_taken = self.indices.iter().any(|existing| existing.name == name);
    if name_taken {
        return Err(StorageError::DuplicateIndex { name });
    }
    let Some(column_position) = self.schema.column_position(column_name) else {
        return Err(StorageError::ColumnNotFound {
            column: column_name.into(),
        });
    };
    let index = Index::new_brin(name, column_position, column_type);
    self.indices.push(index);
    Ok(())
}
/// Creates a BRIN index called `name` on `column_name`, recording the
/// column's type as currently declared in the schema.
///
/// # Errors
///
/// * `StorageError::DuplicateIndex` — an index with that name already exists.
/// * `StorageError::ColumnNotFound` — the table has no column `column_name`.
pub fn add_brin_index(
    &mut self,
    name: String,
    column_name: &str,
) -> Result<(), StorageError> {
    let name_taken = self.indices.iter().any(|existing| existing.name == name);
    if name_taken {
        return Err(StorageError::DuplicateIndex { name });
    }
    let Some(column_position) = self.schema.column_position(column_name) else {
        return Err(StorageError::ColumnNotFound {
            column: column_name.into(),
        });
    };
    let declared_type = self.schema.columns[column_position].ty;
    let index = Index::new_brin(name, column_position, declared_type);
    self.indices.push(index);
    Ok(())
}
/// Appends the given `(key, locator)` pairs to the BTree index `index_name`
/// and returns how many pairs were added.
///
/// Each locator is pushed onto the end of the entry list for its key; no
/// de-duplication is performed.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` when the index does not exist or is not a
/// BTree index.
pub fn register_cold_locators<I>(
    &mut self,
    index_name: &str,
    locators: I,
) -> Result<usize, StorageError>
where
    I: IntoIterator<Item = (IndexKey, RowLocator)>,
{
    let Some(idx) = self.indices.iter_mut().find(|i| i.name == index_name) else {
        return Err(StorageError::Corrupt(format!(
            "index {index_name:?} not found"
        )));
    };
    let IndexKind::BTree(map) = &mut idx.kind else {
        return Err(StorageError::Corrupt(format!(
            "index {index_name:?} is not BTree; cold locators apply only to BTree indices"
        )));
    };

    let mut registered = 0usize;
    for (key, locator) in locators {
        let mut bucket = map.get(&key).cloned().unwrap_or_default();
        bucket.push(locator);
        map.insert_mut(key, bucket);
        registered += 1;
    }
    Ok(registered)
}
/// Drops every non-hot locator stored under `key` in the BTree index
/// `index_name` and returns how many were dropped.
///
/// Returns `Ok(0)` without touching the index when the key is absent or
/// holds only hot locators. When locators are dropped the key stays in the
/// map with the remaining (possibly empty) list.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` when the index does not exist or is not a
/// BTree index.
pub fn remove_cold_locators_for_key(
    &mut self,
    index_name: &str,
    key: &IndexKey,
) -> Result<usize, StorageError> {
    let Some(idx) = self.indices.iter_mut().find(|i| i.name == index_name) else {
        return Err(StorageError::Corrupt(format!(
            "remove_cold_locators_for_key: index {index_name:?} not found"
        )));
    };
    let IndexKind::BTree(map) = &mut idx.kind else {
        return Err(StorageError::Corrupt(format!(
            "remove_cold_locators_for_key: index {index_name:?} is not BTree; \
             cold locators apply only to BTree indices"
        )));
    };

    let Some(entries) = map.get(key) else {
        return Ok(0);
    };
    let before = entries.len();
    let mut hot_only: Vec<RowLocator> = entries
        .iter()
        .filter(|loc| loc.is_hot())
        .copied()
        .collect();
    if hot_only.len() == before {
        return Ok(0);
    }
    let removed = before - hot_only.len();
    hot_only.shrink_to_fit();
    map.insert_mut(key.clone(), hot_only);
    Ok(removed)
}
/// Deletes the rows at `positions` and returns how many rows were removed.
///
/// Out-of-range and duplicate positions are ignored. Surviving rows keep
/// their relative order but are renumbered, so every index is rebuilt
/// afterwards and `hot_bytes` is reduced by the encoded size of the removed
/// rows.
///
/// When no position refers to an existing row the table is left untouched.
/// Previously the row vector was copied and every index (including NSW
/// graphs) was rebuilt even in that case.
pub fn delete_rows(&mut self, positions: &[usize]) -> usize {
    if positions.is_empty() {
        return 0;
    }
    let mut to_remove = alloc::vec![false; self.rows.len()];
    let mut removed = 0;
    for &p in positions {
        if let Some(flag) = to_remove.get_mut(p) {
            if !*flag {
                *flag = true;
                removed += 1;
            }
        }
    }
    if removed == 0 {
        // Nothing matched: skip the row copy and the full index rebuild.
        return 0;
    }

    let mut new_rows: PersistentVec<Row> = PersistentVec::new();
    let mut removed_bytes: u64 = 0;
    for (row, &drop_it) in self.rows.iter().zip(&to_remove) {
        if drop_it {
            removed_bytes =
                removed_bytes.saturating_add(row_body_encoded_len(row, &self.schema) as u64);
        } else {
            new_rows.push_mut(row.clone());
        }
    }
    self.rows = new_rows;
    self.hot_bytes = self.hot_bytes.saturating_sub(removed_bytes);
    // Row positions shifted, so hot locators and graph node ids must be regenerated.
    self.rebuild_indices();
    removed
}
/// Replaces the row at `position` with `new_values` after validating them
/// against the schema, then adjusts `hot_bytes` and rebuilds every index.
///
/// # Errors
///
/// * `StorageError::Corrupt` — `position` is out of bounds.
/// * `StorageError::ArityMismatch` — the value count differs from the column count.
/// * `StorageError::NullInNotNull` — a NULL was supplied for a non-nullable column.
/// * `StorageError::TypeMismatch` — a value's type is incompatible with its column.
///
/// Compatibility follows the same rules as `insert`. Nothing is modified
/// when an error is returned.
pub fn update_row(
    &mut self,
    position: usize,
    new_values: Vec<Value>,
) -> Result<(), StorageError> {
    let row_total = self.rows.len();
    if row_total <= position {
        return Err(StorageError::Corrupt(alloc::format!(
            "update_row: position {position} out of bounds (rows={})",
            row_total
        )));
    }
    let expected = self.schema.columns.len();
    let actual = new_values.len();
    if expected != actual {
        return Err(StorageError::ArityMismatch { expected, actual });
    }

    for (column_idx, (col, val)) in self.schema.columns.iter().zip(&new_values).enumerate() {
        if val.is_null() {
            if col.nullable {
                continue;
            }
            return Err(StorageError::NullInNotNull {
                column: col.name.clone(),
            });
        }
        let actual = val.data_type().expect("non-null");
        let compatible = match (actual, col.ty) {
            (have, want) if have == want => true,
            (DataType::Text, DataType::Varchar(_) | DataType::Char(_) | DataType::Json)
            | (DataType::Json, DataType::Text) => true,
            (DataType::Numeric { scale: have, .. }, DataType::Numeric { scale: want, .. }) => {
                have == want
            }
            _ => false,
        };
        if !compatible {
            return Err(StorageError::TypeMismatch {
                column: col.name.clone(),
                expected: col.ty,
                actual,
                position: column_idx,
            });
        }
    }

    let replacement = Row::new(new_values);
    let previous = self
        .rows
        .get(position)
        .expect("position bounds-checked above");
    let old_bytes = row_body_encoded_len(previous, &self.schema) as u64;
    let new_bytes = row_body_encoded_len(&replacement, &self.schema) as u64;

    self.rows = self
        .rows
        .set(position, replacement)
        .expect("position bounds-checked above");
    self.hot_bytes = self
        .hot_bytes
        .saturating_sub(old_bytes)
        .saturating_add(new_bytes);
    self.rebuild_indices();
    Ok(())
}
fn rebuild_indices(&mut self) {
let preserved_cold: Vec<(String, Vec<(IndexKey, RowLocator)>)> = self
.indices
.iter()
.filter_map(|idx| match &idx.kind {
IndexKind::BTree(map) => {
let cold: Vec<(IndexKey, RowLocator)> = map
.iter()
.flat_map(|(k, locs)| {
locs.iter()
.filter(|l| l.is_cold())
.copied()
.map(move |l| (k.clone(), l))
})
.collect();
if cold.is_empty() {
None
} else {
Some((idx.name.clone(), cold))
}
}
IndexKind::Nsw(_) | IndexKind::Brin { .. } => None,
})
.collect();
#[derive(Clone)]
enum RebuildKind {
BTree,
Nsw(usize),
Brin(DataType),
}
let descriptors: Vec<(String, usize, RebuildKind)> = self
.indices
.iter()
.map(|idx| {
let kind = match &idx.kind {
IndexKind::Nsw(g) => RebuildKind::Nsw(g.m),
IndexKind::Brin { column_type } => RebuildKind::Brin(*column_type),
IndexKind::BTree(_) => RebuildKind::BTree,
};
(idx.name.clone(), idx.column_position, kind)
})
.collect();
self.indices.clear();
for (name, column_position, rebuild_kind) in descriptors {
match rebuild_kind {
RebuildKind::Nsw(m) => {
let idx = Index::new_nsw(name, column_position, m);
self.indices.push(idx);
let idx_pos = self.indices.len() - 1;
let row_indices: Vec<usize> = (0..self.rows.len()).collect();
for row_idx in row_indices {
nsw_insert_at(self, idx_pos, row_idx);
}
}
RebuildKind::Brin(column_type) => {
self.indices.push(Index::new_brin(name, column_position, column_type));
}
RebuildKind::BTree => {
let mut idx = Index::new_btree(name, column_position);
if let IndexKind::BTree(map) = &mut idx.kind {
for (i, row) in self.rows.iter().enumerate() {
if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
let mut entries = map.get(&key).cloned().unwrap_or_default();
entries.push(RowLocator::Hot(i));
map.insert_mut(key, entries);
}
}
}
self.indices.push(idx);
}
}
}
for (idx_name, locators) in preserved_cold {
let _ = self.register_cold_locators(&idx_name, locators);
}
}
/// Shared implementation behind `add_nsw_index`, `restore_nsw_index` and
/// `rebuild_nsw_index`.
///
/// With `restore = Some(graph)` the graph is attached as is. With `None` a
/// fresh graph with parameter `m` is created and every existing row is
/// inserted into it.
///
/// # Errors
///
/// * `StorageError::DuplicateIndex` — an index with that name already exists.
/// * `StorageError::ColumnNotFound` — the table has no column `column_name`.
/// * `StorageError::TypeMismatch` — the column is not a vector column
///   (`expected` is reported as an `F32` vector of dimension 0).
fn add_nsw_index_inner(
    &mut self,
    name: String,
    column_name: &str,
    m: usize,
    restore: Option<NswGraph>,
) -> Result<(), StorageError> {
    let name_taken = self.indices.iter().any(|existing| existing.name == name);
    if name_taken {
        return Err(StorageError::DuplicateIndex { name });
    }
    let Some(column_position) = self.schema.column_position(column_name) else {
        return Err(StorageError::ColumnNotFound {
            column: column_name.into(),
        });
    };
    let column_ty = self.schema.columns[column_position].ty;
    if !matches!(column_ty, DataType::Vector { .. }) {
        return Err(StorageError::TypeMismatch {
            column: column_name.into(),
            expected: DataType::Vector {
                dim: 0,
                encoding: VecEncoding::F32,
            },
            actual: column_ty,
            position: column_position,
        });
    }

    match restore {
        Some(graph) => {
            self.indices.push(Index {
                name,
                column_position,
                kind: IndexKind::Nsw(graph),
                included_columns: Vec::new(),
                partial_predicate: None,
                expression: None,
            });
        }
        None => {
            let idx_pos = self.indices.len();
            self.indices.push(Index::new_nsw(name, column_position, m));
            for row_idx in 0..self.rows.len() {
                nsw_insert_at(self, idx_pos, row_idx);
            }
        }
    }
    Ok(())
}
}
/// Converts a vector cell to the `target` encoding.
///
/// `Null` passes through unchanged. Any vector variant is first expanded to
/// `f32` elements and then re-encoded, even when it already uses the target
/// encoding.
///
/// # Errors
///
/// Returns `StorageError::Unsupported` for a non-null, non-vector cell.
fn recode_vector_cell(cell: Value, target: VecEncoding) -> Result<Value, StorageError> {
    let elements: Vec<f32> = match cell {
        Value::Null => return Ok(Value::Null),
        Value::Vector(floats) => floats,
        Value::Sq8Vector(quantized) => quantize::dequantize(&quantized),
        Value::HalfVector(half) => half.to_f32_vec(),
        other => {
            return Err(StorageError::Unsupported(format!(
                "ALTER INDEX REBUILD: cannot recode non-vector cell {:?}",
                other.data_type()
            )));
        }
    };
    let recoded = match target {
        VecEncoding::F32 => Value::Vector(elements),
        VecEncoding::Sq8 => Value::Sq8Vector(quantize::quantize(&elements)),
        VecEncoding::F16 => Value::HalfVector(halfvec::HalfVector::from_f32_slice(&elements)),
    };
    Ok(recoded)
}
/// Inserts row `new_row_idx` into the NSW graph of the index at `idx_pos`.
///
/// The row must already be present in `table.rows`. Rows whose indexed cell
/// is not a vector, or is a zero-length vector, only get their slots padded
/// and are never linked. The first vector inserted becomes the entry node.
///
/// # Panics
///
/// Panics if `idx_pos` or `new_row_idx` is out of bounds, or if the index at
/// `idx_pos` is not an NSW index.
fn nsw_insert_at(table: &mut Table, idx_pos: usize, new_row_idx: usize) {
let col_pos = table.indices[idx_pos].column_position;
// Element count of the cell, or `None` when the cell is not a vector.
let cell_dim: Option<usize> = match &table.rows[new_row_idx].values[col_pos] {
Value::Vector(v) => Some(v.len()),
Value::Sq8Vector(q) => Some(q.bytes.len()),
Value::HalfVector(h) => Some(h.dim()),
_ => None,
};
let Some(dim) = cell_dim else {
// Non-vector cell: reserve the node slot so row indices stay aligned, but add no edges.
ensure_node_slot(table, idx_pos, new_row_idx, 0);
return;
};
if dim == 0 {
// Empty vector: treated like a non-vector cell.
ensure_node_slot(table, idx_pos, new_row_idx, 0);
return;
}
// The level is a pure function of the row index, so rebuilds reproduce it.
let level = nsw_assign_level(new_row_idx);
ensure_node_slot(table, idx_pos, new_row_idx, level);
let (entry, entry_level, m) = match &table.indices[idx_pos].kind {
IndexKind::Nsw(g) => (g.entry, g.entry_level, g.m),
IndexKind::BTree(_) | IndexKind::Brin { .. } => {
unreachable!("nsw_insert_at on a non-NSW index")
}
};
if entry.is_none() {
// First vector in the graph: it becomes the entry node and has no neighbours yet.
if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
g.entry = Some(new_row_idx);
g.entry_level = level;
*g.levels
.get_mut(new_row_idx)
.expect("levels slot padded by ensure_node_slot") = level;
}
return;
}
// Record the node's level before linking it.
if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
*g.levels
.get_mut(new_row_idx)
.expect("levels slot padded by ensure_node_slot") = level;
}
// Distances are computed against the f32 form of the new vector.
let query = match &table.rows[new_row_idx].values[col_pos] {
Value::Vector(v) => v.clone(),
Value::Sq8Vector(q) => quantize::dequantize(q),
Value::HalfVector(h) => h.to_f32_vec(),
_ => return,
};
let mut current = entry.expect("entry was Some above");
let mut current_d = vec_l2_sq(table, col_pos, current, &query);
// Greedy descent through the layers above the new node's level to find a
// good starting point; no edges are added on those layers.
if entry_level > level {
for layer in (level + 1..=entry_level).rev() {
(current, current_d) =
greedy_layer_walk(table, idx_pos, layer, current, current_d, &query);
}
}
// Link the node on every layer it shares with the existing graph, top-down.
let top = level.min(entry_level);
// Beam width for the per-layer search: twice `m`, but at least 8.
let ef = (m * 2).max(8);
for layer in (0..=top).rev() {
// Layer 0 allows twice as many neighbours as the upper layers.
let cap = if layer == 0 { m * 2 } else { m };
let mut candidates = layer_beam_search(
table,
idx_pos,
layer,
current,
current_d,
&query,
ef,
NswMetric::L2,
);
// The new node must not become its own neighbour.
candidates.retain(|&(_, n)| n != new_row_idx);
// The closest candidate seeds the search on the next lower layer.
if let Some(&(d, n)) = candidates.first() {
current = n;
current_d = d;
}
let peers = select_neighbours_heuristic(&candidates, cap, table, col_pos);
connect_at_layer(table, idx_pos, layer, new_row_idx, &peers);
}
// A node that reaches above the current entry level becomes the new entry.
if level > entry_level
&& let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
{
g.entry = Some(new_row_idx);
g.entry_level = level;
}
}
/// Grows the NSW graph at `idx_pos` so that node `new_row_idx` can be
/// addressed on every layer.
///
/// Ensures layers `0..=level` exist, pads `levels` with 0 and pads every
/// layer's adjacency table with empty neighbour lists up to and including
/// `new_row_idx`. Existing entries are left untouched.
///
/// # Panics
///
/// Panics if the index at `idx_pos` is not an NSW index.
fn ensure_node_slot(table: &mut Table, idx_pos: usize, new_row_idx: usize, level: u8) {
    let graph = match &mut table.indices[idx_pos].kind {
        IndexKind::Nsw(graph) => graph,
        IndexKind::BTree(_) | IndexKind::Brin { .. } => {
            unreachable!("ensure_node_slot on a BTree index")
        }
    };

    let wanted_layers = usize::from(level) + 1;
    while graph.layers.len() < wanted_layers {
        graph.layers.push(PersistentVec::new());
    }

    // The ranges below are empty when the slot already exists.
    for _ in graph.levels.len()..=new_row_idx {
        graph.levels.push_mut(0);
    }
    for adjacency in graph.layers.iter_mut() {
        for _ in adjacency.len()..=new_row_idx {
            adjacency.push_mut(Vec::new());
        }
    }
}
/// Hill-climbs towards `query` on a single graph layer.
///
/// Starting at `current` (whose distance to the query is `current_d`), moves
/// to the strictly closest neighbour repeatedly until no neighbour improves
/// on the current node. Returns the final node and its distance.
///
/// Returns the inputs unchanged when the index at `idx_pos` is not NSW.
fn greedy_layer_walk(
    table: &Table,
    idx_pos: usize,
    layer: u8,
    mut current: usize,
    mut current_d: f32,
    query: &[f32],
) -> (usize, f32) {
    let IndexKind::Nsw(graph) = &table.indices[idx_pos].kind else {
        return (current, current_d);
    };
    let col_pos = table.indices[idx_pos].column_position;
    let layer_slot = layer as usize;

    loop {
        // A missing layer or node slot behaves like an empty neighbour list.
        let neighbours: &[u32] = match graph
            .layers
            .get(layer_slot)
            .and_then(|nodes| nodes.get(current))
        {
            Some(list) => list.as_slice(),
            None => &[],
        };

        // Keep the first neighbour that is strictly closer than everything seen so far.
        let (best, best_d) = neighbours.iter().fold(
            (current, current_d),
            |(node, node_d), &raw| {
                let candidate = raw as usize;
                let candidate_d = vec_l2_sq(table, col_pos, candidate, query);
                if candidate_d < node_d {
                    (candidate, candidate_d)
                } else {
                    (node, node_d)
                }
            },
        );

        if best == current {
            return (current, current_d);
        }
        current = best;
        current_d = best_d;
    }
}
/// Best-first search on one graph layer, keeping up to `ef` results.
///
/// Starts from `entry_node` and expands neighbours in order of increasing
/// distance to `query` under `metric`. `entry_d` is used as the entry node's
/// distance only for `NswMetric::L2`; for other metrics it is recomputed.
///
/// Returns `(distance, row_index)` pairs sorted by ascending distance. The
/// entry node is always part of the initial result set. Returns an empty
/// vector when the index at `idx_pos` is not NSW.
#[allow(clippy::too_many_arguments)] fn layer_beam_search(
table: &Table,
idx_pos: usize,
layer: u8,
entry_node: usize,
entry_d: f32,
query: &[f32],
ef: usize,
metric: NswMetric,
) -> Vec<(f32, usize)> {
let g = match &table.indices[idx_pos].kind {
IndexKind::Nsw(g) => g,
IndexKind::BTree(_) | IndexKind::Brin { .. } => return Vec::new(),
};
let col_pos = table.indices[idx_pos].column_position;
// The caller-supplied distance is an L2 distance; other metrics need their own.
let d0 = if matches!(metric, NswMetric::L2) {
entry_d
} else {
cell_to_query_metric_distance(table, col_pos, entry_node, query, metric)
};
let row_count = table.rows.len();
// One flag per row so each node is scored at most once.
let mut visited: Vec<bool> = alloc::vec![false; row_count];
if entry_node < row_count {
visited[entry_node] = true;
}
// `candidates` pops the closest unexpanded node; `results` keeps the best
// nodes found so far with the furthest one on top for cheap eviction.
let mut candidates: alloc::collections::BinaryHeap<NodeClosest> =
alloc::collections::BinaryHeap::with_capacity(ef);
let mut results: alloc::collections::BinaryHeap<NodeFurthest> =
alloc::collections::BinaryHeap::with_capacity(ef);
candidates.push(NodeClosest {
dist: d0,
node: entry_node,
});
results.push(NodeFurthest {
dist: d0,
node: entry_node,
});
while let Some(cur) = candidates.pop() {
let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
// Stop once the result set is full and the closest remaining candidate
// is further than the worst kept result.
if cur.dist > worst && results.len() >= ef {
break;
}
// A missing layer or node slot behaves like an empty neighbour list.
let neighbours: &[u32] = g
.layers
.get(layer as usize)
.and_then(|layer_v| layer_v.get(cur.node))
.map_or(&[][..], Vec::as_slice);
for &n in neighbours {
let n = n as usize;
// Skip edges pointing past the current row count and nodes already scored.
if n >= row_count || visited[n] {
continue;
}
visited[n] = true;
let dn = cell_to_query_metric_distance(table, col_pos, n, query, metric);
// Non-finite distances are never admitted as results or candidates.
if !dn.is_finite() {
continue;
}
let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
if results.len() < ef || dn < worst {
results.push(NodeFurthest { dist: dn, node: n });
// Evict the furthest result to stay within `ef`.
if results.len() > ef {
results.pop();
}
candidates.push(NodeClosest { dist: dn, node: n });
}
}
}
let mut out: Vec<(f32, usize)> = results.into_iter().map(|c| (c.dist, c.node)).collect();
// Ascending by distance; incomparable (NaN) pairs are treated as equal.
out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
out
}
/// Heap entry ordered so that a max-heap (`BinaryHeap`) pops the node with
/// the *smallest* distance first.
///
/// The ordering is a genuine total order that agrees with `Eq`:
/// * distances are compared with `f32::total_cmp`, so a (positive) NaN
///   distance sorts as the furthest possible value instead of comparing
///   "equal" to everything;
/// * equal distances are tie-broken by node index (the smaller index is
///   popped first), so two distinct nodes never compare as `Equal`.
#[derive(Debug, Clone, Copy)]
struct NodeClosest {
    /// Distance from the node to the query.
    dist: f32,
    /// Row index of the node.
    node: usize,
}

impl PartialEq for NodeClosest {
    /// Equality is defined through `cmp` so the two can never disagree.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}

impl Eq for NodeClosest {}

impl PartialOrd for NodeClosest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for NodeClosest {
    /// Reversed comparison: the closer node (then the smaller index) is "greater".
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        other
            .dist
            .total_cmp(&self.dist)
            .then_with(|| other.node.cmp(&self.node))
    }
}
/// Heap entry ordered so that a max-heap (`BinaryHeap`) keeps the node with
/// the *largest* distance on top, ready to be evicted.
///
/// The ordering is a genuine total order that agrees with `Eq`:
/// * distances are compared with `f32::total_cmp`, so a (positive) NaN
///   distance sorts as the furthest value and is evicted first instead of
///   comparing "equal" to everything;
/// * equal distances are tie-broken by node index (the larger index is
///   evicted first), so two distinct nodes never compare as `Equal`.
#[derive(Debug, Clone, Copy)]
struct NodeFurthest {
    /// Distance from the node to the query.
    dist: f32,
    /// Row index of the node.
    node: usize,
}

impl PartialEq for NodeFurthest {
    /// Equality is defined through `cmp` so the two can never disagree.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}

impl Eq for NodeFurthest {}

impl PartialOrd for NodeFurthest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for NodeFurthest {
    /// Natural comparison: the further node (then the larger index) is "greater".
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        self.dist
            .total_cmp(&other.dist)
            .then_with(|| self.node.cmp(&other.node))
    }
}
/// Picks at most `m` neighbours from `candidates`, processed in the order given.
///
/// `candidates` holds `(distance_to_query, row_index)` pairs. A candidate is
/// skipped when its row has no vector in column `col_pos`, or when it lies
/// strictly closer to an already chosen neighbour than to the query.
/// Returns the chosen row indices in selection order.
fn select_neighbours_heuristic(
    candidates: &[(f32, usize)],
    m: usize,
    table: &Table,
    col_pos: usize,
) -> Vec<usize> {
    let mut picked: Vec<usize> = Vec::with_capacity(m);
    for &(dist_to_query, cand) in candidates {
        if picked.len() >= m {
            break;
        }
        let has_vector = matches!(
            table.rows.get(cand).and_then(|row| row.values.get(col_pos)),
            Some(Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_))
        );
        if !has_vector {
            continue;
        }
        // A candidate already "covered" by a chosen neighbour adds little
        // diversity, so it is passed over.
        let shadowed = picked
            .iter()
            .any(|&kept| cell_l2_sq(table, col_pos, cand, kept) < dist_to_query);
        if !shadowed {
            picked.push(cand);
        }
    }
    picked
}
/// Links node `new_row_idx` with `peers` on `layer` of the NSW graph at
/// `idx_pos`.
///
/// The new node's neighbour list is replaced by `peers`. Each peer whose cell
/// is a vector gets a back-link to the new node; a peer whose list then
/// exceeds the layer's cap is re-pruned with `select_neighbours_heuristic`.
/// Does nothing when the index is not NSW.
///
/// # Panics
///
/// Panics if a row index does not fit in `u32`, or if `layer` / a peer index
/// is out of bounds for the graph or the row vector.
fn connect_at_layer(
table: &mut Table,
idx_pos: usize,
layer: u8,
new_row_idx: usize,
peers: &[usize],
) {
let col_pos = table.indices[idx_pos].column_position;
let cap = match &table.indices[idx_pos].kind {
IndexKind::Nsw(g) => g.cap_for_layer(layer),
IndexKind::BTree(_) | IndexKind::Brin { .. } => return,
};
// Neighbour lists store row indices as `u32`.
let new_row_u32 = u32::try_from(new_row_idx).expect("row index fits in u32");
// Forward edges: overwrite the new node's neighbour list with `peers`.
if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
let layer_v = &mut g.layers[layer as usize];
if let Some(slot) = layer_v.get_mut(new_row_idx) {
*slot = peers
.iter()
.map(|&p| u32::try_from(p).expect("row index fits in u32"))
.collect();
}
}
// Back edges: make every peer point at the new node as well.
for &peer in peers {
// Peers without a vector cell receive no back-link.
if !matches!(
&table.rows[peer].values[col_pos],
Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_)
) {
continue;
}
if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
let layer_v = &mut g.layers[layer as usize];
if let Some(slot) = layer_v.get_mut(peer)
&& !slot.contains(&new_row_u32)
{
slot.push(new_row_u32);
}
}
// The back-link may have pushed the peer over the layer's neighbour cap.
let needs_trim = match &table.indices[idx_pos].kind {
IndexKind::Nsw(g) => g.layers[layer as usize][peer].len() > cap,
IndexKind::BTree(_) | IndexKind::Brin { .. } => false,
};
if needs_trim {
// Copy the list out so the table can be borrowed immutably for distance computation.
let current_peers: Vec<usize> = match &table.indices[idx_pos].kind {
IndexKind::Nsw(g) => g.layers[layer as usize][peer]
.iter()
.map(|&n| n as usize)
.collect(),
IndexKind::BTree(_) | IndexKind::Brin { .. } => continue,
};
// Rank the peer's neighbours by their distance to the peer itself.
let mut tagged: Vec<(f32, usize)> = current_peers
.iter()
.map(|&p| (cell_l2_sq(table, col_pos, peer, p), p))
.collect();
tagged.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
let kept = select_neighbours_heuristic(&tagged, cap, table, col_pos);
if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
&& let Some(slot) = g.layers[layer as usize].get_mut(peer)
{
*slot = kept
.into_iter()
.map(|p| u32::try_from(p).expect("row index fits in u32"))
.collect();
}
}
}
}
/// Squared L2 distance between `query` and the vector stored in
/// `table.rows[row]` at column `col_pos`.
///
/// Returns `f32::INFINITY` when the row or cell is missing, the cell is not a
/// vector value, or its dimension differs from `query.len()`.
fn vec_l2_sq(table: &Table, col_pos: usize, row: usize, query: &[f32]) -> f32 {
    let dim = query.len();
    let Some(cell) = table.rows.get(row).and_then(|r| r.values.get(col_pos)) else {
        return f32::INFINITY;
    };
    match cell {
        Value::HalfVector(h) if h.dim() == dim => {
            halfvec::half_l2_distance_sq_asymmetric(h, query)
        }
        Value::Sq8Vector(q) if q.bytes.len() == dim => {
            quantize::sq8_l2_distance_sq_asymmetric(q, query)
        }
        Value::Vector(v) if v.len() == dim => l2_distance_sq(v, query),
        _ => f32::INFINITY,
    }
}
/// Squared L2 distance between the vectors stored in rows `row_a` and
/// `row_b` at column `col_pos`.
///
/// Both cells must hold the same vector encoding with equal dimensions;
/// otherwise (or when either cell is missing) the result is `f32::INFINITY`.
fn cell_l2_sq(table: &Table, col_pos: usize, row_a: usize, row_b: usize) -> f32 {
    let cell_at = |row: usize| table.rows.get(row).and_then(|r| r.values.get(col_pos));
    let (Some(lhs), Some(rhs)) = (cell_at(row_a), cell_at(row_b)) else {
        return f32::INFINITY;
    };
    match (lhs, rhs) {
        (Value::HalfVector(a), Value::HalfVector(b)) if a.dim() == b.dim() => {
            halfvec::half_l2_distance_sq(a, b)
        }
        (Value::Sq8Vector(a), Value::Sq8Vector(b)) if a.bytes.len() == b.bytes.len() => {
            quantize::sq8_l2_distance_sq(a, b)
        }
        (Value::Vector(a), Value::Vector(b)) if a.len() == b.len() => l2_distance_sq(a, b),
        _ => f32::INFINITY,
    }
}
/// Distance under `metric` between `query` and the vector stored in
/// `table.rows[row]` at column `col_pos`.
///
/// Quantised (`Sq8Vector`) and half-precision (`HalfVector`) cells are
/// compared through their asymmetric kernels. A missing cell, a non-vector
/// cell, or a dimension mismatch yields `f32::INFINITY`.
fn cell_to_query_metric_distance(
    table: &Table,
    col_pos: usize,
    row: usize,
    query: &[f32],
    metric: NswMetric,
) -> f32 {
    let dim = query.len();
    let Some(cell) = table.rows.get(row).and_then(|r| r.values.get(col_pos)) else {
        return f32::INFINITY;
    };
    match cell {
        Value::Vector(v) if v.len() == dim => metric_distance(metric, v, query),
        Value::Sq8Vector(q) if q.bytes.len() == dim => match metric {
            NswMetric::Cosine => quantize::sq8_cosine_distance_asymmetric(q, query),
            NswMetric::InnerProduct => quantize::sq8_inner_product_asymmetric(q, query),
            NswMetric::L2 => quantize::sq8_l2_distance_sq_asymmetric(q, query),
        },
        Value::HalfVector(h) if h.dim() == dim => match metric {
            NswMetric::Cosine => halfvec::half_cosine_distance_asymmetric(h, query),
            NswMetric::InnerProduct => halfvec::half_inner_product_asymmetric(h, query),
            NswMetric::L2 => halfvec::half_l2_distance_sq_asymmetric(h, query),
        },
        _ => f32::INFINITY,
    }
}
/// Distance metric used when ranking vectors; smaller values rank first.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NswMetric {
/// Squared Euclidean distance.
L2,
/// Negated inner product, so a larger dot product gives a smaller distance.
InnerProduct,
/// `1 - cos(a, b)`; `metric_distance` returns infinity when either vector
/// has zero norm.
Cosine,
}
/// Searches the NSW index at `table.indices[idx_pos]` for the `k` entries
/// nearest to `query` under `metric`.
///
/// Returns `(distance, row index)` pairs, at most `k` of them. The result is
/// empty when the index is not an NSW index or the graph has no entry point.
///
/// `ef` is the beam width for the layer-0 search; it is raised to at least
/// `k`. For SQ8-encoded columns it is further raised to
/// `k * SQ8_RERANK_OVER_FETCH` (saturating, so a huge `k` cannot overflow),
/// and the over-fetched candidates are re-ranked before truncation.
fn nsw_search(
    table: &Table,
    idx_pos: usize,
    query: &[f32],
    k: usize,
    ef: usize,
    metric: NswMetric,
) -> Vec<(f32, usize)> {
    let (entry, entry_level) = match &table.indices[idx_pos].kind {
        IndexKind::Nsw(g) => (g.entry, g.entry_level),
        IndexKind::BTree(_) | IndexKind::Brin { .. } => return Vec::new(),
    };
    let Some(entry) = entry else {
        return Vec::new();
    };
    let col_pos = table.indices[idx_pos].column_position;
    let sq8 = matches!(
        table.schema.columns.get(col_pos).map(|c| c.ty),
        Some(DataType::Vector {
            encoding: VecEncoding::Sq8,
            ..
        })
    );
    let ef = if sq8 {
        // Saturating: `k * factor` must not overflow for very large `k`.
        ef.max(k).max(k.saturating_mul(SQ8_RERANK_OVER_FETCH))
    } else {
        ef.max(k)
    };
    // NOTE(review): the entry distance and the upper-layer descent use squared
    // L2 regardless of `metric`; only the layer-0 beam search receives `metric`.
    let entry_d = vec_l2_sq(table, col_pos, entry, query);
    let mut current = entry;
    let mut current_d = entry_d;
    // Greedy descent through the upper layers, top-down, to find the layer-0 seed.
    for layer in (1..=entry_level).rev() {
        (current, current_d) = greedy_layer_walk(table, idx_pos, layer, current, current_d, query);
    }
    let mut results = layer_beam_search(table, idx_pos, 0, current, current_d, query, ef, metric);
    if sq8 {
        results = sq8_rerank(table, col_pos, &results, query, metric);
    }
    results.truncate(k);
    results
}
/// Re-scores `candidates` by dequantising each SQ8 cell and recomputing the
/// distance to `query` under `metric`, then sorts ascending by distance.
///
/// Candidates whose cell is missing, or whose dequantised length differs from
/// `query.len()`, are dropped. Candidates whose cell is not an `Sq8Vector`
/// keep the distance they came in with.
fn sq8_rerank(
    table: &Table,
    col_pos: usize,
    candidates: &[(f32, usize)],
    query: &[f32],
    metric: NswMetric,
) -> Vec<(f32, usize)> {
    let mut rescored: Vec<(f32, usize)> = Vec::new();
    for &(adc_d, row) in candidates {
        let Some(cell) = table.rows.get(row).and_then(|r| r.values.get(col_pos)) else {
            continue;
        };
        match cell {
            Value::Sq8Vector(q) => {
                let deq = quantize::dequantize(q);
                if deq.len() == query.len() {
                    rescored.push((metric_distance(metric, &deq, query), row));
                }
            }
            _ => rescored.push((adc_d, row)),
        }
    }
    rescored.sort_by(|lhs, rhs| {
        lhs.0
            .partial_cmp(&rhs.0)
            .unwrap_or(core::cmp::Ordering::Equal)
    });
    rescored
}
/// Over-fetch factor for SQ8 columns: `nsw_search` widens its beam to at
/// least `k * SQ8_RERANK_OVER_FETCH` candidates before re-ranking.
const SQ8_RERANK_OVER_FETCH: usize = 3;
/// Distance between `a` and `b` under `metric`; smaller means closer.
///
/// * `L2`: squared Euclidean distance.
/// * `InnerProduct`: the negated dot product.
/// * `Cosine`: `1 - dot / (|a| * |b|)`, or `f32::INFINITY` when either
///   squared norm is exactly zero.
fn metric_distance(metric: NswMetric, a: &[f32], b: &[f32]) -> f32 {
    match metric {
        NswMetric::Cosine => {
            let (dot, norm_a_sq, norm_b_sq) = cosine_dot_norms_f32(a, b);
            if norm_a_sq != 0.0 && norm_b_sq != 0.0 {
                let denom = sqrt_newton_f32(norm_a_sq) * sqrt_newton_f32(norm_b_sq);
                1.0 - dot / denom
            } else {
                f32::INFINITY
            }
        }
        NswMetric::InnerProduct => -inner_product_f32(a, b),
        NswMetric::L2 => l2_distance_sq(a, b),
    }
}
/// Dot product of `a` and `b`.
///
/// On aarch64 the NEON kernel is used when both slices have the same length
/// and that length is a non-zero multiple of 4; every other case goes
/// through the scalar loop.
#[doc(hidden)]
#[inline]
pub fn inner_product_f32(a: &[f32], b: &[f32]) -> f32 {
    #[cfg(target_arch = "aarch64")]
    {
        let len = a.len();
        let simd_ok = len == b.len() && len >= 4 && len.is_multiple_of(4);
        if simd_ok {
            // SAFETY: lengths are equal and a multiple of 4, so every 4-lane
            // load in the kernel stays inside both slices. NEON availability
            // is assumed for the aarch64 targets this crate is built for.
            return unsafe { inner_product_neon(a, b) };
        }
    }
    inner_product_scalar(a, b)
}
/// Portable dot product: sums `a[i] * b[i]` left to right over the common
/// prefix of the two slices, starting from `0.0`.
fn inner_product_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter()
        .zip(b.iter())
        .fold(0.0_f32, |acc, (lhs, rhs)| acc + lhs * rhs)
}
/// NEON dot product of `a` and `b`, processed 8 lanes per iteration with a
/// 4-lane loop for the remaining full group.
///
/// Elements past the last full group of 4 in `a` are ignored.
///
/// # Safety
/// The CPU must support NEON, and `b` must contain at least as many elements
/// as the part of `a` that is read (`a.len()` rounded down to a multiple of
/// 4): the loads on `b` use the same offsets as those on `a`.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names)] unsafe fn inner_product_neon(a: &[f32], b: &[f32]) -> f32 {
use core::arch::aarch64::{
float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32,
};
// SAFETY: every load reads 4 floats at offset `i` with `i + 4 <= a.len()`
// (or `i + 8 <= a.len()` for the pair); the caller guarantees `b` is long enough.
unsafe {
let zero: float32x4_t = vdupq_n_f32(0.0);
// Two independent accumulators for the 8-wide loop.
let mut acc0 = zero;
let mut acc1 = zero;
let n = a.len();
let mut i = 0usize;
while i + 8 <= n {
let av0 = vld1q_f32(a.as_ptr().add(i));
let bv0 = vld1q_f32(b.as_ptr().add(i));
acc0 = vfmaq_f32(acc0, av0, bv0);
let av1 = vld1q_f32(a.as_ptr().add(i + 4));
let bv1 = vld1q_f32(b.as_ptr().add(i + 4));
acc1 = vfmaq_f32(acc1, av1, bv1);
i += 8;
}
// At most one remaining group of 4.
while i + 4 <= n {
let av = vld1q_f32(a.as_ptr().add(i));
let bv = vld1q_f32(b.as_ptr().add(i));
acc0 = vfmaq_f32(acc0, av, bv);
i += 4;
}
// Combine both accumulators, then sum the 4 lanes horizontally.
vaddvq_f32(vaddq_f32(acc0, acc1))
}
}
/// Returns `(a·b, |a|², |b|²)` in a single pass over the inputs.
///
/// On aarch64 the NEON kernel is used when both slices have the same length
/// and that length is a non-zero multiple of 4; every other case goes
/// through the scalar loop.
#[doc(hidden)]
#[inline]
pub fn cosine_dot_norms_f32(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    #[cfg(target_arch = "aarch64")]
    {
        let len = a.len();
        let simd_ok = len == b.len() && len >= 4 && len.is_multiple_of(4);
        if simd_ok {
            // SAFETY: lengths are equal and a multiple of 4, so every 4-lane
            // load in the kernel stays inside both slices. NEON availability
            // is assumed for the aarch64 targets this crate is built for.
            return unsafe { cosine_dot_norms_neon(a, b) };
        }
    }
    cosine_dot_norms_scalar(a, b)
}
/// Portable single-pass computation of `(a·b, |a|², |b|²)` over the common
/// prefix of the two slices, each sum accumulated left to right from `0.0`.
fn cosine_dot_norms_scalar(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    a.iter().zip(b.iter()).fold(
        (0.0_f32, 0.0_f32, 0.0_f32),
        |(dot, norm_a, norm_b), (lhs, rhs)| {
            (dot + lhs * rhs, norm_a + lhs * lhs, norm_b + rhs * rhs)
        },
    )
}
/// NEON single-pass computation of `(a·b, |a|², |b|²)`, 4 lanes per iteration.
///
/// Elements past the last full group of 4 in `a` are ignored.
///
/// # Safety
/// The CPU must support NEON, and `b` must contain at least as many elements
/// as the part of `a` that is read (`a.len()` rounded down to a multiple of
/// 4): the loads on `b` use the same offsets as those on `a`.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names, clippy::similar_names)]
unsafe fn cosine_dot_norms_neon(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
use core::arch::aarch64::{float32x4_t, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32};
// SAFETY: every load reads 4 floats at offset `i` with `i + 4 <= a.len()`;
// the caller guarantees `b` is long enough.
unsafe {
let zero: float32x4_t = vdupq_n_f32(0.0);
let mut acc_dot = zero;
let mut acc_na = zero;
let mut acc_nb = zero;
let n = a.len();
let mut i = 0usize;
while i + 4 <= n {
let av = vld1q_f32(a.as_ptr().add(i));
let bv = vld1q_f32(b.as_ptr().add(i));
acc_dot = vfmaq_f32(acc_dot, av, bv);
acc_na = vfmaq_f32(acc_na, av, av);
acc_nb = vfmaq_f32(acc_nb, bv, bv);
i += 4;
}
// Horizontal sum of each 4-lane accumulator.
(vaddvq_f32(acc_dot), vaddvq_f32(acc_na), vaddvq_f32(acc_nb))
}
}
/// Square root of `x` by Newton's method, for builds without a float `sqrt`.
///
/// * `x <= 0.0` returns `0.0`.
/// * NaN and `+inf` are returned unchanged.
/// * Every other input converges to within rounding error of the true root,
///   including very large, very small and subnormal values.
///
/// The previous fixed ten iterations starting from `g = x` only halved the
/// estimate per step while far from the root, so inputs outside roughly
/// `[1e-4, 1e4]` came back badly wrong (e.g. about `9.8e4` for `1e8`).
fn sqrt_newton_f32(x: f32) -> f32 {
    if x <= 0.0 {
        return 0.0;
    }
    if x.is_nan() || x.is_infinite() {
        return x;
    }
    // Halving the biased exponent through the bit pattern gives an estimate
    // within a few percent for normal floats. The sum cannot overflow: the
    // shifted bits of a finite positive float are at most 0x3fbf_ffff.
    let mut g = f32::from_bits((x.to_bits() >> 1) + 0x1fbd_1df5);
    // One unconditional step: afterwards the estimate sits at or above the
    // true root (AM-GM), so the sequence is non-increasing from here on.
    g = 0.5 * (g + x / g);
    // Normal inputs settle in 2-3 steps; the cap covers subnormals, where
    // the bit-level guess is much coarser.
    for _ in 0..32 {
        let next = 0.5 * (g + x / g);
        if next >= g {
            break;
        }
        g = next;
    }
    g
}
/// Squared Euclidean distance between `a` and `b`.
///
/// On aarch64 the NEON kernel is used when both slices have the same length
/// and that length is a non-zero multiple of 4; every other case goes
/// through the scalar loop.
#[inline]
fn l2_distance_sq(a: &[f32], b: &[f32]) -> f32 {
    #[cfg(target_arch = "aarch64")]
    {
        let len = a.len();
        let simd_ok = len == b.len() && len >= 4 && len.is_multiple_of(4);
        if simd_ok {
            // SAFETY: lengths are equal and a multiple of 4, so every 4-lane
            // load in the kernel stays inside both slices. NEON availability
            // is assumed for the aarch64 targets this crate is built for.
            return unsafe { l2_distance_sq_neon(a, b) };
        }
    }
    l2_distance_sq_scalar(a, b)
}
/// Portable squared Euclidean distance: sums `(a[i] - b[i])²` left to right
/// over the common prefix of the two slices, starting from `0.0`.
fn l2_distance_sq_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b.iter()).fold(0.0_f32, |acc, (lhs, rhs)| {
        let delta = *lhs - *rhs;
        acc + delta * delta
    })
}
/// NEON squared Euclidean distance, processed 8 lanes per iteration with a
/// 4-lane loop for the remaining full group.
///
/// Elements past the last full group of 4 in `a` are ignored.
///
/// # Safety
/// The CPU must support NEON, and `b` must contain at least as many elements
/// as the part of `a` that is read (`a.len()` rounded down to a multiple of
/// 4): the loads on `b` use the same offsets as those on `a`.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names)] unsafe fn l2_distance_sq_neon(a: &[f32], b: &[f32]) -> f32 {
use core::arch::aarch64::{
float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32, vsubq_f32,
};
// SAFETY: every load reads 4 floats at offset `i` with `i + 4 <= a.len()`
// (or `i + 8 <= a.len()` for the pair); the caller guarantees `b` is long enough.
unsafe {
let zero: float32x4_t = vdupq_n_f32(0.0);
// Two independent accumulators for the 8-wide loop.
let mut acc0 = zero;
let mut acc1 = zero;
let n = a.len();
let mut i = 0usize;
while i + 8 <= n {
let d0 = vsubq_f32(vld1q_f32(a.as_ptr().add(i)), vld1q_f32(b.as_ptr().add(i)));
acc0 = vfmaq_f32(acc0, d0, d0);
let d1 = vsubq_f32(
vld1q_f32(a.as_ptr().add(i + 4)),
vld1q_f32(b.as_ptr().add(i + 4)),
);
acc1 = vfmaq_f32(acc1, d1, d1);
i += 8;
}
// At most one remaining group of 4.
while i + 4 <= n {
let d = vsubq_f32(vld1q_f32(a.as_ptr().add(i)), vld1q_f32(b.as_ptr().add(i)));
acc0 = vfmaq_f32(acc0, d, d);
i += 4;
}
// Combine both accumulators, then sum the 4 lanes horizontally.
vaddvq_f32(vaddq_f32(acc0, acc1))
}
}
/// Runs a k-nearest-neighbour query against the index named `idx_name` and
/// returns the matching row indices in the order `nsw_search` produced them.
///
/// Returns an empty vector when no index with that name exists on `table`.
/// The beam width is `max(2 * k, NSW_DEFAULT_M)`; the doubling saturates so
/// a very large `k` cannot overflow.
pub fn nsw_query(
    table: &Table,
    idx_name: &str,
    query: &[f32],
    k: usize,
    metric: NswMetric,
) -> Vec<usize> {
    let Some(idx_pos) = table.indices.iter().position(|i| i.name == idx_name) else {
        return Vec::new();
    };
    let ef = k.saturating_mul(2).max(NSW_DEFAULT_M);
    nsw_search(table, idx_pos, query, k, ef, metric)
        .into_iter()
        // `nsw_search` already caps its result at `k`; `take` keeps that
        // guarantee local without a second in-place truncation.
        .take(k)
        .map(|(_, row)| row)
        .collect()
}
/// Returns the first NSW index on `table` that covers `column_position`,
/// or `None` when that column has no NSW index.
pub fn nsw_index_on(table: &Table, column_position: usize) -> Option<&Index> {
    for candidate in table.indices.iter() {
        let covers_column = candidate.column_position == column_position;
        if covers_column && matches!(candidate.kind, IndexKind::Nsw(_)) {
            return Some(candidate);
        }
    }
    None
}
/// Collection of tables plus the cold-tier segments registered on it.
#[derive(Debug, Clone, Default)]
pub struct Catalog {
// Tables in creation order.
tables: Vec<Table>,
// Table name -> position in `tables`.
by_name: BTreeMap<String, usize>,
// Cold segments indexed by segment id; `None` marks a slot that is
// tombstoned or was padded in and never filled.
cold_segments: Vec<Option<Arc<OwnedSegment>>>,
}
impl Catalog {
pub const fn new() -> Self {
Self {
tables: Vec::new(),
by_name: BTreeMap::new(),
cold_segments: Vec::new(),
}
}
/// Registers a new, empty table described by `schema`.
///
/// # Errors
/// Returns `StorageError::DuplicateTable` when a table with the same name
/// already exists; the catalog is left untouched in that case.
pub fn create_table(&mut self, schema: TableSchema) -> Result<(), StorageError> {
    let name = schema.name.clone();
    if self.by_name.contains_key(&name) {
        return Err(StorageError::DuplicateTable { name });
    }
    // The new table lands at the end of `tables`.
    let slot = self.tables.len();
    self.tables.push(Table::new(schema));
    self.by_name.insert(name, slot);
    Ok(())
}
/// Looks up a table by name.
pub fn get(&self, name: &str) -> Option<&Table> {
    self.by_name
        .get(name)
        .and_then(|&slot| self.tables.get(slot))
}
/// Looks up a table by name for mutation.
pub fn get_mut(&mut self, name: &str) -> Option<&mut Table> {
    match self.by_name.get(name) {
        Some(&slot) => self.tables.get_mut(slot),
        None => None,
    }
}
/// Number of tables in the catalog.
pub fn table_count(&self) -> usize {
self.tables.len()
}
/// Names of all tables, in the order the tables are stored.
pub fn table_names(&self) -> Vec<String> {
    let mut names = Vec::with_capacity(self.tables.len());
    for table in &self.tables {
        names.push(table.schema.name.clone());
    }
    names
}
/// Parses `bytes` as a cold segment and appends it in a new slot, returning
/// the id of that slot.
///
/// # Errors
/// Returns `StorageError::Corrupt` when the slot count no longer fits in a
/// `u32` or when the bytes fail to parse; nothing is appended in either case.
pub fn load_segment_bytes(&mut self, bytes: Vec<u8>) -> Result<u32, StorageError> {
    // The id is the index the new slot will occupy.
    let Ok(id) = u32::try_from(self.cold_segments.len()) else {
        return Err(StorageError::Corrupt(
            "cold segment count would exceed u32::MAX".into(),
        ));
    };
    let parsed = match OwnedSegment::from_bytes(bytes) {
        Ok(seg) => seg,
        Err(e) => {
            return Err(StorageError::Corrupt(format!(
                "cold segment parse failed: {e}"
            )));
        }
    };
    self.cold_segments.push(Some(Arc::new(parsed)));
    Ok(id)
}
/// Parses `bytes` as a cold segment and installs it at slot `target_id`,
/// padding the slot list with empty slots if it is too short.
///
/// # Errors
/// Returns `StorageError::Corrupt` when the bytes fail to parse (before any
/// padding happens) or when the target slot already holds a segment.
pub fn load_segment_bytes_at(
    &mut self,
    target_id: u32,
    bytes: Vec<u8>,
) -> Result<(), StorageError> {
    let parsed = OwnedSegment::from_bytes(bytes)
        .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
    let slot_idx = target_id as usize;
    // Grow with empty slots until `slot_idx` is addressable.
    for _ in self.cold_segments.len()..=slot_idx {
        self.cold_segments.push(None);
    }
    let slot = &mut self.cold_segments[slot_idx];
    if slot.is_some() {
        return Err(StorageError::Corrupt(format!(
            "load_segment_bytes_at: segment_id {target_id} already occupied"
        )));
    }
    *slot = Some(Arc::new(parsed));
    Ok(())
}
/// Empties the slot for `segment_id` while keeping the slot itself, so the
/// ids of other segments do not shift.
///
/// # Errors
/// Returns `StorageError::Corrupt` when `segment_id` is past the last slot.
pub fn tombstone_segment(&mut self, segment_id: u32) -> Result<(), StorageError> {
    let slot_count = self.cold_segments.len();
    match self.cold_segments.get_mut(segment_id as usize) {
        Some(slot) => {
            *slot = None;
            Ok(())
        }
        None => Err(StorageError::Corrupt(format!(
            "tombstone_segment: segment_id {segment_id} out of bounds (len={})",
            slot_count
        ))),
    }
}
/// Number of slots that currently hold a segment (empty slots excluded).
#[must_use]
pub fn cold_segment_count(&self) -> usize {
    self.cold_segments.iter().flatten().count()
}
/// Total number of cold-segment slots, including empty (`None`) ones.
#[must_use]
pub fn cold_segment_slot_count(&self) -> usize {
self.cold_segments.len()
}
/// Ids of every slot that currently holds a segment, in ascending order.
#[must_use]
pub fn cold_segment_ids_global(&self) -> Vec<u32> {
    let mut live_ids = Vec::new();
    for (slot, segment) in self.cold_segments.iter().enumerate() {
        if segment.is_some() {
            live_ids.push(slot as u32);
        }
    }
    live_ids
}
#[must_use]
pub fn hot_tier_bytes(&self) -> u64 {
self.tables
.iter()
.map(Table::hot_bytes)
.fold(0u64, u64::saturating_add)
}
/// Moves the first `max_rows` rows of `table_name` into a new cold segment
/// keyed by the BTree index `index_name`.
///
/// The rows are encoded, sorted by their integer key, written into a
/// segment, removed from the hot rows, and re-registered on the index as
/// cold locators pointing at the new segment.
///
/// The segment is registered on the catalog *before* the hot rows are
/// deleted, so a failure while encoding or registering it leaves the table
/// untouched instead of dropping rows that have no cold copy.
///
/// # Errors
/// Returns `StorageError::Corrupt` when `max_rows` is zero or exceeds the row
/// count, the table or index is missing, the index is not a BTree, a row has
/// no usable integer key, two rows in the batch share a key, or the segment
/// cannot be encoded or registered. Errors from registering the cold locators
/// are propagated as-is.
pub fn freeze_oldest_to_cold(
    &mut self,
    table_name: &str,
    index_name: &str,
    max_rows: usize,
) -> Result<FreezeReport, StorageError> {
    if max_rows == 0 {
        return Err(StorageError::Corrupt(
            "freeze_oldest_to_cold: max_rows must be > 0".into(),
        ));
    }
    let table = self.get(table_name).ok_or_else(|| {
        StorageError::Corrupt(format!(
            "freeze_oldest_to_cold: table {table_name:?} not found"
        ))
    })?;
    if max_rows > table.rows.len() {
        return Err(StorageError::Corrupt(format!(
            "freeze_oldest_to_cold: max_rows {max_rows} > row_count {}",
            table.rows.len()
        )));
    }
    let idx = table
        .indices
        .iter()
        .find(|i| i.name == index_name)
        .ok_or_else(|| {
            StorageError::Corrupt(format!(
                "freeze_oldest_to_cold: index {index_name:?} not found on {table_name:?}"
            ))
        })?;
    if !matches!(idx.kind, IndexKind::BTree(_)) {
        return Err(StorageError::Corrupt(format!(
            "freeze_oldest_to_cold: index {index_name:?} is NSW; only BTree indices may freeze"
        )));
    }
    let column_position = idx.column_position;
    // The schema is only read while encoding, so borrow it rather than clone.
    let schema = &table.schema;
    let mut to_freeze: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(max_rows);
    for row_idx in 0..max_rows {
        let row = table.rows.get(row_idx).expect("bounds-checked above");
        let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
            StorageError::Corrupt(format!(
                "freeze_oldest_to_cold: row {row_idx} has NULL / non-key value in index column"
            ))
        })?;
        let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
            StorageError::Corrupt(format!(
                "freeze_oldest_to_cold: index {index_name:?} column type is non-integer; \
                 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
            ))
        })?;
        to_freeze.push((pk_u64, encode_row_body_dense(row, schema), key));
    }
    // Rows go into the segment in key order; adjacent equal keys are duplicates.
    to_freeze.sort_by_key(|(k, _, _)| *k);
    for w in to_freeze.windows(2) {
        if w[0].0 == w[1].0 {
            return Err(StorageError::Corrupt(format!(
                "freeze_oldest_to_cold: duplicate PK {} in freeze batch",
                w[0].0
            )));
        }
    }
    // Split into index keys (for the cold locators) and segment rows without
    // cloning either half.
    let (post_swap_keys, seg_rows): (Vec<IndexKey>, Vec<(u64, Vec<u8>)>) = to_freeze
        .into_iter()
        .map(|(pk, body, key)| (key, (pk, body)))
        .unzip();
    let frozen_rows = seg_rows.len();
    let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
        .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: encode: {e}")))?;
    // Register the segment first: if this fails, no hot row has been removed.
    let segment_id = self
        .load_segment_bytes(seg_bytes.clone())
        .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: load: {e}")))?;
    let positions: Vec<usize> = (0..max_rows).collect();
    let t_mut = self
        .get_mut(table_name)
        .expect("just validated; still present");
    let bytes_before = t_mut.hot_bytes();
    let removed = t_mut.delete_rows(&positions);
    debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
    let bytes_after = t_mut.hot_bytes();
    let bytes_freed = bytes_before.saturating_sub(bytes_after);
    let new_cold = post_swap_keys.into_iter().map(|k| {
        (
            k,
            RowLocator::Cold {
                segment_id,
                page_offset: 0,
            },
        )
    });
    t_mut.register_cold_locators(index_name, new_cold)?;
    Ok(FreezeReport {
        segment_id,
        frozen_rows,
        bytes_freed,
        segment_bytes: seg_bytes,
    })
}
/// Borrows the segment in slot `segment_id`, or `None` when the id is out of
/// range or the slot is empty.
#[must_use]
pub fn cold_segment(&self, segment_id: u32) -> Option<&OwnedSegment> {
    let slot = self.cold_segments.get(segment_id as usize)?;
    slot.as_deref()
}
/// Fetches and decodes the row stored under `key` in cold segment
/// `segment_id`, using the schema of `table_name`.
///
/// Returns `None` when the table is unknown, the key is not an integer key,
/// the segment slot is missing or empty, the key is not in the segment, or
/// the stored payload fails to decode.
pub fn resolve_cold_locator(
    &self,
    table_name: &str,
    segment_id: u32,
    key: &IndexKey,
) -> Option<Row> {
    let schema = &self.get(table_name)?.schema;
    let pk = index_key_as_u64(key)?;
    let segment = self.cold_segments.get(segment_id as usize)?.as_ref()?;
    let payload = segment.lookup(pk)?;
    decode_row_body_dense(&payload, schema)
        .ok()
        .map(|(row, _consumed)| row)
}
/// Finds the row stored under `key` in index `index_name` of `table`,
/// looking through hot and cold locators in the order the index returns them.
///
/// A hot locator yields a clone of the in-memory row. A cold locator is
/// resolved through its segment; locators that cannot be resolved (non-integer
/// key, missing segment, key absent from the segment) are skipped. A cold
/// payload that fails to decode ends the search with `None`.
pub fn lookup_by_pk(&self, table: &str, index_name: &str, key: &IndexKey) -> Option<Row> {
    let t = self.get(table)?;
    let idx = t.indices.iter().find(|i| i.name == index_name)?;
    let cold_u64_key = index_key_as_u64(key);
    for loc in idx.lookup_eq(key) {
        match *loc {
            RowLocator::Cold {
                segment_id,
                page_offset: _,
            } => {
                // Chain the three "can this locator be resolved?" steps; any
                // miss falls through to the next locator.
                let payload = cold_u64_key.and_then(|pk| {
                    self.cold_segments
                        .get(segment_id as usize)
                        .and_then(|slot| slot.as_deref())
                        .and_then(|seg| seg.lookup(pk))
                });
                if let Some(payload) = payload {
                    return decode_row_body_dense(&payload, &t.schema)
                        .ok()
                        .map(|(row, _consumed)| row);
                }
            }
            RowLocator::Hot(pos) => {
                if let Some(row) = t.rows.get(pos) {
                    return Some(row.clone());
                }
            }
        }
    }
    None
}
/// Copies the cold row stored under `key` back into the hot rows of
/// `table_name` and removes the key's cold locators from `index_name`.
///
/// Returns `Ok(None)` when the key has no cold locator, otherwise
/// `Ok(Some(i))` where `i` is the position of the last hot row after the
/// insert.
///
/// # Errors
/// Returns `StorageError::Corrupt` when the key is not an integer key, the
/// table is missing, the segment is not registered, or the segment does not
/// contain the key. Errors from locating, decoding, inserting and removing
/// the locators are propagated.
pub fn promote_cold_row(
    &mut self,
    table_name: &str,
    index_name: &str,
    key: &IndexKey,
) -> Result<Option<usize>, StorageError> {
    let Some((segment_id, _page_offset)) =
        self.find_cold_locator(table_name, index_name, key)?
    else {
        return Ok(None);
    };
    let Some(u64_key) = index_key_as_u64(key) else {
        return Err(StorageError::Corrupt(
            "promote_cold_row: key type not coercible to u64 (cold tier requires integer PK)"
                .into(),
        ));
    };
    // Own the schema so no borrow of `self` is held across the mutation below.
    let schema = match self.get(table_name) {
        Some(table) => table.schema.clone(),
        None => {
            return Err(StorageError::Corrupt(format!(
                "promote_cold_row: table {table_name:?} not found"
            )));
        }
    };
    let Some(seg) = self
        .cold_segments
        .get(segment_id as usize)
        .and_then(|slot| slot.as_ref())
    else {
        return Err(StorageError::Corrupt(format!(
            "promote_cold_row: segment {segment_id} not registered on catalog"
        )));
    };
    let Some(payload) = seg.lookup(u64_key) else {
        return Err(StorageError::Corrupt(format!(
            "promote_cold_row: key {u64_key} resolves to segment {segment_id} \
             but the segment's bloom/page lookup didn't return a row"
        )));
    };
    let (row, _consumed) = decode_row_body_dense(&payload, &schema)?;
    let t = self
        .get_mut(table_name)
        .expect("table existed at lookup time");
    t.insert(row)?;
    let Some(new_hot_idx) = t.rows.len().checked_sub(1) else {
        return Err(StorageError::Corrupt(
            "promote_cold_row: empty after insert".into(),
        ));
    };
    t.remove_cold_locators_for_key(index_name, key)?;
    Ok(Some(new_hot_idx))
}
/// Removes the cold locators for `key` from index `index_name` of
/// `table_name` and returns whatever count the table reports.
///
/// # Errors
/// Returns `StorageError::Corrupt` when the table does not exist; errors from
/// the locator removal are propagated.
pub fn shadow_cold_row(
    &mut self,
    table_name: &str,
    index_name: &str,
    key: &IndexKey,
) -> Result<usize, StorageError> {
    match self.get_mut(table_name) {
        Some(table) => table.remove_cold_locators_for_key(index_name, key),
        None => Err(StorageError::Corrupt(format!(
            "shadow_cold_row: table {table_name:?} not found"
        ))),
    }
}
/// Encodes the hot rows in `row_range` of `table_name` into a `FreezeSlice`
/// without modifying the catalog.
///
/// Each row is paired with its integer key taken from the column covered by
/// the BTree index `index_name`; the slice's rows are sorted by that key.
///
/// # Errors
/// Returns `StorageError::Corrupt` when the table or index is missing, the
/// index is not a BTree, `row_range` ends past the last row, or a row has no
/// usable integer key.
pub fn prepare_freeze_slice(
    &self,
    table_name: &str,
    index_name: &str,
    row_range: core::ops::Range<usize>,
) -> Result<FreezeSlice, StorageError> {
    let Some(table) = self.get(table_name) else {
        return Err(StorageError::Corrupt(format!(
            "prepare_freeze_slice: table {table_name:?} not found"
        )));
    };
    let Some(idx) = table.indices.iter().find(|i| i.name == index_name) else {
        return Err(StorageError::Corrupt(format!(
            "prepare_freeze_slice: index {index_name:?} not found on {table_name:?}"
        )));
    };
    if !matches!(idx.kind, IndexKind::BTree(_)) {
        return Err(StorageError::Corrupt(format!(
            "prepare_freeze_slice: index {index_name:?} is NSW; only BTree indices may freeze"
        )));
    }
    let row_count = table.rows.len();
    if row_range.end > row_count {
        return Err(StorageError::Corrupt(format!(
            "prepare_freeze_slice: row_range end {} > row_count {}",
            row_range.end, row_count
        )));
    }
    let key_column = idx.column_position;
    let schema = &table.schema;
    let mut rows: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(row_range.len());
    for row_idx in row_range.clone() {
        let row = table.rows.get(row_idx).expect("bounds-checked above");
        let Some(key) = IndexKey::from_value(&row.values[key_column]) else {
            return Err(StorageError::Corrupt(format!(
                "prepare_freeze_slice: row {row_idx} has NULL / non-key value in index column"
            )));
        };
        let Some(pk_u64) = index_key_as_u64(&key) else {
            return Err(StorageError::Corrupt(format!(
                "prepare_freeze_slice: index {index_name:?} column type is non-integer; \
                 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
            )));
        };
        rows.push((pk_u64, encode_row_body_dense(row, schema), key));
    }
    rows.sort_by_key(|&(pk, _, _)| pk);
    Ok(FreezeSlice { row_range, rows })
}
/// Merges `slices` into one cold segment and swaps the covered hot rows for
/// cold locators on the BTree index `index_name`.
///
/// The slices, ordered by `row_range.start`, must tile `0..N` with no gap or
/// overlap, and together hold exactly `N` rows. Their rows are merged by key
/// (on equal keys the slice with the lower start wins the earlier position).
/// When `N` is zero nothing is changed and the report carries
/// `segment_id: u32::MAX` with zero counts.
///
/// The segment is registered on the catalog *before* the hot rows are
/// deleted, so a failure while encoding or registering it leaves the table
/// untouched instead of dropping rows that have no cold copy.
///
/// # Errors
/// Returns `StorageError::Corrupt` when the table or index is missing, the
/// index is not a BTree, the ranges do not tile from row 0, the coverage
/// exceeds the row count, the slice row total differs from the coverage, a
/// key appears twice, or the segment cannot be encoded or registered. Errors
/// from registering the cold locators are propagated as-is.
pub fn commit_freeze_slices(
    &mut self,
    table_name: &str,
    index_name: &str,
    slices: Vec<FreezeSlice>,
) -> Result<FreezeReport, StorageError> {
    let table = self.get(table_name).ok_or_else(|| {
        StorageError::Corrupt(format!(
            "commit_freeze_slices: table {table_name:?} not found"
        ))
    })?;
    let idx = table
        .indices
        .iter()
        .find(|i| i.name == index_name)
        .ok_or_else(|| {
            StorageError::Corrupt(format!(
                "commit_freeze_slices: index {index_name:?} not found on {table_name:?}"
            ))
        })?;
    if !matches!(idx.kind, IndexKind::BTree(_)) {
        return Err(StorageError::Corrupt(format!(
            "commit_freeze_slices: index {index_name:?} is NSW; only BTree indices may freeze"
        )));
    }
    let mut ordered = slices;
    ordered.sort_by_key(|s| s.row_range.start);
    // Each slice must start exactly where the previous one ended, from row 0.
    let mut expected_start = 0usize;
    for s in &ordered {
        if s.row_range.start != expected_start {
            return Err(StorageError::Corrupt(format!(
                "commit_freeze_slices: gap/overlap at row {}; expected start {}",
                s.row_range.start, expected_start
            )));
        }
        expected_start = s.row_range.end;
    }
    let max_rows = expected_start;
    if max_rows > table.rows.len() {
        return Err(StorageError::Corrupt(format!(
            "commit_freeze_slices: total row range {} exceeds row_count {}",
            max_rows,
            table.rows.len()
        )));
    }
    if max_rows == 0 {
        return Ok(FreezeReport {
            segment_id: u32::MAX,
            frozen_rows: 0,
            bytes_freed: 0,
            segment_bytes: Vec::new(),
        });
    }
    let total_rows: usize = ordered.iter().map(|s| s.rows.len()).sum();
    if total_rows != max_rows {
        return Err(StorageError::Corrupt(format!(
            "commit_freeze_slices: total slice rows {total_rows} ≠ row_range coverage {max_rows}"
        )));
    }
    // K-way merge by key. Rows are moved out of the slices rather than
    // cloned; a strict `<` keeps the earliest slice first on equal keys.
    let mut heads: Vec<_> = ordered
        .into_iter()
        .map(|s| s.rows.into_iter().peekable())
        .collect();
    let mut merged: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(total_rows);
    loop {
        let mut pick: Option<(usize, u64)> = None;
        for (i, head) in heads.iter_mut().enumerate() {
            let Some(&(pk, _, _)) = head.peek() else {
                continue;
            };
            match pick {
                Some((_, best)) if best <= pk => {}
                _ => pick = Some((i, pk)),
            }
        }
        let Some((i, _)) = pick else { break };
        if let Some(row) = heads[i].next() {
            merged.push(row);
        }
    }
    for w in merged.windows(2) {
        if w[0].0 == w[1].0 {
            return Err(StorageError::Corrupt(format!(
                "commit_freeze_slices: duplicate PK {} across slices",
                w[0].0
            )));
        }
    }
    // Split into index keys (for the cold locators) and segment rows without
    // cloning either half.
    let (post_swap_keys, seg_rows): (Vec<IndexKey>, Vec<(u64, Vec<u8>)>) = merged
        .into_iter()
        .map(|(pk, body, key)| (key, (pk, body)))
        .unzip();
    let frozen_rows = seg_rows.len();
    let (seg_bytes, _meta) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).map_err(|e| {
            StorageError::Corrupt(format!("commit_freeze_slices: encode: {e}"))
        })?;
    // Register the segment first: if this fails, no hot row has been removed.
    let segment_id = self
        .load_segment_bytes(seg_bytes.clone())
        .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: load: {e}")))?;
    let positions: Vec<usize> = (0..max_rows).collect();
    let t_mut = self
        .get_mut(table_name)
        .expect("just validated; still present");
    let bytes_before = t_mut.hot_bytes();
    let removed = t_mut.delete_rows(&positions);
    debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
    let bytes_after = t_mut.hot_bytes();
    let bytes_freed = bytes_before.saturating_sub(bytes_after);
    let new_cold = post_swap_keys.into_iter().map(|k| {
        (
            k,
            RowLocator::Cold {
                segment_id,
                page_offset: 0,
            },
        )
    });
    t_mut.register_cold_locators(index_name, new_cold)?;
    Ok(FreezeReport {
        segment_id,
        frozen_rows,
        bytes_freed,
        segment_bytes: seg_bytes,
    })
}
/// Merges the small cold segments referenced by BTree index `index_name` of
/// `table_name` into one new segment and repoints the index at it.
///
/// A segment is a candidate when the index references it, its slot is
/// occupied, and its byte length is below `target_segment_bytes`. With fewer
/// than two candidates nothing changes and an empty report is returned.
/// Only rows still referenced by the index are carried over, so
/// `deleted_rows_pruned` is the source row total minus the merged row count.
/// The source segments are tombstoned at the end.
///
/// # Errors
/// Returns `StorageError::Corrupt` when the table or index is missing, the
/// index is not a BTree, a cold key is not an integer key, a referenced key
/// is absent from its segment, or the merged segment cannot be encoded or
/// registered. Errors from tombstoning are propagated.
pub fn compact_cold_segments(
&mut self,
table_name: &str,
index_name: &str,
target_segment_bytes: u64,
) -> Result<CompactReport, StorageError> {
let t = self.get(table_name).ok_or_else(|| {
StorageError::Corrupt(format!(
"compact_cold_segments: table {table_name:?} not found"
))
})?;
let idx = t
.indices
.iter()
.find(|i| i.name == index_name)
.ok_or_else(|| {
StorageError::Corrupt(format!(
"compact_cold_segments: index {index_name:?} not found on {table_name:?}"
))
})?;
let map = match &idx.kind {
IndexKind::BTree(m) => m,
IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
return Err(StorageError::Corrupt(format!(
"compact_cold_segments: index {index_name:?} is not BTree; \
compaction applies only to BTree cold-tier indices"
)));
}
};
// Pass 1: every segment id that any cold locator in the index points at.
let mut referenced_ids: BTreeSet<u32> = BTreeSet::new();
for (_key, locators) in map.iter() {
for loc in locators {
if let RowLocator::Cold { segment_id, .. } = loc {
referenced_ids.insert(*segment_id);
}
}
}
// Keep only ids whose slot is occupied and whose segment is under the size target.
let candidate_set: BTreeSet<u32> = referenced_ids
.into_iter()
.filter(|id| {
self.cold_segments
.get(*id as usize)
.and_then(|s| s.as_deref())
.is_some_and(|s| (s.bytes().len() as u64) < target_segment_bytes)
})
.collect();
// Merging needs at least two sources; otherwise report "nothing done".
if candidate_set.len() < 2 {
return Ok(CompactReport {
sources: Vec::new(),
merged_segment_id: None,
merged_segment_bytes: Vec::new(),
merged_rows: 0,
deleted_rows_pruned: 0,
bytes_reclaimed_estimate: 0,
});
}
// Totals over the source segments, used for the report figures.
let mut source_row_count: usize = 0;
let mut source_byte_total: u64 = 0;
for &id in &candidate_set {
let seg = self.cold_segments[id as usize]
.as_ref()
.expect("candidate selected only when slot is Some");
source_row_count = source_row_count.saturating_add(seg.meta().num_rows as usize);
source_byte_total =
source_byte_total.saturating_add(seg.bytes().len() as u64);
}
// Pass 2: pull the payload for every key that has a locator into a
// candidate segment. The BTreeMap keeps the rows ordered by integer key.
let mut collected: BTreeMap<u64, (Vec<u8>, IndexKey)> = BTreeMap::new();
for (key, locators) in map.iter() {
for loc in locators {
let RowLocator::Cold { segment_id, .. } = loc else {
continue;
};
if !candidate_set.contains(segment_id) {
continue;
}
let u64_key = index_key_as_u64(key).ok_or_else(|| {
StorageError::Corrupt(format!(
"compact_cold_segments: index {index_name:?} has non-integer Cold key; \
cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
))
})?;
let seg = self.cold_segments[*segment_id as usize]
.as_ref()
.expect("candidate slot guaranteed Some above");
let payload = seg.lookup(u64_key).ok_or_else(|| {
StorageError::Corrupt(format!(
"compact_cold_segments: BTree {index_name:?} points key={u64_key} \
at segment {segment_id} but the segment lookup missed"
))
})?;
collected.insert(u64_key, (payload, key.clone()));
// Only the first candidate locator per key is used.
break;
}
}
let merged_rows = collected.len();
let deleted_rows_pruned = source_row_count.saturating_sub(merged_rows);
let seg_rows: Vec<(u64, Vec<u8>)> = collected
.iter()
.map(|(k, (body, _))| (*k, body.clone()))
.collect();
let (seg_bytes, _meta) =
encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).map_err(|e| {
StorageError::Corrupt(format!("compact_cold_segments: encode: {e}"))
})?;
let merged_bytes_len = seg_bytes.len() as u64;
// Register the merged segment before touching the index.
let merged_segment_id = self
.load_segment_bytes(seg_bytes.clone())
.map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: load: {e}")))?;
// Snapshot the index entries so the map can be mutated while walking them.
let entries: Vec<(IndexKey, Vec<RowLocator>)> = {
let t = self
.get(table_name)
.expect("table existed at the start of this fn");
let idx = t
.indices
.iter()
.find(|i| i.name == index_name)
.expect("index existed at the start of this fn");
let IndexKind::BTree(map) = &idx.kind else {
unreachable!("validated above");
};
map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
};
let t_mut = self
.get_mut(table_name)
.expect("table existed at the start of this fn");
let idx_mut = t_mut
.indices
.iter_mut()
.find(|i| i.name == index_name)
.expect("index existed at the start of this fn");
let IndexKind::BTree(map_mut) = &mut idx_mut.kind else {
unreachable!("validated above");
};
// Pass 3: rewrite locators that pointed at a source segment so they
// point at the merged one; other locators are kept as they are.
for (key, locators) in entries {
let mut new_locs: Vec<RowLocator> = Vec::with_capacity(locators.len());
let mut changed = false;
for loc in &locators {
match *loc {
RowLocator::Cold {
segment_id,
page_offset: _,
} if candidate_set.contains(&segment_id) => {
let replacement = RowLocator::Cold {
segment_id: merged_segment_id,
page_offset: 0,
};
// Several source locators for one key collapse into a single replacement.
if !new_locs.contains(&replacement) {
new_locs.push(replacement);
}
changed = true;
}
other => new_locs.push(other),
}
}
if changed {
map_mut.insert_mut(key, new_locs);
}
}
// The sources are no longer referenced; empty their slots.
for &id in &candidate_set {
self.tombstone_segment(id)?;
}
let bytes_reclaimed_estimate = source_byte_total.saturating_sub(merged_bytes_len);
Ok(CompactReport {
sources: candidate_set.into_iter().collect(),
merged_segment_id: Some(merged_segment_id),
merged_segment_bytes: seg_bytes,
merged_rows,
deleted_rows_pruned,
bytes_reclaimed_estimate,
})
}
/// Returns `(segment_id, page_offset)` of the first cold locator stored under
/// `key` in BTree index `index_name`, or `Ok(None)` when the key has none.
///
/// # Errors
/// Returns `StorageError::Corrupt` when the table or index is missing or the
/// index is not a BTree.
fn find_cold_locator(
    &self,
    table_name: &str,
    index_name: &str,
    key: &IndexKey,
) -> Result<Option<(u32, u32)>, StorageError> {
    let Some(t) = self.get(table_name) else {
        return Err(StorageError::Corrupt(format!(
            "find_cold_locator: table {table_name:?} not found"
        )));
    };
    let Some(idx) = t.indices.iter().find(|i| i.name == index_name) else {
        return Err(StorageError::Corrupt(format!(
            "find_cold_locator: index {index_name:?} not found on {table_name:?}"
        )));
    };
    if !matches!(idx.kind, IndexKind::BTree(_)) {
        return Err(StorageError::Corrupt(format!(
            "find_cold_locator: index {index_name:?} is NSW; promote-on-write only applies to BTree indices"
        )));
    }
    let first_cold = idx.lookup_eq(key).into_iter().find_map(|loc| match *loc {
        RowLocator::Cold {
            segment_id,
            page_offset,
        } => Some((segment_id, page_offset)),
        RowLocator::Hot(_) => None,
    });
    Ok(first_cold)
}
}
/// Converts an integer index key to the `u64` key used by cold segments.
///
/// The signed value is reinterpreted bit-for-bit, so negative keys map to
/// large unsigned values. Text and boolean keys have no cold-tier key and
/// yield `None`.
fn index_key_as_u64(key: &IndexKey) -> Option<u64> {
    match *key {
        IndexKey::Text(_) | IndexKey::Bool(_) => None,
        IndexKey::Int(signed) => Some(signed.cast_unsigned()),
    }
}
/// Errors reported by the storage layer.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum StorageError {
/// A table with this name already exists.
DuplicateTable {
name: String,
},
/// No table with this name exists.
TableNotFound {
name: String,
},
/// A row has a different number of values than the table has columns.
ArityMismatch {
expected: usize,
actual: usize,
},
/// The value at `position` does not have the type declared for `column`.
TypeMismatch {
column: String,
expected: DataType,
actual: DataType,
position: usize,
},
/// A NULL value was supplied for a NOT NULL column.
NullInNotNull {
column: String,
},
/// An index with this name already exists.
DuplicateIndex {
name: String,
},
/// No column with this name exists.
ColumnNotFound {
column: String,
},
/// Displayed as "corrupt on-disk format"; the cold-tier methods in this
/// file also use it for invalid arguments and failed lookups.
Corrupt(String),
/// No index with this name exists.
IndexNotFound {
name: String,
},
/// The requested operation is not supported; the payload says what.
Unsupported(String),
}
impl fmt::Display for StorageError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::DuplicateTable { name } => write!(f, "table already exists: {name}"),
Self::TableNotFound { name } => write!(f, "table not found: {name}"),
Self::ArityMismatch { expected, actual } => write!(
f,
"row arity mismatch: expected {expected} columns, got {actual}"
),
Self::TypeMismatch {
column,
expected,
actual,
position,
} => write!(
f,
"type mismatch in column {column:?} (position {position}): expected {expected}, got {actual}"
),
Self::NullInNotNull { column } => {
write!(f, "NULL value in NOT NULL column {column:?}")
}
Self::DuplicateIndex { name } => write!(f, "index already exists: {name}"),
Self::ColumnNotFound { column } => write!(f, "column not found: {column}"),
Self::Corrupt(detail) => write!(f, "corrupt on-disk format: {detail}"),
Self::IndexNotFound { name } => write!(f, "index not found: {name}"),
Self::Unsupported(detail) => write!(f, "unsupported: {detail}"),
}
}
}
impl ColumnSchema {
    /// Creates a column with no default value and auto-increment disabled.
    pub fn new(name: impl Into<String>, ty: DataType, nullable: bool) -> Self {
        let name = name.into();
        Self {
            name,
            ty,
            nullable,
            auto_increment: false,
            default: None,
        }
    }
    /// Returns the column with `default` set as its default value.
    #[must_use]
    pub fn with_default(self, default: Value) -> Self {
        Self {
            default: Some(default),
            ..self
        }
    }
    /// Returns the column with auto-increment enabled.
    #[must_use]
    pub const fn with_auto_increment(mut self) -> Self {
        self.auto_increment = true;
        self
    }
}
impl TableSchema {
    /// Creates a schema from a table name and its ordered columns.
    ///
    /// The hot-tier byte setting starts unset and the foreign-key list
    /// starts empty.
    pub fn new(name: impl Into<String>, columns: Vec<ColumnSchema>) -> Self {
        let name = name.into();
        let foreign_keys = Vec::new();
        Self {
            name,
            columns,
            hot_tier_bytes: None,
            foreign_keys,
        }
    }
}
/// Eight-byte signature written at the very start of a serialized catalog
/// and checked first when deserializing.
const FILE_MAGIC: &[u8; 8] = b"SPGDB001";
/// Format version stamped into every newly serialized catalog.
const FILE_VERSION: u8 = 13;
/// Oldest format version `Catalog::deserialize` still accepts.
const MIN_SUPPORTED_FILE_VERSION: u8 = 8;
/// Tag byte preceding an integer index key.
const INDEX_KEY_TAG_INT: u8 = 0;
/// Tag byte preceding a text index key.
const INDEX_KEY_TAG_TEXT: u8 = 1;
/// Tag byte preceding a boolean index key.
const INDEX_KEY_TAG_BOOL: u8 = 2;
impl Catalog {
/// Encodes the whole catalog into a self-contained byte buffer.
///
/// Layout: magic, version byte, table count, then per table its schema
/// columns, dense-encoded rows, indices, the optional hot-tier byte budget
/// and the foreign-key list. All multi-byte integers are little-endian.
///
/// # Panics
///
/// Panics when a count exceeds the width of its on-disk field (see the
/// `expect` messages), when a string is too long for `write_str`, or when
/// a column type or value has no on-disk encoding.
pub fn serialize(&self) -> Vec<u8> {
let mut out = Vec::with_capacity(64);
// File header: magic + current format version.
out.extend_from_slice(FILE_MAGIC);
out.push(FILE_VERSION);
write_u32(
&mut out,
u32::try_from(self.tables.len()).expect("≤ 4G tables"),
);
for t in &self.tables {
// Schema: table name, then one record per column.
write_str(&mut out, &t.schema.name);
write_u16(
&mut out,
u16::try_from(t.schema.columns.len()).expect("≤ 65k columns/table"),
);
for c in &t.schema.columns {
write_str(&mut out, &c.name);
write_data_type(&mut out, c.ty);
out.push(u8::from(c.nullable));
// Optional default: presence byte, then the tagged value.
match &c.default {
None => out.push(0),
Some(v) => {
out.push(1);
write_value(&mut out, v);
}
}
out.push(u8::from(c.auto_increment));
}
// Rows: count followed by each row's dense body.
write_u32(
&mut out,
u32::try_from(t.rows.len()).expect("≤ 4G rows/table"),
);
for row in &t.rows {
out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
}
// Indices: name, indexed column position, kind tag + kind payload.
write_u16(
&mut out,
u16::try_from(t.indices.len()).expect("≤ 65k indices/table"),
);
for idx in &t.indices {
write_str(&mut out, &idx.name);
write_u16(
&mut out,
u16::try_from(idx.column_position).expect("≤ 65k columns/table"),
);
match &idx.kind {
// Kind 0: B-tree, stored as key -> locator-list entries.
IndexKind::BTree(map) => {
out.push(0);
write_u32(
&mut out,
u32::try_from(map.len()).expect("≤ 4G index entries/index"),
);
for (key, locators) in map {
write_index_key(&mut out, key);
write_u32(
&mut out,
u32::try_from(locators.len()).expect("≤ 4G locators/key"),
);
for loc in locators {
loc.write_le(&mut out);
}
}
}
// Kind 1: NSW graph, prefixed by its `m` parameter.
IndexKind::Nsw(g) => {
out.push(1);
write_u16(&mut out, u16::try_from(g.m).expect("≤ 65k NSW neighbours"));
write_nsw_graph(&mut out, g);
}
// Kind 2: BRIN, only the column type is stored.
IndexKind::Brin { column_type } => {
out.push(2);
write_data_type(&mut out, *column_type);
}
}
// Per-index trailer: INCLUDE columns, partial predicate, expression.
write_u16(
&mut out,
u16::try_from(idx.included_columns.len())
.expect("≤ 65k INCLUDE columns/index"),
);
for col_pos in &idx.included_columns {
write_u16(
&mut out,
u16::try_from(*col_pos).expect("≤ 65k columns/table"),
);
}
match &idx.partial_predicate {
None => out.push(0),
Some(pred) => {
out.push(1);
write_str(&mut out, pred);
}
}
match &idx.expression {
None => out.push(0),
Some(expr) => {
out.push(1);
write_str(&mut out, expr);
}
}
}
// Optional hot-tier byte budget: presence byte + little-endian value.
match t.schema.hot_tier_bytes {
None => out.push(0),
Some(n) => {
out.push(1);
out.extend_from_slice(&n.to_le_bytes());
}
}
// Foreign keys: optional name, local columns, parent table and
// columns, then the ON DELETE / ON UPDATE action tags.
write_u16(
&mut out,
u16::try_from(t.schema.foreign_keys.len()).expect("≤ 65k FKs/table"),
);
for fk in &t.schema.foreign_keys {
match &fk.name {
None => out.push(0),
Some(n) => {
out.push(1);
write_str(&mut out, n);
}
}
write_u16(
&mut out,
u16::try_from(fk.local_columns.len()).expect("≤ 65k FK columns"),
);
for &p in &fk.local_columns {
write_u16(
&mut out,
u16::try_from(p).expect("≤ 65k columns/table"),
);
}
write_str(&mut out, &fk.parent_table);
write_u16(
&mut out,
u16::try_from(fk.parent_columns.len()).expect("≤ 65k FK parent columns"),
);
for &p in &fk.parent_columns {
write_u16(
&mut out,
u16::try_from(p).expect("≤ 65k columns/table"),
);
}
out.push(fk.on_delete.tag());
out.push(fk.on_update.tag());
}
}
out
}
/// Rebuilds a catalog from bytes produced by `serialize`.
///
/// # Errors
///
/// Returns [`StorageError::Corrupt`] when the magic does not match, the
/// version lies outside the supported range, a table record fails to
/// decode, or bytes remain after the last table.
pub fn deserialize(buf: &[u8]) -> Result<Self, StorageError> {
    let mut cur = Cursor::new(buf);

    let magic = cur.take(8)?;
    if magic != FILE_MAGIC {
        return Err(StorageError::Corrupt(format!(
            "bad magic: expected SPGDB001, got {magic:?}"
        )));
    }

    let version = cur.read_u8()?;
    let supported = MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION;
    if !supported.contains(&version) {
        return Err(StorageError::Corrupt(format!(
            "unsupported file version: {version} (supported: {MIN_SUPPORTED_FILE_VERSION}..={FILE_VERSION})"
        )));
    }

    let table_count = cur.read_u32()? as usize;
    let mut cat = Self::new();
    (0..table_count).try_for_each(|_| deserialize_table(&mut cur, &mut cat, version))?;

    // Anything left over means the buffer is not a single well-formed catalog.
    match buf.len().checked_sub(cur.pos) {
        Some(unread) if unread > 0 => Err(StorageError::Corrupt(format!(
            "trailing bytes: {} unread",
            unread
        ))),
        _ => Ok(cat),
    }
}
}
/// Decodes one table record from `cur` and registers it in `cat`.
///
/// Reads the schema columns, creates the table, then loads its rows and
/// indices. For `version >= 11` the optional hot-tier byte budget follows;
/// for `version >= 13` the foreign-key list follows that.
///
/// # Errors
///
/// Returns [`StorageError::Corrupt`] for unknown tag bytes, a foreign key
/// whose local and parent arities differ, or a truncated buffer. Errors
/// from `create_table` and the row/index loaders are propagated unchanged.
fn deserialize_table(
    cur: &mut Cursor<'_>,
    cat: &mut Catalog,
    version: u8,
) -> Result<(), StorageError> {
    let table_name = cur.read_str()?;
    let col_count = cur.read_u16()? as usize;
    let mut cols = Vec::with_capacity(col_count);
    for _ in 0..col_count {
        let c_name = cur.read_str()?;
        let ty = cur.read_data_type()?;
        let nullable = cur.read_u8()? != 0;
        let default = match cur.read_u8()? {
            0 => None,
            1 => Some(cur.read_value()?),
            other => {
                return Err(StorageError::Corrupt(format!(
                    "unknown default tag: {other}"
                )));
            }
        };
        let auto_increment = cur.read_u8()? != 0;
        cols.push(ColumnSchema {
            name: c_name,
            ty,
            nullable,
            default,
            auto_increment,
        });
    }
    let n_cols = cols.len();
    // The name is not needed again, so it is moved into the schema rather
    // than cloned.
    cat.create_table(TableSchema::new(table_name, cols))?;
    let t = cat.tables.last_mut().expect("create_table just pushed");
    deserialize_rows(cur, t, n_cols)?;
    deserialize_indices(cur, t, version)?;
    if version >= 11 {
        // Presence byte, then the little-endian budget when present.
        let hot_tier_bytes = match cur.read_u8()? {
            0 => None,
            1 => Some(cur.read_u64()?),
            other => {
                return Err(StorageError::Corrupt(format!(
                    "hot_tier_bytes appendix: unknown has-value byte {other}"
                )));
            }
        };
        t.schema_mut().hot_tier_bytes = hot_tier_bytes;
    }
    if version >= 13 {
        let fk_count = cur.read_u16()? as usize;
        let mut fks = Vec::with_capacity(fk_count);
        for _ in 0..fk_count {
            let name = match cur.read_u8()? {
                0 => None,
                1 => Some(cur.read_str()?),
                other => {
                    return Err(StorageError::Corrupt(format!(
                        "FK appendix: unknown has-name byte {other}"
                    )));
                }
            };
            let local_arity = cur.read_u16()? as usize;
            let mut local_columns = Vec::with_capacity(local_arity);
            for _ in 0..local_arity {
                local_columns.push(cur.read_u16()? as usize);
            }
            let parent_table = cur.read_str()?;
            let parent_arity = cur.read_u16()? as usize;
            // Both sides of a foreign key must list the same number of columns.
            if parent_arity != local_arity {
                return Err(StorageError::Corrupt(format!(
                    "FK arity mismatch in catalog: local {local_arity} vs parent {parent_arity}"
                )));
            }
            let mut parent_columns = Vec::with_capacity(parent_arity);
            for _ in 0..parent_arity {
                parent_columns.push(cur.read_u16()? as usize);
            }
            let on_delete = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
                StorageError::Corrupt("FK appendix: unknown on_delete tag".into())
            })?;
            let on_update = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
                StorageError::Corrupt("FK appendix: unknown on_update tag".into())
            })?;
            fks.push(ForeignKeyConstraint {
                name,
                local_columns,
                parent_table,
                parent_columns,
                on_delete,
                on_update,
            });
        }
        t.schema_mut().foreign_keys = fks;
    }
    Ok(())
}
/// Loads a table's rows from `cur` and records their total encoded size.
///
/// Reads a `u32` row count, decodes that many dense row bodies against the
/// table's schema, appends them to `t.rows`, and stores the saturating sum
/// of their encoded lengths in `t.hot_bytes`. `_n_cols` is accepted but not
/// used.
///
/// # Errors
///
/// Propagates any decode error; `t.hot_bytes` is left untouched in that case.
fn deserialize_rows(
    cur: &mut Cursor<'_>,
    t: &mut Table,
    _n_cols: usize,
) -> Result<(), StorageError> {
    let row_count = cur.read_u32()? as usize;
    let mut total_encoded: u64 = 0;
    for _ in 0..row_count {
        // The row decoder works on a plain slice, so advance the cursor manually.
        let (row, used) = decode_row_body_dense(&cur.buf[cur.pos..], &t.schema)?;
        cur.pos += used;
        let encoded = row_body_encoded_len(&row, &t.schema) as u64;
        total_encoded = total_encoded.saturating_add(encoded);
        t.rows.push_mut(row);
    }
    t.hot_bytes = total_encoded;
    Ok(())
}
/// Loads a table's index records from `cur` and restores them on `t`.
///
/// Each record holds the index name, the indexed column position, and a
/// kind tag: `0` B-tree (its entries are stored from version 9 on; older
/// files only trigger `add_index`), `1` NSW graph, `2` BRIN. From version
/// 12 on, a trailer with INCLUDE columns, an optional partial predicate and
/// an optional expression follows and is attached to the most recently
/// added index.
///
/// # Errors
///
/// Returns [`StorageError::Corrupt`] for an out-of-range column position,
/// an unknown tag, or a truncated buffer; errors from the table's restore
/// methods are propagated.
fn deserialize_indices(
    cur: &mut Cursor<'_>,
    t: &mut Table,
    version: u8,
) -> Result<(), StorageError> {
    let index_count = cur.read_u16()? as usize;
    for _ in 0..index_count {
        let idx_name = cur.read_str()?;
        let col_pos = cur.read_u16()? as usize;
        let Some(column) = t.schema.columns.get(col_pos) else {
            return Err(StorageError::Corrupt(format!(
                "index {idx_name:?} points at non-existent column position {col_pos}"
            )));
        };
        let column_name = column.name.clone();

        match cur.read_u8()? {
            0 if version >= 9 => {
                let map = read_btree_map(cur)?;
                t.restore_btree_index(idx_name, &column_name, map)?;
            }
            0 => {
                t.add_index(idx_name, &column_name)?;
            }
            1 => {
                let m = cur.read_u16()? as usize;
                let graph = cur.read_nsw_graph(m)?;
                t.restore_nsw_index(idx_name, &column_name, graph)?;
            }
            2 => {
                let column_type = cur.read_data_type()?;
                t.restore_brin_index(idx_name, &column_name, column_type)?;
            }
            other => {
                return Err(StorageError::Corrupt(format!(
                    "unknown index kind tag: {other}"
                )));
            }
        }

        // Files older than version 12 carry no per-index trailer.
        if version < 12 {
            continue;
        }

        let num_included = cur.read_u16()? as usize;
        if num_included > 0 {
            let mut included: Vec<usize> = Vec::with_capacity(num_included);
            for _ in 0..num_included {
                let cp = cur.read_u16()? as usize;
                if cp >= t.schema.columns.len() {
                    return Err(StorageError::Corrupt(format!(
                        "INCLUDE column position {cp} out of range \
                         ({} schema columns)",
                        t.schema.columns.len()
                    )));
                }
                included.push(cp);
            }
            if let Some(newest) = t.indices.last_mut() {
                newest.included_columns = included;
            }
        }

        let predicate_tag = cur.read_u8()?;
        match predicate_tag {
            0 => {}
            1 => {
                let pred = cur.read_str()?;
                if let Some(newest) = t.indices.last_mut() {
                    newest.partial_predicate = Some(pred);
                }
            }
            other => {
                return Err(StorageError::Corrupt(format!(
                    "partial_predicate tag: unknown byte {other}"
                )));
            }
        }

        let expression_tag = cur.read_u8()?;
        match expression_tag {
            0 => {}
            1 => {
                let expr = cur.read_str()?;
                if let Some(newest) = t.indices.last_mut() {
                    newest.expression = Some(expr);
                }
            }
            other => {
                return Err(StorageError::Corrupt(format!(
                    "expression tag: unknown byte {other}"
                )));
            }
        }
    }
    Ok(())
}
/// Reads a serialized B-tree index: a `u32` entry count followed by, for
/// each entry, an index key, a `u32` locator count and that many row
/// locators.
///
/// # Errors
///
/// Returns [`StorageError::Corrupt`] when a key or locator fails to decode
/// or the buffer ends early.
fn read_btree_map(
    cur: &mut Cursor<'_>,
) -> Result<PersistentBTreeMap<IndexKey, Vec<RowLocator>>, StorageError> {
    let entry_count = cur.read_u32()? as usize;
    let mut map = PersistentBTreeMap::new();
    for _ in 0..entry_count {
        let key = cur.read_index_key()?;
        let locator_count = cur.read_u32()? as usize;
        // `locator_count` comes straight from the file. Bound the up-front
        // reservation by the bytes still available so a corrupt count cannot
        // force a huge allocation; the vector still grows on demand.
        let remaining = cur.buf.len().saturating_sub(cur.pos);
        let mut locators = Vec::with_capacity(locator_count.min(remaining));
        for _ in 0..locator_count {
            let tail = &cur.buf[cur.pos..];
            let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
                StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
            })?;
            cur.pos += consumed;
            locators.push(loc);
        }
        map.insert_mut(key, locators);
    }
    Ok(map)
}
/// Appends the on-disk form of an NSW/HNSW graph to `out`.
///
/// Layout: `m_max_0` (u16), entry node (u32, `u32::MAX` when absent), entry
/// level (u8), node count (u32) with one level byte per node, layer count
/// (u8), then for each layer its node count (u32) and per node a u16-length
/// list of u32 neighbour ids.
///
/// # Panics
///
/// Panics when a count does not fit its on-disk field width.
fn write_nsw_graph(out: &mut Vec<u8>, g: &NswGraph) {
    // An absent entry point is stored as the `u32::MAX` sentinel.
    let entry = match g.entry {
        Some(e) => u32::try_from(e).expect("NSW entry fits in u32"),
        None => u32::MAX,
    };
    let m_max_0 = u16::try_from(g.m_max_0).expect("HNSW m_max_0 fits in u16");
    write_u16(out, m_max_0);
    write_u32(out, entry);
    out.push(g.entry_level);

    let node_total = u32::try_from(g.levels.len()).expect("HNSW node count fits in u32");
    write_u32(out, node_total);
    for &level in &g.levels {
        out.push(level);
    }

    let layer_total = u8::try_from(g.layers.len()).expect("HNSW layer count ≤ 255");
    out.push(layer_total);
    for layer in &g.layers {
        let layer_nodes =
            u32::try_from(layer.len()).expect("HNSW per-layer node count fits in u32");
        write_u32(out, layer_nodes);
        for neighbors in layer {
            let degree =
                u16::try_from(neighbors.len()).expect("HNSW neighbour list fits in u16");
            write_u16(out, degree);
            for &peer in neighbors {
                write_u32(out, peer);
            }
        }
    }
}
/// Appends the on-disk tag of `t`, followed by its parameters if any.
///
/// Vector, `Varchar` and `Char` types carry a little-endian `u32`; numeric
/// types carry a precision byte and a scale byte; all other types are the
/// tag alone.
///
/// # Panics
///
/// Panics on `DataType::Interval`, which has no on-disk encoding.
fn write_data_type(out: &mut Vec<u8>, t: DataType) {
    // Resolve the tag first; `Interval` aborts before anything is written.
    let tag: u8 = match t {
        DataType::Int => 1,
        DataType::BigInt => 2,
        DataType::Float => 3,
        DataType::Text => 4,
        DataType::Bool => 5,
        DataType::Vector { encoding, .. } => match encoding {
            VecEncoding::F32 => 6,
            VecEncoding::Sq8 => 14,
            VecEncoding::F16 => 15,
        },
        DataType::SmallInt => 7,
        DataType::Varchar(_) => 8,
        DataType::Char(_) => 9,
        DataType::Numeric { .. } => 10,
        DataType::Date => 11,
        DataType::Timestamp => 12,
        DataType::Json => 13,
        DataType::Interval => {
            unreachable!("DataType::Interval has no on-disk encoding in v2.11")
        }
    };
    out.push(tag);

    // Then the type parameters, where the type has any.
    match t {
        DataType::Vector { dim: width, .. } | DataType::Varchar(width) | DataType::Char(width) => {
            out.extend_from_slice(&width.to_le_bytes());
        }
        DataType::Numeric { precision, scale } => {
            out.push(precision);
            out.push(scale);
        }
        DataType::SmallInt
        | DataType::Int
        | DataType::BigInt
        | DataType::Float
        | DataType::Text
        | DataType::Bool
        | DataType::Date
        | DataType::Timestamp
        | DataType::Interval
        | DataType::Json => {}
    }
}
impl Cursor<'_> {
    /// Reads a data-type tag and its parameters, mirroring `write_data_type`.
    ///
    /// # Errors
    ///
    /// Returns [`StorageError::Corrupt`] for an unknown tag or when the
    /// buffer ends before the parameters do.
    fn read_data_type(&mut self) -> Result<DataType, StorageError> {
        let ty = match self.read_u8()? {
            1 => DataType::Int,
            2 => DataType::BigInt,
            3 => DataType::Float,
            4 => DataType::Text,
            5 => DataType::Bool,
            7 => DataType::SmallInt,
            8 => DataType::Varchar(self.read_u32()?),
            9 => DataType::Char(self.read_u32()?),
            10 => {
                let precision = self.read_u8()?;
                let scale = self.read_u8()?;
                DataType::Numeric { precision, scale }
            }
            11 => DataType::Date,
            12 => DataType::Timestamp,
            13 => DataType::Json,
            // The three vector encodings each have their own tag.
            6 => DataType::Vector {
                dim: self.read_u32()?,
                encoding: VecEncoding::F32,
            },
            14 => DataType::Vector {
                dim: self.read_u32()?,
                encoding: VecEncoding::Sq8,
            },
            15 => DataType::Vector {
                dim: self.read_u32()?,
                encoding: VecEncoding::F16,
            },
            other => {
                return Err(StorageError::Corrupt(format!(
                    "unknown data type tag: {other}"
                )));
            }
        };
        Ok(ty)
    }
}
/// Returns the number of bytes in the dense encoding of `row`.
///
/// That is the NULL bitmap (one bit per schema column, rounded up to whole
/// bytes) plus the encoded length of every non-NULL value.
///
/// In debug builds this asserts that the row's arity matches the schema.
pub fn row_body_encoded_len(row: &Row, schema: &TableSchema) -> usize {
    debug_assert_eq!(
        row.values.len(),
        schema.columns.len(),
        "row_body_encoded_len: row arity must match schema"
    );
    let bitmap_len = schema.columns.len().div_ceil(8);
    let payload_len: usize = row
        .values
        .iter()
        .enumerate()
        .filter(|(_, value)| !matches!(value, Value::Null))
        .map(|(idx, value)| value_body_encoded_len(value, schema.columns[idx].ty))
        .sum();
    bitmap_len + payload_len
}
/// Returns the encoded length in bytes of a single value body.
///
/// The column type `_ty` is accepted but not consulted; the length is
/// derived from the value alone.
///
/// # Panics
///
/// Panics on `Value::Interval`, which has no on-disk encoding.
fn value_body_encoded_len(v: &Value, _ty: DataType) -> usize {
    match v {
        // NULLs live in the bitmap and take no body bytes.
        Value::Null => 0,
        // Fixed-width scalars.
        Value::Bool(_) => 1,
        Value::SmallInt(_) => 2,
        Value::Int(_) | Value::Date(_) => 4,
        Value::BigInt(_) | Value::Float(_) | Value::Timestamp(_) => 8,
        // 16-byte scaled integer plus one scale byte.
        Value::Numeric { .. } => 16 + 1,
        // Length-prefixed payloads.
        Value::Text(text) | Value::Json(text) => 2 + text.len(),
        Value::Vector(components) => 4 + 4 * components.len(),
        Value::Sq8Vector(quantized) => 4 + 4 + 4 + quantized.bytes.len(),
        Value::HalfVector(half) => 4 + half.bytes.len(),
        Value::Interval { .. } => {
            unreachable!("Value::Interval has no on-disk encoding")
        }
    }
}
/// Encodes `row` in the dense layout: a NULL bitmap followed by the bodies
/// of the non-NULL values in column order.
///
/// Bit `i % 8` of bitmap byte `i / 8` is set when column `i` is NULL.
///
/// In debug builds this asserts that the row's arity matches the schema.
pub fn encode_row_body_dense(row: &Row, schema: &TableSchema) -> Vec<u8> {
    debug_assert_eq!(
        row.values.len(),
        schema.columns.len(),
        "dense encode: row arity must match schema"
    );
    let column_count = schema.columns.len();
    let bitmap_len = column_count.div_ceil(8);
    let mut out = Vec::with_capacity(bitmap_len + column_count * 8);
    // The bitmap sits at the front of the buffer, zero-initialised.
    out.resize(bitmap_len, 0);

    // First pass: flag the NULL columns.
    let null_positions = row
        .values
        .iter()
        .enumerate()
        .filter(|(_, value)| matches!(value, Value::Null))
        .map(|(idx, _)| idx);
    for idx in null_positions {
        out[idx / 8] |= 1 << (idx % 8);
    }

    // Second pass: append the bodies of everything else.
    for (idx, value) in row.values.iter().enumerate() {
        if !matches!(value, Value::Null) {
            write_value_body(&mut out, value, schema.columns[idx].ty);
        }
    }
    out
}
/// Decodes one dense row body from the front of `bytes`.
///
/// Reads the NULL bitmap (one bit per schema column), then one value body
/// for every column whose bit is clear. Returns the row together with the
/// number of bytes consumed, so the caller can advance past it.
///
/// The bitmap is borrowed directly from `bytes`, so any column count the
/// encoder can produce is accepted. The earlier fixed 32-byte scratch
/// buffer rejected rows of tables with more than 256 columns, which
/// `encode_row_body_dense` writes without complaint.
///
/// # Errors
///
/// Returns [`StorageError::Corrupt`] when `bytes` ends before the bitmap or
/// a value body is complete, or when a value body fails to decode.
pub fn decode_row_body_dense(
    bytes: &[u8],
    schema: &TableSchema,
) -> Result<(Row, usize), StorageError> {
    let mut cur = Cursor::new(bytes);
    let bitmap_bytes = schema.columns.len().div_ceil(8);
    // `take` hands back a slice tied to `bytes`, not to the cursor borrow,
    // so the cursor stays usable while the bitmap is consulted.
    let bitmap = cur.take(bitmap_bytes)?;
    let mut values = Vec::with_capacity(schema.columns.len());
    for (col_idx, col) in schema.columns.iter().enumerate() {
        let is_null = (bitmap[col_idx / 8] >> (col_idx % 8)) & 1 == 1;
        let value = if is_null {
            Value::Null
        } else {
            cur.read_value_body(col.ty)?
        };
        values.push(value);
    }
    Ok((Row { values }, cur.pos))
}
fn write_value_body(out: &mut Vec<u8>, v: &Value, ty: DataType) {
match (v, ty) {
(Value::SmallInt(n), DataType::SmallInt) => out.extend_from_slice(&n.to_le_bytes()),
(Value::Int(n), DataType::Int) => out.extend_from_slice(&n.to_le_bytes()),
(Value::BigInt(n), DataType::BigInt) => out.extend_from_slice(&n.to_le_bytes()),
(Value::Float(x), DataType::Float) => out.extend_from_slice(&x.to_le_bytes()),
(Value::Bool(b), DataType::Bool) => out.push(u8::from(*b)),
(Value::Text(s), DataType::Text | DataType::Varchar(_) | DataType::Char(_)) => {
write_str(out, s);
}
(
Value::Vector(v),
DataType::Vector {
encoding: VecEncoding::F32,
..
},
) => {
let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
out.extend_from_slice(&dim.to_le_bytes());
for x in v {
out.extend_from_slice(&x.to_le_bytes());
}
}
(
Value::Sq8Vector(q),
DataType::Vector {
encoding: VecEncoding::Sq8,
..
},
) => {
let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
out.extend_from_slice(&dim.to_le_bytes());
out.extend_from_slice(&q.min.to_le_bytes());
out.extend_from_slice(&q.max.to_le_bytes());
out.extend_from_slice(&q.bytes);
}
(
Value::HalfVector(h),
DataType::Vector {
encoding: VecEncoding::F16,
..
},
) => {
let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
out.extend_from_slice(&dim.to_le_bytes());
out.extend_from_slice(&h.bytes);
}
(Value::Numeric { scaled, .. }, DataType::Numeric { scale, .. }) => {
out.extend_from_slice(&scaled.to_le_bytes());
out.push(scale);
}
(Value::Date(d), DataType::Date) => out.extend_from_slice(&d.to_le_bytes()),
(Value::Timestamp(t), DataType::Timestamp) => out.extend_from_slice(&t.to_le_bytes()),
(Value::Json(s), DataType::Json) => write_str(out, s),
(other, ty) => unreachable!(
"schema-driven encode received mismatched value/type pair: \
value tag={:?}, column type={:?}",
other.data_type(),
ty
),
}
}
fn write_value(out: &mut Vec<u8>, v: &Value) {
match v {
Value::Null => out.push(0),
Value::SmallInt(n) => {
out.push(7);
out.extend_from_slice(&n.to_le_bytes());
}
Value::Int(n) => {
out.push(1);
out.extend_from_slice(&n.to_le_bytes());
}
Value::BigInt(n) => {
out.push(2);
out.extend_from_slice(&n.to_le_bytes());
}
Value::Float(x) => {
out.push(3);
out.extend_from_slice(&x.to_le_bytes());
}
Value::Text(s) | Value::Json(s) => {
out.push(4);
write_str(out, s);
}
Value::Bool(b) => {
out.push(5);
out.push(u8::from(*b));
}
Value::Vector(v) => {
out.push(6);
let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
out.extend_from_slice(&dim.to_le_bytes());
for x in v {
out.extend_from_slice(&x.to_le_bytes());
}
}
Value::Sq8Vector(q) => {
out.push(11);
let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
out.extend_from_slice(&dim.to_le_bytes());
out.extend_from_slice(&q.min.to_le_bytes());
out.extend_from_slice(&q.max.to_le_bytes());
out.extend_from_slice(&q.bytes);
}
Value::HalfVector(h) => {
out.push(12);
let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
out.extend_from_slice(&dim.to_le_bytes());
out.extend_from_slice(&h.bytes);
}
Value::Numeric { scaled, scale } => {
out.push(8);
out.extend_from_slice(&scaled.to_le_bytes());
out.push(*scale);
}
Value::Date(d) => {
out.push(9);
out.extend_from_slice(&d.to_le_bytes());
}
Value::Timestamp(t) => {
out.push(10);
out.extend_from_slice(&t.to_le_bytes());
}
Value::Interval { .. } => {
unreachable!(
"Value::Interval has no on-disk encoding; engine must reject it before write"
)
}
}
}
/// Appends `n` as two little-endian bytes.
fn write_u16(out: &mut Vec<u8>, n: u16) {
    out.extend(n.to_le_bytes());
}
/// Appends `n` as four little-endian bytes.
fn write_u32(out: &mut Vec<u8>, n: u32) {
    out.extend(n.to_le_bytes());
}
/// Appends `s` as a `u16` byte length followed by its UTF-8 bytes.
///
/// # Panics
///
/// Panics when `s` is longer than `u16::MAX` bytes.
fn write_str(out: &mut Vec<u8>, s: &str) {
    let payload = s.as_bytes();
    let prefix = u16::try_from(payload.len()).expect("identifier / text fits in u16");
    write_u16(out, prefix);
    out.extend_from_slice(payload);
}
/// Appends an index key as a one-byte kind tag followed by its payload:
/// little-endian bytes for integers, a length-prefixed string for text, and
/// a single `0`/`1` byte for booleans.
fn write_index_key(out: &mut Vec<u8>, key: &IndexKey) {
    match key {
        IndexKey::Bool(flag) => {
            out.push(INDEX_KEY_TAG_BOOL);
            out.push(u8::from(*flag));
        }
        IndexKey::Text(text) => {
            out.push(INDEX_KEY_TAG_TEXT);
            write_str(out, text);
        }
        IndexKey::Int(n) => {
            out.push(INDEX_KEY_TAG_INT);
            out.extend(n.to_le_bytes());
        }
    }
}
/// Forward-only reader over a borrowed byte buffer.
struct Cursor<'a> {
/// The input being decoded.
buf: &'a [u8],
/// Offset into `buf` of the next unread byte.
pos: usize,
}
impl<'a> Cursor<'a> {
/// Creates a cursor positioned at the start of `buf`.
const fn new(buf: &'a [u8]) -> Self {
    Self { buf, pos: 0 }
}
/// Consumes the next `n` bytes and returns them as a slice of the input.
///
/// The cursor only advances on success.
///
/// # Errors
///
/// Returns [`StorageError::Corrupt`] when `pos + n` overflows or runs past
/// the end of the buffer.
fn take(&mut self, n: usize) -> Result<&'a [u8], StorageError> {
    let Some(end) = self.pos.checked_add(n) else {
        return Err(StorageError::Corrupt(format!(
            "length overflow taking {n} bytes"
        )));
    };
    match self.buf.get(self.pos..end) {
        Some(chunk) => {
            self.pos = end;
            Ok(chunk)
        }
        None => Err(StorageError::Corrupt(format!(
            "unexpected EOF at offset {} (wanted {n} more bytes)",
            self.pos
        ))),
    }
}
/// Reads one byte.
fn read_u8(&mut self) -> Result<u8, StorageError> {
    let bytes = self.take(1)?;
    Ok(bytes[0])
}
/// Reads a little-endian `u16`.
fn read_u16(&mut self) -> Result<u16, StorageError> {
    self.take(2).map(|b| u16::from_le_bytes([b[0], b[1]]))
}
/// Reads a little-endian `u32`.
fn read_u32(&mut self) -> Result<u32, StorageError> {
    self.take(4)
        .map(|b| u32::from_le_bytes([b[0], b[1], b[2], b[3]]))
}
/// Reads a little-endian `i32`.
fn read_i32(&mut self) -> Result<i32, StorageError> {
    self.take(4)
        .map(|b| i32::from_le_bytes([b[0], b[1], b[2], b[3]]))
}
/// Reads a little-endian `u64`.
fn read_u64(&mut self) -> Result<u64, StorageError> {
    let b = self.take(8)?;
    let raw = [b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7]];
    Ok(u64::from_le_bytes(raw))
}
/// Reads a little-endian `i64`.
fn read_i64(&mut self) -> Result<i64, StorageError> {
    // `take(8)` returned exactly eight bytes, so the conversion cannot fail.
    let raw: [u8; 8] = self.take(8)?.try_into().expect("checked");
    Ok(i64::from_le_bytes(raw))
}
/// Reads a little-endian `f64`.
fn read_f64(&mut self) -> Result<f64, StorageError> {
    let raw: [u8; 8] = self.take(8)?.try_into().expect("checked");
    Ok(f64::from_le_bytes(raw))
}
/// Reads a little-endian `f32`.
fn read_f32(&mut self) -> Result<f32, StorageError> {
    self.take(4)
        .map(|b| f32::from_le_bytes([b[0], b[1], b[2], b[3]]))
}
/// Reads a string stored as a `u16` byte length followed by UTF-8 bytes.
///
/// # Errors
///
/// Returns [`StorageError::Corrupt`] when the buffer ends early or the
/// bytes are not valid UTF-8.
fn read_str(&mut self) -> Result<String, StorageError> {
    let byte_len = self.read_u16()? as usize;
    let raw = self.take(byte_len)?;
    match core::str::from_utf8(raw) {
        Ok(text) => Ok(String::from(text)),
        Err(_) => Err(StorageError::Corrupt(
            "invalid UTF-8 in identifier or text".into(),
        )),
    }
}
/// Reads an index key: a one-byte kind tag followed by its payload.
///
/// # Errors
///
/// Returns [`StorageError::Corrupt`] for an unknown tag or a truncated
/// payload.
fn read_index_key(&mut self) -> Result<IndexKey, StorageError> {
    let key = match self.read_u8()? {
        INDEX_KEY_TAG_INT => IndexKey::Int(self.read_i64()?),
        INDEX_KEY_TAG_TEXT => IndexKey::Text(self.read_str()?),
        // Any non-zero byte counts as `true`.
        INDEX_KEY_TAG_BOOL => IndexKey::Bool(self.read_u8()? != 0),
        other => {
            return Err(StorageError::Corrupt(format!(
                "unknown index key tag: {other}"
            )));
        }
    };
    Ok(key)
}
/// Reads an untagged value body whose layout is dictated by column type `ty`.
///
/// Vector bodies begin with a `u32` element count taken from the input.
/// That count is untrusted: the up-front reservation is bounded by the
/// bytes still available, and byte lengths derived from it use checked
/// arithmetic, so a corrupt count yields an error instead of a huge
/// allocation or a wrapped length.
///
/// # Errors
///
/// Returns [`StorageError::Corrupt`] when the buffer ends early, a string
/// is not valid UTF-8, a vector length overflows, or the column type is
/// `Interval` (which has no on-disk form).
fn read_value_body(&mut self, ty: DataType) -> Result<Value, StorageError> {
    match ty {
        DataType::SmallInt => {
            let s = self.take(2)?;
            Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
        }
        DataType::Int => Ok(Value::Int(self.read_i32()?)),
        DataType::BigInt => Ok(Value::BigInt(self.read_i64()?)),
        DataType::Float => Ok(Value::Float(self.read_f64()?)),
        DataType::Bool => Ok(Value::Bool(self.read_u8()? != 0)),
        DataType::Text | DataType::Varchar(_) | DataType::Char(_) => {
            Ok(Value::Text(self.read_str()?))
        }
        DataType::Vector {
            encoding: VecEncoding::F32,
            ..
        } => {
            let dim = self.read_u32()? as usize;
            // Never reserve more elements than the remaining input could hold.
            let remaining = self.buf.len().saturating_sub(self.pos);
            let mut v = Vec::with_capacity(dim.min(remaining / 4));
            for _ in 0..dim {
                let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
                v.push(f32::from_le_bytes(bytes));
            }
            Ok(Value::Vector(v))
        }
        DataType::Vector {
            encoding: VecEncoding::Sq8,
            ..
        } => {
            let dim = self.read_u32()? as usize;
            let min = self.read_f32()?;
            let max = self.read_f32()?;
            // `take` bounds-checks before anything is copied.
            let bytes = self.take(dim)?.to_vec();
            Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
        }
        DataType::Vector {
            encoding: VecEncoding::F16,
            ..
        } => {
            let dim = self.read_u32()? as usize;
            // Two bytes per element; guard the multiply for 32-bit targets.
            let byte_len = dim.checked_mul(2).ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "half-vector dim {dim} overflows its byte length"
                ))
            })?;
            let bytes = self.take(byte_len)?.to_vec();
            Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
        }
        DataType::Numeric { .. } => {
            let s = self.take(16)?;
            let arr: [u8; 16] = s.try_into().expect("checked");
            let scaled = i128::from_le_bytes(arr);
            let scale = self.read_u8()?;
            Ok(Value::Numeric { scaled, scale })
        }
        DataType::Date => Ok(Value::Date(self.read_i32()?)),
        DataType::Timestamp => Ok(Value::Timestamp(self.read_i64()?)),
        DataType::Interval => {
            Err(StorageError::Corrupt(
                "INTERVAL column found on disk — runtime-only type, v3.0.2 rejects it".into(),
            ))
        }
        DataType::Json => Ok(Value::Json(self.read_str()?)),
    }
}
/// Reads a self-describing value: a one-byte tag followed by its payload.
///
/// Tag 4 always decodes to `Value::Text`. Vector payloads begin with a
/// `u32` element count taken from the input; that count is untrusted, so
/// the up-front reservation is bounded by the bytes still available and
/// derived byte lengths use checked arithmetic.
///
/// # Errors
///
/// Returns [`StorageError::Corrupt`] for an unknown tag, a truncated
/// payload, invalid UTF-8, or a vector length that overflows.
fn read_value(&mut self) -> Result<Value, StorageError> {
    let tag = self.read_u8()?;
    match tag {
        0 => Ok(Value::Null),
        1 => Ok(Value::Int(self.read_i32()?)),
        2 => Ok(Value::BigInt(self.read_i64()?)),
        3 => Ok(Value::Float(self.read_f64()?)),
        4 => Ok(Value::Text(self.read_str()?)),
        5 => Ok(Value::Bool(self.read_u8()? != 0)),
        6 => {
            let dim = self.read_u32()? as usize;
            // Never reserve more elements than the remaining input could hold.
            let remaining = self.buf.len().saturating_sub(self.pos);
            let mut v = Vec::with_capacity(dim.min(remaining / 4));
            for _ in 0..dim {
                let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
                v.push(f32::from_le_bytes(bytes));
            }
            Ok(Value::Vector(v))
        }
        7 => {
            let s = self.take(2)?;
            Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
        }
        8 => {
            let s = self.take(16)?;
            let arr: [u8; 16] = s.try_into().expect("checked");
            let scaled = i128::from_le_bytes(arr);
            let scale = self.read_u8()?;
            Ok(Value::Numeric { scaled, scale })
        }
        9 => Ok(Value::Date(self.read_i32()?)),
        10 => Ok(Value::Timestamp(self.read_i64()?)),
        11 => {
            let dim = self.read_u32()? as usize;
            let min = self.read_f32()?;
            let max = self.read_f32()?;
            // `take` bounds-checks before anything is copied.
            let bytes = self.take(dim)?.to_vec();
            Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
        }
        12 => {
            let dim = self.read_u32()? as usize;
            // Two bytes per element; guard the multiply for 32-bit targets.
            let byte_len = dim.checked_mul(2).ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "half-vector dim {dim} overflows its byte length"
                ))
            })?;
            let bytes = self.take(byte_len)?.to_vec();
            Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
        }
        other => Err(StorageError::Corrupt(format!("unknown value tag: {other}"))),
    }
}
/// Reads an NSW/HNSW graph in the layout produced by `write_nsw_graph`.
///
/// `m` is supplied by the caller (it is stored ahead of the graph record)
/// and copied into the result unchanged. An entry value of `u32::MAX`
/// means the graph has no entry node.
///
/// # Errors
///
/// Returns [`StorageError::Corrupt`] when the buffer ends early.
fn read_nsw_graph(&mut self, m: usize) -> Result<NswGraph, StorageError> {
    let m_max_0 = self.read_u16()? as usize;
    let entry = match self.read_u32()? {
        u32::MAX => None,
        raw => Some(raw as usize),
    };
    let entry_level = self.read_u8()?;

    // One level byte per node.
    let node_count = self.read_u32()? as usize;
    let mut levels: PersistentVec<u8> = PersistentVec::new();
    for _ in 0..node_count {
        let level = self.read_u8()?;
        levels.push_mut(level);
    }

    // Per layer: node count, then each node's neighbour list.
    let layer_count = self.read_u8()? as usize;
    let mut layers: Vec<PersistentVec<Vec<u32>>> = Vec::with_capacity(layer_count);
    for _ in 0..layer_count {
        let nodes_in_layer = self.read_u32()? as usize;
        let mut adjacency: PersistentVec<Vec<u32>> = PersistentVec::new();
        for _ in 0..nodes_in_layer {
            let degree = self.read_u16()? as usize;
            let mut peers: Vec<u32> = Vec::with_capacity(degree);
            for _ in 0..degree {
                peers.push(self.read_u32()?);
            }
            adjacency.push_mut(peers);
        }
        layers.push(adjacency);
    }

    Ok(NswGraph {
        m,
        m_max_0,
        entry,
        entry_level,
        levels,
        layers,
    })
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloc::string::ToString;
use alloc::vec;
/// The NEON squared-L2 kernel must agree with the scalar reference, within
/// a relative tolerance, across a range of dimensions.
#[cfg(target_arch = "aarch64")]
#[test]
fn neon_l2_matches_scalar() {
    for d in [4usize, 8, 12, 16, 64, 128, 256, 384, 512, 768, 1024, 1536] {
        // Deterministic LCG seeded from the dimension; each draw maps the
        // state's upper bits to a float.
        let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
        #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
        let mut draw = || {
            state = state
                .wrapping_mul(6_364_136_223_846_793_005)
                .wrapping_add(1);
            (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0
        };
        let mut a = Vec::with_capacity(d);
        let mut b = Vec::with_capacity(d);
        for _ in 0..d {
            a.push(draw());
            b.push(draw());
        }
        let scalar = l2_distance_sq_scalar(&a, &b);
        // SAFETY: presumably the kernel only needs NEON, which this test's
        // `cfg` target provides — TODO confirm against its `# Safety` docs.
        let neon = unsafe { l2_distance_sq_neon(&a, &b) };
        let diff = (scalar - neon).abs();
        let tol = (scalar.abs().max(1e-6)) * 1e-4;
        assert!(
            diff <= tol,
            "dim={d}: scalar={scalar} neon={neon} diff={}",
            diff
        );
    }
}
/// The NEON inner-product kernel must agree with the scalar reference,
/// within a relative tolerance plus a per-dimension slack.
#[cfg(target_arch = "aarch64")]
#[test]
fn neon_inner_product_matches_scalar() {
    for d in [4usize, 8, 12, 16, 64, 128, 256, 512, 1024] {
        // Deterministic LCG seeded from the dimension.
        let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
        #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
        let mut draw = || {
            state = state
                .wrapping_mul(6_364_136_223_846_793_005)
                .wrapping_add(1);
            (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0
        };
        let mut a = Vec::with_capacity(d);
        let mut b = Vec::with_capacity(d);
        for _ in 0..d {
            a.push(draw());
            b.push(draw());
        }
        let scalar = inner_product_scalar(&a, &b);
        // SAFETY: presumably the kernel only needs NEON, which this test's
        // `cfg` target provides — TODO confirm against its `# Safety` docs.
        let neon = unsafe { inner_product_neon(&a, &b) };
        let diff = (scalar - neon).abs();
        #[allow(clippy::cast_precision_loss)]
        let tol = (scalar.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
        assert!(
            diff <= tol,
            "IP dim={d}: scalar={scalar} neon={neon} diff={}",
            diff
        );
    }
}
/// The NEON cosine kernel must return the same three values as the scalar
/// reference, within tolerance.
#[cfg(target_arch = "aarch64")]
#[allow(clippy::similar_names)]
#[test]
fn neon_cosine_dot_norms_matches_scalar() {
    for d in [4usize, 8, 12, 16, 64, 128, 256, 512, 1024] {
        // Deterministic LCG seeded from the dimension.
        let mut state: u64 = (d as u64).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
        let mut draw = || {
            state = state
                .wrapping_mul(6_364_136_223_846_793_005)
                .wrapping_add(1);
            (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0
        };
        let mut a = Vec::with_capacity(d);
        let mut b = Vec::with_capacity(d);
        for _ in 0..d {
            a.push(draw());
            b.push(draw());
        }
        let (dot_s, na_s, nb_s) = cosine_dot_norms_scalar(&a, &b);
        // SAFETY: presumably the kernel only needs NEON, which this test's
        // `cfg` target provides — TODO confirm against its `# Safety` docs.
        let (dot_n, na_n, nb_n) = unsafe { cosine_dot_norms_neon(&a, &b) };
        #[allow(clippy::cast_precision_loss)]
        let slack = (d as f32) * 1e-6;
        let tol_d = (dot_s.abs().max(1e-6)) * 1e-4 + slack;
        // Both norm checks use the tolerance derived from the first norm.
        let tol_n = (na_s.abs().max(1e-6)) * 1e-4 + slack;
        assert!(
            (dot_s - dot_n).abs() <= tol_d,
            "cosine dot dim={d}: scalar={dot_s} neon={dot_n}"
        );
        assert!(
            (na_s - na_n).abs() <= tol_n,
            "cosine na dim={d}: scalar={na_s} neon={na_n}"
        );
        assert!(
            (nb_s - nb_n).abs() <= tol_n,
            "cosine nb dim={d}: scalar={nb_s} neon={nb_n}"
        );
    }
}
/// Fixture: a `users` table with a non-null integer `id`, a non-null text
/// `name`, and a nullable float `score`.
fn make_users_schema() -> TableSchema {
    let id = ColumnSchema::new("id", DataType::Int, false);
    let name = ColumnSchema::new("name", DataType::Text, false);
    let score = ColumnSchema::new("score", DataType::Float, true);
    TableSchema::new("users", vec![id, name, score])
}
/// Each value variant reports its own data type, and NULL reports none.
#[test]
fn value_type_tag_matches_variant() {
    let cases = [
        (Value::Int(1), Some(DataType::Int)),
        (Value::BigInt(1), Some(DataType::BigInt)),
        (Value::Float(1.0), Some(DataType::Float)),
        (Value::Text("x".into()), Some(DataType::Text)),
        (Value::Bool(true), Some(DataType::Bool)),
        (Value::Null, None),
    ];
    for (value, expected) in cases {
        assert_eq!(value.data_type(), expected);
    }
    assert!(Value::Null.is_null());
    assert!(!Value::Int(0).is_null());
}
/// A quantized five-element vector reports a five-dimensional SQ8 vector type.
#[test]
fn sq8_value_reports_sq8_data_type() {
    let quantized = crate::quantize::quantize(&[0.0, 0.25, 0.5, 0.75, 1.0]);
    let expected = DataType::Vector {
        dim: 5,
        encoding: VecEncoding::Sq8,
    };
    assert_eq!(Value::Sq8Vector(quantized).data_type(), Some(expected));
}
/// The scalar types render as their SQL keywords.
#[test]
fn datatype_display_matches_pg_keyword() {
    let cases = [
        (DataType::Int, "INT"),
        (DataType::BigInt, "BIGINT"),
        (DataType::Float, "FLOAT"),
        (DataType::Text, "TEXT"),
        (DataType::Bool, "BOOL"),
    ];
    for (ty, keyword) in cases {
        assert_eq!(ty.to_string(), keyword);
    }
}
/// `len` counts the values in a row and `is_empty` is true only for a row
/// with none.
#[test]
fn row_len_and_emptiness() {
    let two_values = Row::new(vec![Value::Int(1), Value::Null]);
    assert_eq!(two_values.len(), 2);
    assert!(!two_values.is_empty());

    let no_values = Row::new(Vec::new());
    assert!(no_values.is_empty());
}
/// Column lookup by name returns the column's position, or `None` when the
/// name is unknown.
#[test]
fn table_schema_column_position() {
    let schema = make_users_schema();
    let cases = [("id", Some(0)), ("score", Some(2)), ("missing", None)];
    for (column, expected) in cases {
        assert_eq!(schema.column_position(column), expected);
    }
}
/// A created table is counted and can be found by name; unknown names are not.
#[test]
fn catalog_create_table_then_lookup() {
    let mut catalog = Catalog::new();
    let schema = make_users_schema();
    catalog.create_table(schema).unwrap();

    assert_eq!(catalog.table_count(), 1);
    assert!(catalog.get("users").is_some());
    assert!(catalog.get("nope").is_none());
}
/// Creating the same table twice fails with `DuplicateTable` naming it.
#[test]
fn catalog_duplicate_table_is_rejected() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();

    let second_attempt = catalog.create_table(make_users_schema());
    let err = second_attempt.unwrap_err();
    let names_users = matches!(err, StorageError::DuplicateTable { ref name } if name == "users");
    assert!(names_users);
}
/// A well-typed row is appended and can be read back.
#[test]
fn table_insert_happy_path_appends_row() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    let alice = Row::new(vec![
        Value::Int(1),
        Value::Text("alice".into()),
        Value::Float(99.5),
    ]);
    users.insert(alice).unwrap();

    assert_eq!(users.row_count(), 1);
    assert_eq!(users.rows()[0].values[1], Value::Text("alice".into()));
}
/// A row with too few values is rejected and nothing is stored.
#[test]
fn table_insert_arity_mismatch() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    let short_row = Row::new(vec![Value::Int(1)]);
    let err = users.insert(short_row).unwrap_err();

    let reports_arity = matches!(
        err,
        StorageError::ArityMismatch {
            expected: 3,
            actual: 1
        }
    );
    assert!(reports_arity);
    assert_eq!(users.row_count(), 0);
}
/// An integer in the text `name` column is rejected with the column name,
/// both types and the position; nothing is stored.
#[test]
fn table_insert_type_mismatch_reports_column() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    // The second value should be text.
    let bad_row = Row::new(vec![Value::Int(1), Value::Int(42), Value::Float(0.0)]);
    let err = users.insert(bad_row).unwrap_err();

    if let StorageError::TypeMismatch {
        ref column,
        expected,
        actual,
        position,
    } = err
    {
        assert_eq!(column, "name");
        assert_eq!(expected, DataType::Text);
        assert_eq!(actual, DataType::Int);
        assert_eq!(position, 1);
    } else {
        panic!("unexpected: {err:?}");
    }
    assert_eq!(users.row_count(), 0);
}
/// Putting `Null` into the `name` cell is rejected with `NullInNotNull`
/// naming that column.
#[test]
fn table_insert_null_into_not_null_rejected() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    let missing_name = Row::new(vec![Value::Int(1), Value::Null, Value::Float(1.0)]);
    let rejection = users.insert(missing_name).unwrap_err();
    let rejected_on_name =
        matches!(rejection, StorageError::NullInNotNull { ref column } if column == "name");
    assert!(rejected_on_name);
}
/// A `Null` in the third cell of a `users` row is accepted and the row is stored.
#[test]
fn table_insert_null_into_nullable_ok() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    let bob_without_score = Row::new(vec![
        Value::Int(1),
        Value::Text("bob".into()),
        Value::Null,
    ]);
    users.insert(bob_without_score).unwrap();
    assert_eq!(users.row_count(), 1);
}
/// Inserting through `get_mut("a")` changes only table `a`; table `b`,
/// created with the same single-column schema, stays empty.
#[test]
fn catalog_get_mut_independent_per_table() {
    let mut catalog = Catalog::new();
    for table_name in ["a", "b"] {
        let schema = TableSchema::new(
            table_name,
            vec![ColumnSchema::new("v", DataType::Int, false)],
        );
        catalog.create_table(schema).unwrap();
    }
    let table_a = catalog.get_mut("a").unwrap();
    table_a.insert(Row::new(vec![Value::Int(1)])).unwrap();
    assert_eq!(catalog.get("a").unwrap().row_count(), 1);
    assert_eq!(catalog.get("b").unwrap().row_count(), 0);
}
/// Serializes `cat`, deserializes the bytes, and asserts the restored catalog
/// has the same table count and, pairwise, equal schemas and rows.
///
/// Panics (via `expect`/`assert_eq!`) when decoding fails or anything differs.
fn assert_round_trip(cat: &Catalog) {
    let restored = Catalog::deserialize(&cat.serialize()).expect("deserialize");
    assert_eq!(restored.table_count(), cat.table_count());
    let table_pairs = cat.tables.iter().zip(restored.tables.iter());
    for (original, decoded) in table_pairs {
        assert_eq!(original.schema, decoded.schema);
        assert_eq!(original.rows, decoded.rows);
    }
}
/// A catalog with no tables survives serialize → deserialize.
#[test]
fn serialize_empty_catalog_round_trips() {
    let empty = Catalog::new();
    assert_round_trip(&empty);
}
/// A catalog holding one table with no rows survives serialize → deserialize.
#[test]
fn serialize_single_empty_table_round_trips() {
    let mut catalog = Catalog::new();
    let users_schema = make_users_schema();
    catalog.create_table(users_schema).unwrap();
    assert_round_trip(&catalog);
}
/// Cloning an NSW graph must share its storage with the original.
///
/// Builds a 1500-row vector table, indexes it, clones the graph, and checks
/// through `shares_storage_with` that `levels` and every entry of `layers`
/// are still shared between the original and the clone.
#[test]
fn nsw_clone_is_o1() {
    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let schema = TableSchema::new(
        "docs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vector_ty, true),
        ],
    );
    let mut catalog = Catalog::new();
    catalog.create_table(schema).unwrap();

    let docs = catalog.get_mut("docs").unwrap();
    for i in 0..1500_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.01;
        let cells = alloc::vec![
            Value::Int(i),
            Value::Vector(alloc::vec![base, base + 0.05, base + 0.1]),
        ];
        docs.insert(Row::new(cells)).unwrap();
    }
    docs.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let indexed = catalog.get("docs").unwrap();
    let graph = match &indexed.indices()[0].kind {
        IndexKind::Nsw(g) => g,
        IndexKind::BTree(_) | IndexKind::Brin { .. } => panic!("expected NSW"),
    };
    assert_eq!(graph.levels.len(), 1500, "one level slot per inserted row");
    let layer_count = graph.layers.len();
    assert!(
        layer_count >= 2,
        "1500 nodes should populate at least two HNSW layers, got {}",
        graph.layers.len()
    );

    let cloned = graph.clone();
    let levels_shared = graph.levels.shares_storage_with(&cloned.levels);
    assert!(
        levels_shared,
        "levels PV not shared after clone — clone copied elements (O(N))"
    );
    assert_eq!(graph.layers.len(), cloned.layers.len());
    let layer_pairs = graph.layers.iter().zip(cloned.layers.iter());
    for (l, (orig, cl)) in layer_pairs.enumerate() {
        assert!(
            orig.shares_storage_with(cl),
            "layer {l} PV not shared after clone — clone copied elements (O(N))"
        );
    }
}
/// An SQ8 vector column keeps its declared type, its stored cell, and its
/// NSW query results across a catalog serialize → deserialize cycle.
#[test]
fn sq8_catalog_serialise_roundtrip_preserves_cells_and_index() {
    let column_ty = DataType::Vector {
        dim: 8,
        encoding: VecEncoding::Sq8,
    };
    let mut catalog = Catalog::new();
    catalog
        .create_table(TableSchema::new(
            "vecs",
            alloc::vec![
                ColumnSchema::new("id", DataType::Int, false),
                ColumnSchema::new("v", column_ty, false),
            ],
        ))
        .unwrap();

    let vecs = catalog.get_mut("vecs").unwrap();
    for i in 0..32_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.03;
        let mut v: Vec<f32> = Vec::with_capacity(8);
        for j in 0..8_i32 {
            #[allow(clippy::cast_precision_loss)]
            let off = (j as f32) * 0.01;
            v.push(base + off);
        }
        let cells = alloc::vec![Value::Int(i), Value::Sq8Vector(quantize::quantize(&v))];
        vecs.insert(Row::new(cells)).unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];
    // Snapshot the observable state before the round trip.
    let live = catalog.get("vecs").unwrap();
    let before_cell = live.rows()[5].values[1].clone();
    let before_ty = live.schema().columns[1].ty;
    let before_hits = nsw_query(live, "v_idx", &query, 5, NswMetric::L2);

    let bytes = catalog.serialize();
    let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
    let rt = restored.get("vecs").unwrap();
    assert_eq!(rt.schema().columns[1].ty, before_ty);
    assert_eq!(rt.rows()[5].values[1], before_cell);
    let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
    assert_eq!(before_hits, after_hits);
}
/// An F16 (half-precision) vector column keeps its declared type, its stored
/// cell, and its NSW query results across a catalog serialize → deserialize cycle.
#[test]
fn half_catalog_serialise_roundtrip_preserves_cells_and_index() {
    use crate::halfvec;
    let column_ty = DataType::Vector {
        dim: 8,
        encoding: VecEncoding::F16,
    };
    let mut catalog = Catalog::new();
    catalog
        .create_table(TableSchema::new(
            "vecs",
            alloc::vec![
                ColumnSchema::new("id", DataType::Int, false),
                ColumnSchema::new("v", column_ty, false),
            ],
        ))
        .unwrap();

    let vecs = catalog.get_mut("vecs").unwrap();
    for i in 0..32_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.03;
        let mut v: Vec<f32> = Vec::with_capacity(8);
        for j in 0..8_i32 {
            #[allow(clippy::cast_precision_loss)]
            let off = (j as f32) * 0.01;
            v.push(base + off);
        }
        let half = halfvec::HalfVector::from_f32_slice(&v);
        let cells = alloc::vec![Value::Int(i), Value::HalfVector(half)];
        vecs.insert(Row::new(cells)).unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];
    // Snapshot the observable state before the round trip.
    let live = catalog.get("vecs").unwrap();
    let before_cell = live.rows()[5].values[1].clone();
    let before_ty = live.schema().columns[1].ty;
    let before_hits = nsw_query(live, "v_idx", &query, 5, NswMetric::L2);

    let bytes = catalog.serialize();
    let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
    let rt = restored.get("vecs").unwrap();
    assert_eq!(rt.schema().columns[1].ty, before_ty);
    assert_eq!(rt.rows()[5].values[1], before_cell);
    let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
    assert_eq!(before_hits, after_hits);
}
/// Recall check for the NSW index over an F16 vector column.
///
/// Generates 512 deterministic 32-dimensional vectors plus 32 queries,
/// computes the exact top-10 per query by brute-force squared-L2 over the
/// original `f32` data, then stores the corpus as `HalfVector` cells, builds
/// an NSW index, and requires the index hits to overlap the exact answers
/// with recall of at least 0.95 summed over all queries.
#[test]
#[allow(clippy::similar_names)]
fn hnsw_half_recall_at_10_matches_f32_groundtruth() {
use crate::halfvec;
// Deterministic generator: advance `state` by an add-then-multiply step
// (both wrapping), take the upper 32 bits, and map them into [-1.0, 1.0].
fn next(state: &mut u64) -> f32 {
*state = state
.wrapping_add(0x9E37_79B9_7F4A_7C15)
.wrapping_mul(0xBF58_476D_1CE4_E5B9);
#[allow(clippy::cast_precision_loss)]
let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
2.0 * u - 1.0
}
let dim: u32 = 32;
let n: usize = 512;
let dim_us = dim as usize;
// A single seed is threaded through the corpus and then the queries, so
// the order of these two generators determines the data.
let mut seed: u64 = 0xF16_F16_F16_F16_u64;
let corpus: Vec<Vec<f32>> = (0..n)
.map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
.collect();
let queries: Vec<Vec<f32>> = (0..32)
.map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
.collect();
// Ground truth: for each query, the corpus positions of the 10 smallest
// squared-L2 distances computed on the unquantized f32 vectors.
let exact_top10: Vec<Vec<usize>> = queries
.iter()
.map(|q| {
let mut scored: Vec<(f32, usize)> = corpus
.iter()
.enumerate()
.map(|(i, v)| (l2_distance_sq(v, q), i))
.collect();
// Incomparable distances (partial_cmp == None) are treated as equal.
scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
scored.into_iter().take(10).map(|(_, i)| i).collect()
})
.collect();
let mut cat = Catalog::new();
cat.create_table(TableSchema::new(
"vecs",
alloc::vec![
ColumnSchema::new("id", DataType::Int, false),
ColumnSchema::new(
"v",
DataType::Vector {
dim,
encoding: VecEncoding::F16,
},
false,
),
],
))
.unwrap();
let t = cat.get_mut("vecs").unwrap();
// Rows are inserted in corpus order; the overlap check below compares
// index hits directly against corpus positions.
for (i, v) in corpus.iter().enumerate() {
t.insert(Row::new(alloc::vec![
Value::Int(i32::try_from(i).unwrap()),
Value::HalfVector(halfvec::HalfVector::from_f32_slice(v)),
]))
.unwrap();
}
t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
let table = cat.get("vecs").unwrap();
// Count how many index hits also appear in the exact top-10, over all queries.
let mut total_overlap = 0_usize;
for (q, exact) in queries.iter().zip(exact_top10.iter()) {
let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
for h in &hits {
if exact.contains(h) {
total_overlap += 1;
}
}
}
// recall = overlapping hits / (10 expected hits per query * number of queries)
#[allow(clippy::cast_precision_loss)]
let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
assert!(
recall >= 0.95,
"HALF HNSW recall@10 = {recall:.3}, below floor 0.95 — \
check halfvec dispatch in `cell_to_query_metric_distance`"
);
}
/// Recall check for the NSW index over an SQ8-quantized vector column.
///
/// Generates 512 deterministic 32-dimensional vectors plus 32 queries,
/// computes the exact top-10 per query by brute-force squared-L2 over the
/// original `f32` data, then stores the corpus as `Sq8Vector` cells, builds
/// an NSW index, and requires the index hits to overlap the exact answers
/// with recall of at least 0.95 summed over all queries.
#[test]
fn hnsw_sq8_recall_at_10_above_0_95_vs_f32_groundtruth() {
use crate::quantize;
// Deterministic generator: advance `state` by an add-then-multiply step
// (both wrapping), take the upper 32 bits, and map them into [-1.0, 1.0].
fn next(state: &mut u64) -> f32 {
*state = state
.wrapping_add(0x9E37_79B9_7F4A_7C15)
.wrapping_mul(0xBF58_476D_1CE4_E5B9);
#[allow(clippy::cast_precision_loss)]
let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
2.0 * u - 1.0
}
let dim: u32 = 32;
let n: usize = 512;
let dim_us = dim as usize;
// A single seed is threaded through the corpus and then the queries, so
// the order of these two generators determines the data.
let mut seed: u64 = 0xCAFE_BABE_DEAD_BEEFu64;
let corpus: Vec<Vec<f32>> = (0..n)
.map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
.collect();
let queries: Vec<Vec<f32>> = (0..32)
.map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
.collect();
// Ground truth: for each query, the corpus positions of the 10 smallest
// squared-L2 distances computed on the unquantized f32 vectors.
let exact_top10: Vec<Vec<usize>> = queries
.iter()
.map(|q| {
let mut scored: Vec<(f32, usize)> = corpus
.iter()
.enumerate()
.map(|(i, v)| (l2_distance_sq(v, q), i))
.collect();
// Incomparable distances (partial_cmp == None) are treated as equal.
scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
scored.into_iter().take(10).map(|(_, i)| i).collect()
})
.collect();
let mut cat = Catalog::new();
cat.create_table(TableSchema::new(
"vecs",
alloc::vec![
ColumnSchema::new("id", DataType::Int, false),
ColumnSchema::new(
"v",
DataType::Vector {
dim,
encoding: VecEncoding::Sq8,
},
false,
),
],
))
.unwrap();
let t = cat.get_mut("vecs").unwrap();
// Rows are inserted in corpus order; the overlap check below compares
// index hits directly against corpus positions.
for (i, v) in corpus.iter().enumerate() {
t.insert(Row::new(alloc::vec![
Value::Int(i32::try_from(i).unwrap()),
Value::Sq8Vector(quantize::quantize(v)),
]))
.unwrap();
}
t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
let table = cat.get("vecs").unwrap();
// Count how many index hits also appear in the exact top-10, over all queries.
let mut total_overlap = 0_usize;
for (q, exact) in queries.iter().zip(exact_top10.iter()) {
let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
for h in &hits {
if exact.contains(h) {
total_overlap += 1;
}
}
}
// recall = overlapping hits / (10 expected hits per query * number of queries)
#[allow(clippy::cast_precision_loss)]
let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
assert!(
recall >= 0.95,
"SQ8 HNSW recall@10 = {recall:.3}, below floor 0.95 — \
check `sq8_rerank` is wired in `nsw_search` for SQ8 columns"
);
}
/// The NSW graph stored in a catalog (parameters, entry point, levels and
/// layers) is identical after serialize → deserialize.
#[test]
fn nsw_index_topology_persists_through_round_trip() {
    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let mut catalog = Catalog::new();
    catalog
        .create_table(TableSchema::new(
            "docs",
            alloc::vec![
                ColumnSchema::new("id", DataType::Int, false),
                ColumnSchema::new("v", vector_ty, true),
            ],
        ))
        .unwrap();

    let docs = catalog.get_mut("docs").unwrap();
    for i in 0..6_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.1;
        let cells = alloc::vec![
            Value::Int(i),
            Value::Vector(alloc::vec![base, base + 0.05, base + 0.1]),
        ];
        docs.insert(Row::new(cells)).unwrap();
    }
    docs.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    // Capture the graph before the round trip.
    let before = match &catalog.get("docs").unwrap().indices()[0].kind {
        IndexKind::Nsw(g) => g.clone(),
        IndexKind::BTree(_) | IndexKind::Brin { .. } => panic!("expected NSW"),
    };

    let restored = Catalog::deserialize(&catalog.serialize()).expect("deserialize");
    let after = match &restored.get("docs").unwrap().indices()[0].kind {
        IndexKind::Nsw(g) => g.clone(),
        IndexKind::BTree(_) | IndexKind::Brin { .. } => panic!("expected NSW"),
    };

    assert_eq!(after.m, before.m);
    assert_eq!(after.m_max_0, before.m_max_0);
    assert_eq!(after.entry, before.entry);
    assert_eq!(after.entry_level, before.entry_level);
    assert_eq!(after.levels, before.levels);
    assert_eq!(after.layers, before.layers);
}
/// Calling `nsw_assign_level` twice with the same node index gives the same level.
#[test]
fn hnsw_level_assignment_is_deterministic() {
    (0..32usize).for_each(|node| {
        let first = nsw_assign_level(node);
        let second = nsw_assign_level(node);
        assert_eq!(first, second);
    });
}
/// More than 150 of the first 200 node indices must be assigned level 0.
#[test]
fn hnsw_layer_0_dominates_population() {
    let mut on_zero = 0_usize;
    for node in 0..200usize {
        if nsw_assign_level(node) == 0 {
            on_zero += 1;
        }
    }
    assert!(on_zero > 150, "level-0 nodes too few: {on_zero}");
}
/// For a ten-point 3-D dataset, the top-1 result of `nsw_search` under L2
/// must be the same row as the brute-force nearest neighbour.
#[test]
fn hnsw_search_matches_brute_force_for_l2_top1() {
    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let mut catalog = Catalog::new();
    catalog
        .create_table(TableSchema::new(
            "vecs",
            alloc::vec![
                ColumnSchema::new("id", DataType::Int, false),
                ColumnSchema::new("v", vector_ty, true),
            ],
        ))
        .unwrap();

    let vecs = catalog.get_mut("vecs").unwrap();
    let dataset: alloc::vec::Vec<(i32, [f32; 3])> = alloc::vec![
        (1, [0.0, 0.0, 0.0]),
        (2, [1.0, 0.0, 0.0]),
        (3, [0.0, 1.0, 0.0]),
        (4, [0.0, 0.0, 1.0]),
        (5, [1.0, 1.0, 0.0]),
        (6, [1.0, 0.0, 1.0]),
        (7, [0.0, 1.0, 1.0]),
        (8, [1.0, 1.0, 1.0]),
        (9, [0.5, 0.5, 0.5]),
        (10, [0.2, 0.8, 0.5]),
    ];
    for &(id, [x, y, z]) in &dataset {
        let cells = alloc::vec![Value::Int(id), Value::Vector(alloc::vec![x, y, z])];
        vecs.insert(Row::new(cells)).unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    let table = catalog.get("vecs").unwrap();
    let idx_pos = table
        .indices()
        .iter()
        .position(|index| index.name == "v_idx")
        .unwrap();

    for query in [[0.4, 0.4, 0.4], [0.9, 0.1, 0.0], [0.0, 0.9, 0.9]] {
        let hnsw_top = nsw_search(table, idx_pos, &query, 1, 16, NswMetric::L2);

        // Brute force: score every row, treating non-vector cells as infinitely far.
        let mut brute: alloc::vec::Vec<(f32, usize)> =
            alloc::vec::Vec::with_capacity(table.rows.len());
        for i in 0..table.rows.len() {
            let dist = match &table.rows[i].values[1] {
                Value::Vector(v) => l2_distance_sq(v, &query),
                _ => f32::INFINITY,
            };
            brute.push((dist, i));
        }
        brute.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));

        assert!(!hnsw_top.is_empty(), "HNSW returned no results");
        assert_eq!(
            hnsw_top[0].1, brute[0].1,
            "HNSW top-1 != brute-force top-1 for {query:?}"
        );
    }
}
/// A `users` table holding two rows (one with a `Null` cell) round-trips.
#[test]
fn serialize_table_with_rows_round_trips() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    let rows = [
        Row::new(vec![
            Value::Int(1),
            Value::Text("alice".into()),
            Value::Float(95.5),
        ]),
        Row::new(vec![Value::Int(2), Value::Text("bob".into()), Value::Null]),
    ];
    for row in rows {
        users.insert(row).unwrap();
    }
    assert_round_trip(&catalog);
}
/// A catalog with two tables (`users` empty, `flags` with one row) round-trips.
#[test]
fn serialize_multiple_tables_round_trips() {
    let flags_schema = TableSchema::new(
        "flags",
        vec![
            ColumnSchema::new("id", DataType::BigInt, false),
            ColumnSchema::new("active", DataType::Bool, false),
        ],
    );
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    catalog.create_table(flags_schema).unwrap();
    let flags = catalog.get_mut("flags").unwrap();
    flags
        .insert(Row::new(vec![Value::BigInt(7), Value::Bool(true)]))
        .unwrap();
    assert_round_trip(&catalog);
}
/// A stream that starts with the wrong magic bytes is reported as `Corrupt`.
#[test]
fn deserialize_rejects_bad_magic() {
    let mut stream = Vec::new();
    stream.extend_from_slice(b"BADMAGIC");
    stream.push(FILE_VERSION);
    // Zero tables, little-endian u32.
    stream.extend_from_slice(&0u32.to_le_bytes());
    let rejection = Catalog::deserialize(&stream).unwrap_err();
    assert!(matches!(rejection, StorageError::Corrupt(_)));
}
/// A stream with the right magic but version byte 99 is `Corrupt`, and the
/// message mentions "version".
#[test]
fn deserialize_rejects_unsupported_version() {
    let mut stream = Vec::new();
    stream.extend_from_slice(FILE_MAGIC);
    stream.push(99);
    stream.extend_from_slice(&0u32.to_le_bytes());
    let rejection = Catalog::deserialize(&stream).unwrap_err();
    let mentions_version =
        matches!(rejection, StorageError::Corrupt(ref s) if s.contains("version"));
    assert!(mentions_version);
}
/// Dropping the final byte of a valid serialized catalog yields `Corrupt`.
#[test]
fn deserialize_rejects_truncated_file() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let bytes = catalog.serialize();
    let cut = bytes.len() - 1;
    let outcome = Catalog::deserialize(&bytes[..cut]);
    assert!(matches!(outcome, Err(StorageError::Corrupt(_))));
}
/// An extra byte after a valid serialized catalog yields `Corrupt` with a
/// message mentioning "trailing".
#[test]
fn deserialize_rejects_trailing_garbage() {
    let mut bytes = Catalog::new().serialize();
    bytes.push(0xFF);
    let outcome = Catalog::deserialize(&bytes);
    let flagged_trailing = matches!(
        outcome,
        Err(StorageError::Corrupt(ref s)) if s.contains("trailing")
    );
    assert!(flagged_trailing);
}
/// Builds a catalog whose `users` table holds three rows:
/// `(1, "alice", 90.0)`, `(2, "bob", NULL)` and `(3, "alice", 70.0)`.
fn populated_users() -> Catalog {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    let fixtures = [
        (1, "alice", Some(90.0)),
        (2, "bob", None),
        (3, "alice", Some(70.0)),
    ];
    for (id, name, score) in fixtures {
        // A missing score is stored as SQL NULL.
        let score_cell = match score {
            Some(points) => Value::Float(points),
            None => Value::Null,
        };
        let row = Row::new(vec![Value::Int(id), Value::Text(name.into()), score_cell]);
        users.insert(row).unwrap();
    }
    catalog
}
/// An index added after rows exist already contains those rows: key 2
/// resolves to hot row 1, and an unknown key resolves to nothing.
#[test]
fn add_index_builds_from_existing_rows() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("by_id".into(), "id").unwrap();
    let by_id = catalog
        .get("users")
        .unwrap()
        .index_on(0)
        .expect("index_on(0)");
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(99)), &[] as &[RowLocator]);
}
/// Reusing an index name, even on a different column, fails with
/// `DuplicateIndex` naming it.
#[test]
fn add_index_dup_name_rejected() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("ix".into(), "id").unwrap();
    let rejection = users.add_index("ix".into(), "name").unwrap_err();
    let names_ix = matches!(rejection, StorageError::DuplicateIndex { ref name } if name == "ix");
    assert!(names_ix);
}
/// Indexing a column the table does not have fails with `ColumnNotFound`
/// naming the requested column.
#[test]
fn add_index_unknown_column_rejected() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();
    let rejection = users.add_index("ix".into(), "ghost").unwrap_err();
    let names_ghost =
        matches!(rejection, StorageError::ColumnNotFound { ref column } if column == "ghost");
    assert!(names_ghost);
}
/// Rows inserted after an index is created are visible through it, next to
/// the entries built from rows that existed beforehand.
#[test]
fn insert_after_create_index_updates_it() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();
    let dave = Row::new(vec![
        Value::Int(4),
        Value::Text("dave".into()),
        Value::Null,
    ]);
    users.insert(dave).unwrap();

    let by_name = users.index_on(1).unwrap();
    // The new row is the fourth one, so it sits at hot position 3.
    let dave_key = IndexKey::Text("dave".into());
    assert_eq!(by_name.lookup_eq(&dave_key), &[RowLocator::Hot(3)]);
    // "alice" appears in the first and third pre-existing rows.
    let alice_key = IndexKey::Text("alice".into());
    assert_eq!(
        by_name.lookup_eq(&alice_key),
        &[RowLocator::Hot(0), RowLocator::Hot(2)]
    );
}
/// An index over the `score` column returns no entries for `IndexKey::Int(90)`.
#[test]
fn null_or_float_values_are_not_indexed() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("by_score".into(), "score").unwrap();
    let by_score = users.index_on(2).unwrap();
    let probe = IndexKey::Int(90);
    assert_eq!(by_score.lookup_eq(&probe), &[] as &[RowLocator]);
}
/// A three-element `Value::Vector` reports an F32 vector type with `dim` 3.
#[test]
fn vector_value_data_type_carries_dim() {
    let expected = Some(DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    });
    let value = Value::Vector(vec![1.0, 2.0, 3.0]);
    assert_eq!(value.data_type(), expected);
}
/// A vector whose length equals the column's declared `dim` is accepted.
#[test]
fn vector_column_insert_matching_dim_ok() {
    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let schema = TableSchema::new("emb", vec![ColumnSchema::new("v", vector_ty, false)]);
    let mut catalog = Catalog::new();
    catalog.create_table(schema).unwrap();
    let emb = catalog.get_mut("emb").unwrap();
    emb.insert(Row::new(vec![Value::Vector(vec![1.0, 2.0, 3.0])]))
        .unwrap();
}
/// A two-element vector inserted into a `dim: 3` column is rejected with
/// `TypeMismatch`.
#[test]
fn vector_column_insert_dim_mismatch_rejected() {
    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let schema = TableSchema::new("emb", vec![ColumnSchema::new("v", vector_ty, false)]);
    let mut catalog = Catalog::new();
    catalog.create_table(schema).unwrap();
    let emb = catalog.get_mut("emb").unwrap();
    let too_short = Row::new(vec![Value::Vector(vec![1.0, 2.0])]);
    let rejection = emb.insert(too_short).unwrap_err();
    assert!(matches!(rejection, StorageError::TypeMismatch { .. }));
}
/// A vector column's declared type and a stored vector cell are unchanged
/// after serialize → deserialize.
#[test]
fn vector_value_survives_catalog_round_trip() {
    let vector_ty = DataType::Vector {
        dim: 4,
        encoding: VecEncoding::F32,
    };
    let schema = TableSchema::new(
        "emb",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vector_ty, false),
        ],
    );
    let mut catalog = Catalog::new();
    catalog.create_table(schema).unwrap();
    let emb = catalog.get_mut("emb").unwrap();
    emb.insert(Row::new(vec![
        Value::Int(1),
        Value::Vector(vec![0.5, -1.25, 3.0, 7.0]),
    ]))
    .unwrap();

    let restored = Catalog::deserialize(&catalog.serialize()).expect("round-trip");
    let table = restored.get("emb").unwrap();
    let expected_ty = DataType::Vector {
        dim: 4,
        encoding: VecEncoding::F32,
    };
    assert_eq!(table.schema().columns[1].ty, expected_ty);
    let expected_cell = Value::Vector(vec![0.5, -1.25, 3.0, 7.0]);
    assert_eq!(table.rows()[0].values[1], expected_cell);
}
/// An index on `name` is still present after a round trip, with the same
/// name and the same lookup result for "alice".
#[test]
fn index_survives_serialize_deserialize_round_trip() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let restored = Catalog::deserialize(&catalog.serialize()).unwrap();
    let restored_users = restored.get("users").unwrap();
    let by_name = restored_users
        .index_on(1)
        .expect("index_on(1) after restore");
    assert_eq!(by_name.name, "by_name");
    let alice_key = IndexKey::Text("alice".into());
    assert_eq!(
        by_name.lookup_eq(&alice_key),
        &[RowLocator::Hot(0), RowLocator::Hot(2)]
    );
}
/// Schema for a `users` table with two non-nullable columns:
/// `id` (`BigInt`) and `name` (`Text`).
fn bigint_pk_users_schema() -> TableSchema {
    let id_column = ColumnSchema::new("id", DataType::BigInt, false);
    let name_column = ColumnSchema::new("name", DataType::Text, false);
    TableSchema::new("users", vec![id_column, name_column])
}
/// Builds a two-cell row `(BigInt(id), Text(name))` matching
/// `bigint_pk_users_schema`.
fn make_user_row(id: i64, name: &str) -> Row {
    let cells = vec![Value::BigInt(id), Value::Text(name.into())];
    Row::new(cells)
}
/// With rows present and no cold segment loaded, `lookup_by_pk` returns the
/// matching row.
#[test]
fn lookup_by_pk_finds_row_via_hot_index() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    let people = [(1i64, "alice"), (2, "bob"), (3, "carol")];
    for (id, name) in people {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let key = IndexKey::Int(2);
    let found = catalog.lookup_by_pk("users", "by_id", &key).unwrap();
    assert_eq!(found, make_user_row(2, "bob"));
    assert_eq!(catalog.cold_segment_count(), 0);
}
/// `lookup_by_pk` returns `None` for an absent key, an unknown table, and an
/// unknown index name.
#[test]
fn lookup_by_pk_returns_none_when_key_missing() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    // (table, index, key) triples that must each miss.
    let misses = [
        ("users", "by_id", 999),
        ("other_table", "by_id", 1),
        ("users", "no_such_index", 1),
    ];
    for (table, index, key) in misses {
        assert!(
            catalog
                .lookup_by_pk(table, index, &IndexKey::Int(key))
                .is_none()
        );
    }
}
/// Rows that exist only in a loaded segment are found by `lookup_by_pk` once
/// their keys are registered as cold locators on the index.
#[test]
fn lookup_by_pk_resolves_cold_locator_via_loaded_segment() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    // Encode four rows straight into a segment; they are never inserted into the table.
    let cold_rows: Vec<(i64, &str)> =
        vec![(100, "ivy"), (200, "joe"), (300, "kim"), (400, "lin")];
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(cold_rows.len());
    for &(id, name) in &cold_rows {
        let body = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), body));
    }
    let (seg_bytes, _meta) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = catalog.load_segment_bytes(seg_bytes).unwrap();
    assert_eq!(seg_id, 0);
    assert_eq!(catalog.cold_segment_count(), 1);

    // Point every cold key at the loaded segment.
    let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
        .iter()
        .map(|&(id, _)| {
            let locator = RowLocator::Cold {
                segment_id: seg_id,
                page_offset: 0,
            };
            (IndexKey::Int(id), locator)
        })
        .collect();
    let registered = catalog
        .get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();
    assert_eq!(registered, 4);

    for &(id, name) in &cold_rows {
        let found = catalog
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("cold key {id} not found"));
        assert_eq!(found, make_user_row(id, name));
    }
    let absent = catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(999));
    assert!(absent.is_none());
}
/// One index can serve both rows inserted into the table and rows registered
/// as cold locators into a loaded segment; unknown keys still miss.
#[test]
fn lookup_by_pk_mixes_hot_and_cold_tiers() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    let hot_rows = [(1i64, "alice"), (2, "bob")];
    for (id, name) in hot_rows {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe")];
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(cold_rows.len());
    for &(id, name) in &cold_rows {
        let body = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), body));
    }
    let (seg_bytes, _) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = catalog.load_segment_bytes(seg_bytes).unwrap();

    let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
        .iter()
        .map(|&(id, _)| {
            let locator = RowLocator::Cold {
                segment_id: seg_id,
                page_offset: 0,
            };
            (IndexKey::Int(id), locator)
        })
        .collect();
    catalog
        .get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();

    // Rows inserted into the table first, then the segment-only rows.
    let expectations = [(1i64, "alice"), (2, "bob"), (100, "ivy"), (200, "joe")];
    for (id, name) in expectations {
        assert_eq!(
            catalog
                .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
                .unwrap(),
            make_user_row(id, name)
        );
    }
    let absent = catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(50));
    assert!(absent.is_none());
}
/// Registering cold locators on an NSW index fails with `Corrupt` and a
/// message containing "not BTree".
#[test]
fn register_cold_locators_rejects_nsw_index() {
    let vector_ty = DataType::Vector {
        dim: 4,
        encoding: VecEncoding::F32,
    };
    let schema = TableSchema::new(
        "vecs",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vector_ty, false),
        ],
    );
    let mut catalog = Catalog::new();
    catalog.create_table(schema).unwrap();

    let vecs = catalog.get_mut("vecs").unwrap();
    vecs.insert(Row::new(vec![
        Value::Int(1),
        Value::Vector(vec![1.0, 0.0, 0.0, 0.0]),
    ]))
    .unwrap();
    vecs.add_nsw_index("by_v".into(), "v", NSW_DEFAULT_M).unwrap();

    let cold_pair = (
        IndexKey::Int(1),
        RowLocator::Cold {
            segment_id: 0,
            page_offset: 0,
        },
    );
    let rejection = vecs
        .register_cold_locators("by_v", vec![cold_pair])
        .unwrap_err();
    let says_not_btree =
        matches!(rejection, StorageError::Corrupt(ref s) if s.contains("not BTree"));
    assert!(says_not_btree);
}
/// Ten zero bytes are not a valid segment: the load fails with `Corrupt`
/// (message mentions "segment") and no segment is counted.
#[test]
fn load_segment_bytes_rejects_garbage() {
    let mut catalog = Catalog::new();
    let junk = vec![0u8; 10];
    let rejection = catalog.load_segment_bytes(junk).unwrap_err();
    let mentions_segment =
        matches!(rejection, StorageError::Corrupt(ref s) if s.contains("segment"));
    assert!(mentions_segment);
    assert_eq!(catalog.cold_segment_count(), 0);
}
/// Loading three segments in a row returns ids 0, 1, 2 and the catalog then
/// counts three cold segments.
#[test]
fn load_segment_bytes_returns_sequential_ids() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let schema = catalog.get("users").unwrap().schema.clone();
    for batch in 0u32..3 {
        // Four rows per batch, with ids offset by 100 per batch.
        let first_id = u64::from(batch) * 100;
        let mut rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(4);
        for i in 0u64..4 {
            let id = first_id + i;
            let row = make_user_row(id.cast_signed(), "x");
            rows.push((id, encode_row_body_dense(&row, &schema)));
        }
        let (bytes, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
        let assigned = catalog.load_segment_bytes(bytes).unwrap();
        assert_eq!(assigned, batch);
    }
    assert_eq!(catalog.cold_segment_count(), 3);
}
/// A stream written in the version-8 layout is accepted by the current
/// reader, and every index entry it yields is a hot locator.
#[test]
fn v8_catalog_decodes_as_all_hot_under_v9_reader() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let v8_bytes = encode_as_v8(&catalog);
    // The version byte sits immediately after the magic.
    assert_eq!(v8_bytes[FILE_MAGIC.len()], 8, "version byte must be 8");

    let restored = Catalog::deserialize(&v8_bytes).expect("v9 reader accepts v8 stream");
    let by_name = restored
        .get("users")
        .unwrap()
        .index_on(1)
        .expect("index_on(1) after restore");
    let alice_key = IndexKey::Text("alice".into());
    let alice_hits = by_name.lookup_eq(&alice_key);
    assert_eq!(alice_hits, &[RowLocator::Hot(0), RowLocator::Hot(2)]);
    for entry in alice_hits {
        assert!(entry.is_hot(), "v8 → v9 read must yield Hot only");
    }
}
/// Test-only writer that serializes `cat` using a hand-written version-8
/// layout, so tests can feed an older stream to the current reader.
///
/// Layout as written here: magic, version byte `8`, `u32` table count, then
/// per table: name, `u16` column count, the columns, `u32` row count, the
/// dense-encoded rows, `u16` index count, and the indices.
///
/// # Panics
///
/// Panics if a table, column, row, or index count (or an index column
/// position or NSW `m`) does not fit the integer width used for it, or if
/// any index is BRIN.
fn encode_as_v8(cat: &Catalog) -> Vec<u8> {
let mut out = Vec::with_capacity(64);
// Header: magic bytes followed by the literal version byte 8.
out.extend_from_slice(FILE_MAGIC);
out.push(8u8);
write_u32(&mut out, u32::try_from(cat.tables.len()).unwrap());
for t in &cat.tables {
write_str(&mut out, &t.schema.name);
write_u16(&mut out, u16::try_from(t.schema.columns.len()).unwrap());
// Per column: name, data type, nullable flag, optional default, auto-increment flag.
for c in &t.schema.columns {
write_str(&mut out, &c.name);
write_data_type(&mut out, c.ty);
out.push(u8::from(c.nullable));
// Default value: a 0 byte when absent, or a 1 byte followed by the value.
match &c.default {
None => out.push(0),
Some(v) => {
out.push(1);
write_value(&mut out, v);
}
}
out.push(u8::from(c.auto_increment));
}
// Rows: count, then each row body in the dense encoding.
write_u32(&mut out, u32::try_from(t.rows.len()).unwrap());
for row in &t.rows {
out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
}
// Indices: count, then per index its name, column position, and kind.
write_u16(&mut out, u16::try_from(t.indices.len()).unwrap());
for idx in &t.indices {
write_str(&mut out, &idx.name);
write_u16(&mut out, u16::try_from(idx.column_position).unwrap());
match &idx.kind {
// BTree: only the kind tag 0 is written, no entries.
// NOTE(review): presumably the reader rebuilds BTree entries from the rows — confirm.
IndexKind::BTree(_) => out.push(0),
// NSW: kind tag 1, then `m` as u16, then the graph itself.
IndexKind::Nsw(g) => {
out.push(1);
write_u16(&mut out, u16::try_from(g.m).unwrap());
write_nsw_graph(&mut out, g);
}
IndexKind::Brin { .. } => panic!(
"v8 catalog writer cannot serialise BRIN — \
tests with BRIN indices must use the current writer"
),
}
}
}
out
}
/// Cold locators registered on an index survive serialize → deserialize.
///
/// After reloading the same segment bytes into the restored catalog, both the
/// raw locators and `lookup_by_pk` results match what was set up beforehand.
#[test]
fn v9_catalog_round_trip_preserves_cold_locators() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    let hot_rows = [(1i64, "alice"), (2, "bob")];
    for (id, name) in hot_rows {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe"), (300, "kim")];
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(cold_rows.len());
    for &(id, name) in &cold_rows {
        let body = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), body));
    }
    let (seg_bytes, _) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    // Keep the original bytes: the same segment is loaded again after the round trip.
    let seg_id = catalog.load_segment_bytes(seg_bytes.clone()).unwrap();

    let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
        .iter()
        .map(|&(id, _)| {
            let locator = RowLocator::Cold {
                segment_id: seg_id,
                page_offset: 0,
            };
            (IndexKey::Int(id), locator)
        })
        .collect();
    catalog
        .get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();

    let bytes = catalog.serialize();
    assert_eq!(bytes[FILE_MAGIC.len()], FILE_VERSION);
    let mut restored = Catalog::deserialize(&bytes).expect("v9 round-trip parses");
    let restored_seg_id = restored.load_segment_bytes(seg_bytes).unwrap();
    assert_eq!(restored_seg_id, seg_id);

    // Raw locators: hot rows keep their positions, cold keys keep their segment.
    let by_id = restored.get("users").unwrap().index_on(0).unwrap();
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(1)), &[RowLocator::Hot(0)]);
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
    for &(id, _) in &cold_rows {
        let expected_locator = RowLocator::Cold {
            segment_id: seg_id,
            page_offset: 0,
        };
        assert_eq!(by_id.lookup_eq(&IndexKey::Int(id)), &[expected_locator]);
    }

    // End-to-end lookups resolve to full rows.
    let bob = restored
        .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
        .unwrap();
    assert_eq!(bob, make_user_row(2, "bob"));
    for &(id, name) in &cold_rows {
        let found = restored
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap();
        assert_eq!(found, make_user_row(id, name));
    }
}
/// `row_body_encoded_len` must equal the byte length actually produced by
/// `encode_row_body_dense`, checked on three rows over a ten-column schema.
#[test]
fn row_body_encoded_len_matches_actual_encode_for_all_types() {
    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let numeric_ty = DataType::Numeric {
        precision: 18,
        scale: 2,
    };
    let columns = vec![
        ColumnSchema::new("a", DataType::SmallInt, true),
        ColumnSchema::new("b", DataType::Int, false),
        ColumnSchema::new("c", DataType::BigInt, false),
        ColumnSchema::new("d", DataType::Float, false),
        ColumnSchema::new("e", DataType::Bool, false),
        ColumnSchema::new("f", DataType::Text, false),
        ColumnSchema::new("g", vector_ty, false),
        ColumnSchema::new("h", numeric_ty, false),
        ColumnSchema::new("i", DataType::Date, false),
        ColumnSchema::new("j", DataType::Timestamp, false),
    ];
    let schema = TableSchema::new("wide", columns);

    let cases = [
        // Ordinary positive values in every column.
        Row::new(vec![
            Value::SmallInt(7),
            Value::Int(42),
            Value::BigInt(1_000_000),
            Value::Float(1.5),
            Value::Bool(true),
            Value::Text("hello".into()),
            Value::Vector(vec![1.0, 2.0, 3.0]),
            Value::Numeric {
                scaled: 12345,
                scale: 2,
            },
            Value::Date(20_000),
            Value::Timestamp(1_700_000_000_000_000),
        ]),
        // Null first cell, zeros, empty text and an empty vector.
        Row::new(vec![
            Value::Null,
            Value::Int(0),
            Value::BigInt(0),
            Value::Float(0.0),
            Value::Bool(false),
            Value::Text(String::new()),
            Value::Vector(vec![]),
            Value::Numeric {
                scaled: 0,
                scale: 2,
            },
            Value::Date(0),
            Value::Timestamp(0),
        ]),
        // Negative numbers and a longer text payload.
        Row::new(vec![
            Value::SmallInt(-1),
            Value::Int(-1),
            Value::BigInt(-1),
            Value::Float(-0.5),
            Value::Bool(true),
            Value::Text("a much longer payload here".into()),
            Value::Vector(vec![0.1, 0.2, 0.3]),
            Value::Numeric {
                scaled: -999_999_999,
                scale: 2,
            },
            Value::Date(-1),
            Value::Timestamp(-1),
        ]),
    ];
    for row in &cases {
        let fast = row_body_encoded_len(row, &schema);
        let actual = encode_row_body_dense(row, &schema).len();
        assert_eq!(actual, fast, "row {row:?}");
    }
}
/// `hot_bytes` starts at zero and, after inserts, equals the summed
/// dense-encoded size of the inserted rows; the catalog total agrees.
#[test]
fn hot_bytes_grows_on_insert_and_matches_encoded_sum() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    assert_eq!(users.hot_bytes(), 0);

    let mut expected: u64 = 0;
    let people = [(1i64, "alice"), (2, "bob"), (3, "carol")];
    for (id, name) in people {
        let row = make_user_row(id, name);
        let encoded_len = encode_row_body_dense(&row, &users.schema).len();
        expected += encoded_len as u64;
        users.insert(row).unwrap();
    }
    assert_eq!(users.hot_bytes(), expected);
    assert_eq!(catalog.hot_tier_bytes(), expected);
}
/// Deleting the row at position 1 lowers `hot_bytes` by exactly that row's
/// dense-encoded size.
#[test]
fn hot_bytes_shrinks_on_delete() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    let people = [(1i64, "alice"), (2, "bob"), (3, "carol")];
    for (id, name) in people {
        users.insert(make_user_row(id, name)).unwrap();
    }
    let before = users.hot_bytes();
    // Position 1 holds bob, the second inserted row.
    let bob_len = encode_row_body_dense(&make_user_row(2, "bob"), &users.schema).len();
    let bob_bytes = bob_len as u64;
    let removed = users.delete_rows(&[1]);
    assert_eq!(removed, 1);
    assert_eq!(users.hot_bytes(), before - bob_bytes);
}
/// Updating a row with a longer text value adjusts `hot_bytes` by the
/// difference between the old and new encoded sizes.
#[test]
fn hot_bytes_diffs_on_update_for_variable_width_columns() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    let after_insert = users.hot_bytes();

    let original = make_user_row(1, "alice");
    let replacement = make_user_row(1, "alice-the-longer-name");
    let old_len = encode_row_body_dense(&original, &users.schema).len() as u64;
    let new_len = encode_row_body_dense(&replacement, &users.schema).len() as u64;
    users.update_row(0, replacement.values).unwrap();

    let now = users.hot_bytes();
    assert_eq!(now, after_insert - old_len + new_len);
    assert!(users.hot_bytes() > after_insert, "longer text grew the counter");
}
/// Checks that the hot-tier byte counters read the same after a catalog is
/// serialized and deserialized.
#[test]
fn hot_bytes_round_trips_through_serialize_deserialize() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for i in 0..10 {
        let name = alloc::format!("name-{i}");
        users.insert(make_user_row(i, &name)).unwrap();
    }
    let bytes_before = catalog.hot_tier_bytes();

    let snapshot = catalog.serialize();
    let reloaded = Catalog::deserialize(&snapshot).unwrap();

    // "users" is the only table, so the per-table and catalog totals coincide.
    assert_eq!(reloaded.hot_tier_bytes(), bytes_before);
    assert_eq!(reloaded.get("users").unwrap().hot_bytes(), bytes_before);
}
/// Freezes 6 of 10 rows and checks the report, the remaining hot row count,
/// the `hot_bytes` accounting, and that every primary key still resolves
/// through `lookup_by_pk`.
#[test]
fn freeze_oldest_to_cold_moves_rows_and_keeps_lookups_working() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..10i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let hot_bytes_before = users.hot_bytes();

    let report = catalog
        .freeze_oldest_to_cold("users", "by_id", 6)
        .expect("freeze succeeds");
    assert_eq!(report.frozen_rows, 6);
    assert_eq!(report.segment_id, 0);
    assert!(report.bytes_freed > 0);
    assert!(!report.segment_bytes.is_empty());

    let users = catalog.get("users").unwrap();
    assert_eq!(users.row_count(), 4, "4 hot rows remain (10 - 6 frozen)");
    assert_eq!(catalog.cold_segment_count(), 1);
    assert_eq!(
        users.hot_bytes(),
        hot_bytes_before - report.bytes_freed,
        "hot_bytes accounting matches FreezeReport"
    );

    // Frozen and still-hot keys alike must resolve to their original rows.
    for id in 0..10i64 {
        match catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(id)) {
            Some(found) => {
                let name = alloc::format!("u-{id}");
                assert_eq!(found, make_user_row(id, &name));
            }
            None => panic!("PK {id} disappeared after freeze"),
        }
    }
}
/// Runs two consecutive freezes of 4 rows each over 12 rows and checks that
/// all 12 primary keys still resolve afterwards.
#[test]
fn freeze_twice_preserves_prior_cold_locators() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..12i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    // Same request twice; only the failure message differs.
    for failure_msg in ["first freeze ok", "second freeze ok"] {
        catalog
            .freeze_oldest_to_cold("users", "by_id", 4)
            .expect(failure_msg);
    }

    assert_eq!(catalog.get("users").unwrap().row_count(), 4);
    assert_eq!(catalog.cold_segment_count(), 2);
    for id in 0..12i64 {
        match catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(id)) {
            Some(found) => {
                let name = alloc::format!("u-{id}");
                assert_eq!(found, make_user_row(id, &name));
            }
            None => panic!("PK {id} not resolvable after two freezes"),
        }
    }
}
/// Checks that `freeze_oldest_to_cold` returns `StorageError::Corrupt` for a
/// set of bad requests and leaves the catalog untouched.
#[test]
fn freeze_oldest_to_cold_rejects_invalid_input() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..3i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    // (table, index, row count): zero rows, unknown table, unknown index,
    // and more rows than the three that exist.
    let bad_requests = [
        ("users", "by_id", 0),
        ("missing", "by_id", 1),
        ("users", "no_such_index", 1),
        ("users", "by_id", 999),
    ];
    for (table, index, row_count) in bad_requests {
        let outcome = catalog.freeze_oldest_to_cold(table, index, row_count);
        assert!(matches!(outcome, Err(StorageError::Corrupt(_))));
    }

    // None of the rejected calls may have moved any row.
    assert_eq!(catalog.get("users").unwrap().row_count(), 3);
    assert_eq!(catalog.cold_segment_count(), 0);
}
/// Checks that freezing through an index on a text column is rejected with a
/// `Corrupt` error whose message contains "non-integer", and that nothing is
/// frozen.
#[test]
fn freeze_oldest_to_cold_rejects_non_integer_pk() {
    let mut catalog = Catalog::new();
    let schema = TableSchema::new(
        "by_name",
        vec![
            ColumnSchema::new("name", DataType::Text, false),
            ColumnSchema::new("payload", DataType::BigInt, false),
        ],
    );
    catalog.create_table(schema).unwrap();

    let table = catalog.get_mut("by_name").unwrap();
    let only_row = Row::new(vec![Value::Text("a".into()), Value::BigInt(1)]);
    table.insert(only_row).unwrap();
    table.add_index("by_n".into(), "name").unwrap();

    let err = catalog
        .freeze_oldest_to_cold("by_name", "by_n", 1)
        .expect_err("non-integer PK rejected");
    match err {
        StorageError::Corrupt(s) => {
            assert!(
                s.contains("non-integer"),
                "error message names the constraint: {s}"
            );
        }
        other => panic!("expected Corrupt, got {other:?}"),
    }

    // The single row stays hot and no segment was produced.
    assert_eq!(catalog.get("by_name").unwrap().row_count(), 1);
    assert_eq!(catalog.cold_segment_count(), 0);
}
/// After freezing 3 of 6 rows, looks up a surviving row through the index on
/// column 1 and checks that it surfaces as a single `Hot` locator at index 1.
#[test]
fn freeze_keeps_remaining_hot_rows_addressable_via_secondary_index() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    let name_index = catalog.get("users").unwrap().index_on(1).unwrap();
    let hits = name_index.lookup_eq(&IndexKey::Text("u-4".into()));
    assert_eq!(hits.len(), 1);

    let locator = &hits[0];
    assert!(locator.is_hot(), "kept-hot rows still surface as Hot");
    match locator {
        // The test expects "u-4" to sit at hot index 1 once three rows are gone.
        RowLocator::Hot(i) => assert_eq!(*i, 1),
        RowLocator::Cold { .. } => unreachable!(),
    }
}
/// Freezes 4 of 6 rows, promotes PK 2 back, and checks the returned hot index,
/// the row count, the `hot_bytes` delta, that the key has exactly one `Hot`
/// locator, and that both the promoted and a still-cold key resolve.
#[test]
fn promote_cold_row_pulls_frozen_row_back_to_hot_tier() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    catalog.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
    let hot_before_promote = catalog.get("users").unwrap().hot_bytes();

    let promoted_at = catalog
        .promote_cold_row("users", "by_id", &IndexKey::Int(2))
        .expect("promote ok")
        .expect("PK 2 was cold");
    assert_eq!(
        promoted_at, 2,
        "promoted row appended after the 2 surviving hot rows"
    );

    let users = catalog.get("users").unwrap();
    assert_eq!(users.row_count(), 3, "hot tier grew from 2 to 3");

    // The counter must grow by exactly the promoted row's encoded size.
    let promoted_row = make_user_row(2, "u-2");
    let promoted_len = encode_row_body_dense(&promoted_row, &users.schema).len() as u64;
    assert_eq!(users.hot_bytes(), hot_before_promote + promoted_len);

    let locators = users.index_on(0).unwrap().lookup_eq(&IndexKey::Int(2));
    assert_eq!(locators.len(), 1, "exactly one locator per key");
    assert!(locators[0].is_hot(), "promote retired the Cold locator");

    let found_promoted = catalog
        .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
        .unwrap();
    assert_eq!(found_promoted, promoted_row);

    // PK 0 was frozen and not promoted; it must still resolve.
    let found_cold = catalog
        .lookup_by_pk("users", "by_id", &IndexKey::Int(0))
        .unwrap();
    assert_eq!(found_cold, make_user_row(0, "u-0"));
}
/// Checks that `promote_cold_row` yields `Ok(None)` for a key that is only hot
/// (7) and for a key that was never inserted (99), changing nothing.
#[test]
fn promote_cold_row_returns_none_when_key_is_not_cold() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(7, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    for pk in [7, 99] {
        let outcome = catalog
            .promote_cold_row("users", "by_id", &IndexKey::Int(pk))
            .unwrap();
        assert!(outcome.is_none());
    }

    assert_eq!(catalog.get("users").unwrap().row_count(), 1);
    assert_eq!(catalog.cold_segment_count(), 0);
}
/// Freezes 3 of 5 rows, shadows frozen PK 1, and checks that the key stops
/// resolving while the other frozen keys (0 and 2) still do.
#[test]
fn shadow_cold_row_removes_cold_locators_and_drops_lookup() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..5i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    let target = IndexKey::Int(1);
    let before = catalog.lookup_by_pk("users", "by_id", &target);
    assert!(before.is_some(), "frozen PK resolves before shadow");

    let retired = catalog.shadow_cold_row("users", "by_id", &target).unwrap();
    assert_eq!(retired, 1, "exactly one cold locator retired");

    let after = catalog.lookup_by_pk("users", "by_id", &target);
    assert!(after.is_none(), "shadowed key no longer resolves");

    // Neighbouring frozen keys are unaffected.
    for (pk, name) in [(0i64, "u-0"), (2, "u-2")] {
        let found = catalog
            .lookup_by_pk("users", "by_id", &IndexKey::Int(pk))
            .unwrap();
        assert_eq!(found, make_user_row(pk, name));
    }
}
/// Checks that `shadow_cold_row` reports zero retired locators for a key that
/// is only hot and for a key that does not exist.
#[test]
fn shadow_cold_row_returns_zero_when_key_is_not_cold() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    let hot_only = catalog
        .shadow_cold_row("users", "by_id", &IndexKey::Int(1))
        .unwrap();
    assert_eq!(hot_only, 0, "hot-only key drops no cold locators");

    let absent = catalog
        .shadow_cold_row("users", "by_id", &IndexKey::Int(999))
        .unwrap();
    assert_eq!(absent, 0, "absent key drops no cold locators");

    assert_eq!(catalog.get("users").unwrap().row_count(), 1);
}
/// Checks that both `promote_cold_row` and `shadow_cold_row` return
/// `StorageError::Corrupt` for an unknown table and for an unknown index.
#[test]
fn promote_and_shadow_reject_invalid_inputs() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    // Unknown table first, then unknown index on a table that exists.
    for (table, index) in [("missing", "by_id"), ("users", "no_such_index")] {
        let promoted = catalog.promote_cold_row(table, index, &IndexKey::Int(1));
        assert!(matches!(promoted, Err(StorageError::Corrupt(_))));

        let shadowed = catalog.shadow_cold_row(table, index, &IndexKey::Int(1));
        assert!(matches!(shadowed, Err(StorageError::Corrupt(_))));
    }
}
/// Builds two identical catalogs, freezes 6 rows in one via
/// `freeze_oldest_to_cold` and in the other via a single prepared slice
/// `0..6`, then checks that the reports and all lookups agree.
#[test]
fn commit_freeze_slices_single_slice_matches_freeze_oldest() {
    let mut direct = Catalog::new();
    let mut sliced = Catalog::new();
    for catalog in [&mut direct, &mut sliced] {
        catalog.create_table(bigint_pk_users_schema()).unwrap();
        let users = catalog.get_mut("users").unwrap();
        for id in 0..10i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }

    let single = direct.freeze_oldest_to_cold("users", "by_id", 6).unwrap();

    let prepared = sliced
        .prepare_freeze_slice("users", "by_id", 0..6)
        .expect("prepare");
    let parallel = sliced
        .commit_freeze_slices("users", "by_id", alloc::vec![prepared])
        .expect("commit");

    // Field-by-field comparison of the two reports.
    assert_eq!(single.segment_id, parallel.segment_id);
    assert_eq!(single.frozen_rows, parallel.frozen_rows);
    assert_eq!(single.bytes_freed, parallel.bytes_freed);
    assert_eq!(single.segment_bytes, parallel.segment_bytes);

    for id in 0..10i64 {
        let key = IndexKey::Int(id);
        assert_eq!(
            direct.lookup_by_pk("users", "by_id", &key),
            sliced.lookup_by_pk("users", "by_id", &key),
            "PK {id} differs after single vs slice freeze"
        );
    }
}
/// Builds two identical catalogs from rows inserted in shuffled key order,
/// commits one slice `0..8` in the first and two adjacent slices `0..4` and
/// `4..8` in the second, then checks that segment bytes, frozen row counts
/// and all lookups agree.
#[test]
fn commit_freeze_slices_two_slices_match_single_slice() {
    let mut whole = Catalog::new();
    let mut split = Catalog::new();
    for catalog in [&mut whole, &mut split] {
        catalog.create_table(bigint_pk_users_schema()).unwrap();
        let users = catalog.get_mut("users").unwrap();
        // Insertion order deliberately differs from key order.
        for id in [3i64, 7, 1, 9, 5, 0, 8, 4, 2, 6] {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }

    let full_slice = whole
        .prepare_freeze_slice("users", "by_id", 0..8)
        .expect("prepare");
    let one = whole
        .commit_freeze_slices("users", "by_id", alloc::vec![full_slice])
        .expect("commit one");

    let first_half = split
        .prepare_freeze_slice("users", "by_id", 0..4)
        .expect("prepare s1");
    let second_half = split
        .prepare_freeze_slice("users", "by_id", 4..8)
        .expect("prepare s2");
    let two = split
        .commit_freeze_slices("users", "by_id", alloc::vec![first_half, second_half])
        .expect("commit two");

    assert_eq!(one.segment_bytes, two.segment_bytes);
    assert_eq!(one.frozen_rows, two.frozen_rows);

    for id in 0..10i64 {
        let key = IndexKey::Int(id);
        assert_eq!(
            whole.lookup_by_pk("users", "by_id", &key),
            split.lookup_by_pk("users", "by_id", &key),
            "PK {id} differs after one-slice vs two-slice freeze"
        );
    }
}
/// Checks that committing slices `0..2` and `3..5` (position 2 is covered by
/// neither) fails with `StorageError::Corrupt` and freezes nothing.
#[test]
fn commit_freeze_slices_rejects_gap() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let leading = catalog.prepare_freeze_slice("users", "by_id", 0..2).unwrap();
    let trailing = catalog.prepare_freeze_slice("users", "by_id", 3..5).unwrap();
    let outcome = catalog.commit_freeze_slices("users", "by_id", alloc::vec![leading, trailing]);
    assert!(matches!(outcome, Err(StorageError::Corrupt(_))));

    // The failed commit must leave every row hot.
    assert_eq!(catalog.cold_segment_count(), 0);
    assert_eq!(catalog.get("users").unwrap().row_count(), 6);
}
/// Checks that committing an empty slice list succeeds, reports zero frozen
/// rows, and leaves the table and the cold tier unchanged.
#[test]
fn commit_freeze_slices_empty_is_noop() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..3i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let no_slices = Vec::new();
    let report = catalog
        .commit_freeze_slices("users", "by_id", no_slices)
        .unwrap();

    assert_eq!(report.frozen_rows, 0);
    assert_eq!(catalog.cold_segment_count(), 0);
    assert_eq!(catalog.get("users").unwrap().row_count(), 3);
}
/// Creates two cold segments of 3 rows each, compacts with a target one byte
/// above the larger segment, and checks that both sources merge into one new
/// segment with every primary key still resolvable.
#[test]
fn compact_merges_small_segments_storage_unit() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..8i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    for _ in 0..2 {
        catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    }
    assert_eq!(catalog.cold_segment_count(), 2);
    assert_eq!(catalog.cold_segment_slot_count(), 2);

    // Size of the largest existing segment; the target sits just above it.
    let segment_ids = catalog.cold_segment_ids_global();
    let largest_segment = segment_ids
        .iter()
        .map(|&id| catalog.cold_segment(id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let target_bytes = largest_segment + 1;

    let report = catalog
        .compact_cold_segments("users", "by_id", target_bytes)
        .expect("compact succeeds");
    assert_eq!(report.sources.len(), 2);
    let merged_id = report.merged_segment_id.expect("merge happened");
    assert_eq!(report.merged_rows, 6);
    assert_eq!(report.deleted_rows_pruned, 0);
    assert!(!report.merged_segment_bytes.is_empty());

    // One live segment remains while the slot count grows to three.
    assert_eq!(catalog.cold_segment_count(), 1);
    assert_eq!(catalog.cold_segment_slot_count(), 3);
    assert_eq!(catalog.cold_segment_ids_global(), alloc::vec![merged_id]);

    for id in 0..8i64 {
        match catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(id)) {
            Some(found) => {
                let name = alloc::format!("u-{id}");
                assert_eq!(found, make_user_row(id, &name));
            }
            None => panic!("PK {id} lost after compaction"),
        }
    }
}
/// Shadows one row in each of two cold segments, compacts, and checks that
/// the report counts 4 merged and 2 pruned rows, that shadowed keys stay
/// invisible, and that live keys still resolve.
#[test]
fn compact_drops_shadowed_cold_rows() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    for _ in 0..2 {
        catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    }

    // Retire one cold locator for each of PK 1 and PK 4.
    for pk in [1i64, 4] {
        let retired = catalog
            .shadow_cold_row("users", "by_id", &IndexKey::Int(pk))
            .unwrap();
        assert_eq!(retired, 1);
    }

    let segment_ids = catalog.cold_segment_ids_global();
    let largest_segment = segment_ids
        .iter()
        .map(|&id| catalog.cold_segment(id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();

    let report = catalog
        .compact_cold_segments("users", "by_id", largest_segment + 1)
        .expect("compact succeeds");
    assert_eq!(report.sources.len(), 2);
    assert_eq!(report.merged_rows, 4, "6 frozen − 2 shadowed = 4 live");
    assert_eq!(report.deleted_rows_pruned, 2);

    for shadowed in [1i64, 4i64] {
        let found = catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(shadowed));
        assert!(
            found.is_none(),
            "shadowed PK {shadowed} must remain invisible after compact"
        );
    }
    for live in [0i64, 2, 3, 5] {
        let found = catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(live));
        if found.is_none() {
            panic!("live PK {live} lost after compact");
        }
    }
}
/// Checks that compaction merges nothing when there are no cold segments, and
/// when there is exactly one cold segment (with both a very large and a
/// 1-byte target).
#[test]
fn compact_is_noop_below_two_candidates() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    // Nothing has been frozen yet.
    let empty_report = catalog
        .compact_cold_segments("users", "by_id", 1 << 30)
        .expect("noop ok");
    assert!(empty_report.merged_segment_id.is_none());
    assert!(empty_report.sources.is_empty());

    // A single cold segment: neither target may trigger a merge.
    catalog.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
    for target_bytes in [1 << 30, 1] {
        let report = catalog
            .compact_cold_segments("users", "by_id", target_bytes)
            .expect("noop ok");
        assert!(report.merged_segment_id.is_none());
        assert_eq!(catalog.cold_segment_count(), 1);
    }
}
/// Compacts two cold segments, serializes the catalog, deserializes it,
/// reloads the merged segment bytes at the merged id, and checks that every
/// primary key resolves in the restored catalog.
#[test]
fn compact_swap_survives_catalog_roundtrip_via_load_at() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    for _ in 0..2 {
        catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    }

    let segment_ids = catalog.cold_segment_ids_global();
    let largest_segment = segment_ids
        .iter()
        .map(|&id| catalog.cold_segment(id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let report = catalog
        .compact_cold_segments("users", "by_id", largest_segment + 1)
        .expect("compact ok");
    let merged_id = report.merged_segment_id.unwrap();

    // The segment payload is supplied separately after deserializing.
    let catalog_bytes = catalog.serialize();
    let segment_payload = report.merged_segment_bytes.clone();
    let mut restored = Catalog::deserialize(&catalog_bytes).expect("deserialize ok");
    restored
        .load_segment_bytes_at(merged_id, segment_payload)
        .expect("reload merged ok");

    for id in 0..6i64 {
        match restored.lookup_by_pk("users", "by_id", &IndexKey::Int(id)) {
            Some(found) => {
                let name = alloc::format!("u-{id}");
                assert_eq!(found, make_user_row(id, &name));
            }
            None => panic!("PK {id} lost across roundtrip"),
        }
    }
    assert_eq!(restored.cold_segment_count(), 1);
}
/// Loads segment bytes at slot 5 (expecting the slot count to grow to 6 and
/// the segment count to 2), then checks that loading again at slot 5 or at
/// slot 0 fails with `StorageError::Corrupt`.
#[test]
fn load_segment_bytes_at_pads_and_rejects_collision() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..4i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let report = catalog.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
    let segment_payload = report.segment_bytes.clone();

    catalog
        .load_segment_bytes_at(5, segment_payload.clone())
        .expect("pad + load ok");
    assert_eq!(catalog.cold_segment_slot_count(), 6);
    assert_eq!(catalog.cold_segment_count(), 2);

    // Slot 5 was just filled; slot 0 is presumably held by the freeze above.
    for occupied_slot in [5, 0] {
        let outcome = catalog.load_segment_bytes_at(occupied_slot, segment_payload.clone());
        assert!(matches!(outcome, Err(StorageError::Corrupt(_))));
    }
}
/// Freezes 2 of 4 rows, promotes PK 0 back to the hot tier, and checks that
/// the key has exactly one locator, that it is hot, and that the rows which
/// were never frozen (2 and 3) still resolve.
///
/// NOTE(review): despite the name, no second freeze is issued after the
/// promote — confirm whether a re-freeze step was intended here.
#[test]
fn promote_then_refreeze_does_not_leave_orphan_locators() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..4i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    catalog.freeze_oldest_to_cold("users", "by_id", 2).unwrap();

    let promoted = catalog
        .promote_cold_row("users", "by_id", &IndexKey::Int(0))
        .unwrap();
    assert!(promoted.is_some());

    // Copy the locators out so the borrow of the catalog ends here.
    let pk_index = catalog.get("users").unwrap().index_on(0).unwrap();
    let locators = pk_index.lookup_eq(&IndexKey::Int(0)).to_vec();
    assert_eq!(locators.len(), 1);
    assert!(locators[0].is_hot());

    for id in [2i64, 3] {
        let found = catalog
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap();
        let name = alloc::format!("u-{id}");
        assert_eq!(found, make_user_row(id, &name));
    }
}
}