#![no_std]
#![cfg_attr(target_arch = "aarch64", allow(unsafe_code))]
extern crate alloc;
pub mod bloom;
pub mod fts_simple;
pub mod halfvec;
pub mod persistent;
pub mod persistent_btree;
pub mod quantize;
pub mod row_locator;
pub mod segment;
pub mod trgm;
pub use self::bloom::{BloomError, BloomFilter};
pub use self::row_locator::{RowLocator, RowLocatorError};
pub use self::segment::{
BRIN_SIDECAR_MAGIC, BrinSummary, OwnedSegment, SEGMENT_COMPRESS_ALGO_LZSS,
SEGMENT_COMPRESS_ALGO_NONE, SEGMENT_MAGIC, SEGMENT_MAGIC_V2, SEGMENT_PAGE_BYTES, SegmentError,
SegmentMeta, SegmentReader, derive_brin_summaries, encode_segment, wrap_v2_envelope,
wrap_v2_envelope_with_brin,
};
use alloc::boxed::Box;
use alloc::collections::{BTreeMap, BTreeSet};
use alloc::format;
use alloc::string::{String, ToString};
use alloc::sync::Arc;
use alloc::vec::Vec;
use core::fmt;
use self::persistent::PersistentVec;
use self::persistent_btree::PersistentBTreeMap;
/// Storage encoding of a vector column's elements.
///
/// `F32` is the default. The other two encodings correspond to the
/// `Value::Sq8Vector` and `Value::HalfVector` cell representations.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum VecEncoding {
    /// Plain `f32` elements (the default encoding).
    #[default]
    F32,
    /// Displayed as `SQ8`.
    Sq8,
    /// Displayed as `HALF`.
    F16,
}

impl fmt::Display for VecEncoding {
    /// Writes the keyword for this encoding: `F32`, `SQ8` or `HALF`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            Self::F32 => "F32",
            Self::Sq8 => "SQ8",
            Self::F16 => "HALF",
        };
        f.write_str(label)
    }
}
/// SQL-level type of a column or value.
///
/// The `Display` impl renders each variant as its SQL type text, for example
/// `VARCHAR(n)`, `VECTOR(dim) USING SQ8` or `INT[][]`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataType {
SmallInt,
Int, BigInt, Float, Text,
/// Payload is the `n` rendered in `VARCHAR(n)`.
Varchar(u32),
/// Payload is the `n` rendered in `CHAR(n)`.
Char(u32),
Bool,
/// Rendered as `VECTOR(dim)`, with a `USING SQ8` / `USING HALF` suffix for
/// the non-default encodings.
Vector {
dim: u32,
encoding: VecEncoding,
},
/// Rendered as `NUMERIC(precision)` when `scale` is 0, otherwise
/// `NUMERIC(precision, scale)`.
Numeric {
precision: u8,
scale: u8,
},
Date,
Timestamp,
Timestamptz,
Interval,
Json,
Jsonb,
/// Rendered as `BYTEA`.
Bytes,
TextArray,
IntArray,
BigIntArray,
TsVector,
TsQuery,
Uuid,
Time,
Year,
TimeTz,
Money,
/// Range type; rendered using `RangeKind::keyword`.
Range(RangeKind),
Hstore,
IntArray2D,
BigIntArray2D,
TextArray2D,
}
/// Element kind of a range type.
///
/// The explicit discriminants are the values returned by [`RangeKind::tag`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum RangeKind {
    Int4 = 0,
    Int8 = 1,
    Num = 2,
    Ts = 3,
    TsTz = 4,
    Date = 5,
}

impl RangeKind {
    /// Returns the one-byte tag of this kind (0 through 5, in declaration order).
    pub const fn tag(self) -> u8 {
        // The discriminants above are pinned, so the cast yields the tag directly.
        self as u8
    }

    /// Inverse of [`RangeKind::tag`]; returns `None` for any byte above 5.
    pub const fn from_tag(t: u8) -> Option<Self> {
        match t {
            0 => Some(Self::Int4),
            1 => Some(Self::Int8),
            2 => Some(Self::Num),
            3 => Some(Self::Ts),
            4 => Some(Self::TsTz),
            5 => Some(Self::Date),
            _ => None,
        }
    }

    /// Returns the SQL type keyword for this range kind, e.g. `INT4RANGE`.
    pub const fn keyword(self) -> &'static str {
        match self {
            Self::Date => "DATERANGE",
            Self::TsTz => "TSTZRANGE",
            Self::Ts => "TSRANGE",
            Self::Num => "NUMRANGE",
            Self::Int8 => "INT8RANGE",
            Self::Int4 => "INT4RANGE",
        }
    }
}
impl fmt::Display for DataType {
    /// Writes the SQL type text for this type (e.g. `VARCHAR(32)`, `INT[]`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Parameterised types are formatted and returned immediately; every
        // other type is a fixed keyword that is written once at the end.
        let keyword = match *self {
            Self::Varchar(n) => return write!(f, "VARCHAR({n})"),
            Self::Char(n) => return write!(f, "CHAR({n})"),
            Self::Vector { dim, encoding } => {
                let suffix = match encoding {
                    VecEncoding::F32 => "",
                    VecEncoding::Sq8 => " USING SQ8",
                    VecEncoding::F16 => " USING HALF",
                };
                return write!(f, "VECTOR({dim}){suffix}");
            }
            // A zero scale is omitted from the rendered type.
            Self::Numeric {
                precision,
                scale: 0,
            } => return write!(f, "NUMERIC({precision})"),
            Self::Numeric { precision, scale } => {
                return write!(f, "NUMERIC({precision}, {scale})");
            }
            Self::Range(kind) => kind.keyword(),
            Self::SmallInt => "SMALLINT",
            Self::Int => "INT",
            Self::BigInt => "BIGINT",
            Self::Float => "FLOAT",
            Self::Text => "TEXT",
            Self::Bool => "BOOL",
            Self::Date => "DATE",
            Self::Timestamp => "TIMESTAMP",
            Self::Timestamptz => "TIMESTAMPTZ",
            Self::Interval => "INTERVAL",
            Self::Json => "JSON",
            Self::Jsonb => "JSONB",
            Self::Bytes => "BYTEA",
            Self::TextArray => "TEXT[]",
            Self::IntArray => "INT[]",
            Self::BigIntArray => "BIGINT[]",
            Self::TsVector => "TSVECTOR",
            Self::TsQuery => "TSQUERY",
            Self::Uuid => "UUID",
            Self::Time => "TIME",
            Self::Year => "YEAR",
            Self::TimeTz => "TIMETZ",
            Self::Money => "MONEY",
            Self::Hstore => "HSTORE",
            Self::IntArray2D => "INT[][]",
            Self::BigIntArray2D => "BIGINT[][]",
            Self::TextArray2D => "TEXT[][]",
        };
        f.write_str(keyword)
    }
}
/// One entry of a `Value::TsVector`.
///
/// `Table::insert` and `Table::add_gin_index` use `word` as the GIN posting key.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TsLexeme {
/// The lexeme text; used as the key in GIN indexes.
pub word: String,
// NOTE(review): presumably the token positions of `word` in the source
// document — confirm against the tsvector parser.
pub positions: Vec<u16>,
// NOTE(review): weight encoding is not visible in this file — confirm.
pub weight: u8,
}
/// Abstract syntax tree of a text-search query (the payload of `Value::TsQuery`).
///
/// Leaves are `Term`s; the remaining variants combine boxed sub-queries.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TsQueryAst {
/// A single search word.
Term {
word: String,
// NOTE(review): presumably a bitmask of accepted lexeme weights — confirm.
weight_mask: u8,
},
/// Conjunction of two sub-queries.
And(Box<TsQueryAst>, Box<TsQueryAst>),
/// Disjunction of two sub-queries.
Or(Box<TsQueryAst>, Box<TsQueryAst>),
/// Negation of a sub-query.
Not(Box<TsQueryAst>),
/// Two sub-queries related by a `distance`.
// NOTE(review): presumably "right follows left by `distance` positions" — confirm.
Phrase {
left: Box<TsQueryAst>,
right: Box<TsQueryAst>,
distance: u16,
},
}
/// A single cell value.
///
/// `Value::data_type` maps every variant except `Null` to a `DataType`.
/// The enum is `#[non_exhaustive]`, so code outside this crate must include
/// a wildcard arm when matching on it.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum Value {
SmallInt(i16),
Int(i32),
BigInt(i64),
Float(f64),
Text(String),
Bool(bool),
/// Vector with `f32` elements (`VecEncoding::F32`).
Vector(Vec<f32>),
/// Vector in the `VecEncoding::Sq8` representation.
Sq8Vector(crate::quantize::Sq8Vector),
/// Vector in the `VecEncoding::F16` representation.
HalfVector(crate::halfvec::HalfVector),
// NOTE(review): presumably the decimal value is `scaled / 10^scale` — confirm.
Numeric {
scaled: i128,
scale: u8,
},
// NOTE(review): unit/epoch of the integer payloads of `Date`, `Timestamp`
// and `Time` is not visible in this file — confirm.
Date(i32),
Timestamp(i64),
Interval {
months: i32,
micros: i64,
},
/// JSON document text; reported as `DataType::Json`.
Json(String),
Bytes(Vec<u8>),
/// One-dimensional arrays; `None` elements represent SQL NULL entries.
TextArray(Vec<Option<String>>),
IntArray(Vec<Option<i32>>),
BigIntArray(Vec<Option<i64>>),
TsVector(Vec<TsLexeme>),
TsQuery(TsQueryAst),
Uuid([u8; 16]),
Time(i64),
Year(u16),
/// `IndexKey::from_value` keys this as `us - offset_secs * 1_000_000`.
TimeTz {
us: i64,
offset_secs: i32,
},
Money(i64),
/// Key / optional-value pairs.
Hstore(Vec<(String, Option<String>)>),
/// Two-dimensional arrays (a `Vec` of rows).
IntArray2D(Vec<Vec<Option<i32>>>),
BigIntArray2D(Vec<Vec<Option<i64>>>),
TextArray2D(Vec<Vec<Option<String>>>),
/// A range value of the given `kind`.
// NOTE(review): presumably `None` bounds mean "unbounded" — confirm.
Range {
kind: RangeKind,
lower: Option<alloc::boxed::Box<Value>>,
upper: Option<alloc::boxed::Box<Value>>,
lower_inc: bool,
upper_inc: bool,
empty: bool,
},
/// SQL NULL; has no `DataType`.
Null,
}
impl Value {
    /// Returns the [`DataType`] this value carries, or `None` for `Value::Null`.
    ///
    /// Vector variants report their element count as `dim`. `Numeric` values
    /// report `precision: 0` because the value itself only stores a scale.
    ///
    /// # Panics
    ///
    /// Panics if a vector's length does not fit in a `u32`.
    pub fn data_type(&self) -> Option<DataType> {
        let ty = match self {
            // NULL is the only value without a type.
            Self::Null => return None,
            Self::SmallInt(_) => DataType::SmallInt,
            Self::Int(_) => DataType::Int,
            Self::BigInt(_) => DataType::BigInt,
            Self::Float(_) => DataType::Float,
            Self::Text(_) => DataType::Text,
            Self::Bool(_) => DataType::Bool,
            Self::Vector(v) => DataType::Vector {
                dim: u32::try_from(v.len()).expect("vector dim ≤ u32"),
                encoding: VecEncoding::F32,
            },
            Self::Sq8Vector(q) => DataType::Vector {
                dim: u32::try_from(q.bytes.len()).expect("vector dim ≤ u32"),
                encoding: VecEncoding::Sq8,
            },
            Self::HalfVector(h) => DataType::Vector {
                dim: u32::try_from(h.dim()).expect("vector dim ≤ u32"),
                encoding: VecEncoding::F16,
            },
            Self::Numeric { scale, .. } => DataType::Numeric {
                precision: 0,
                scale: *scale,
            },
            Self::Date(_) => DataType::Date,
            Self::Timestamp(_) => DataType::Timestamp,
            Self::Interval { .. } => DataType::Interval,
            Self::Json(_) => DataType::Json,
            Self::Bytes(_) => DataType::Bytes,
            Self::TextArray(_) => DataType::TextArray,
            Self::IntArray(_) => DataType::IntArray,
            Self::BigIntArray(_) => DataType::BigIntArray,
            Self::TsVector(_) => DataType::TsVector,
            Self::TsQuery(_) => DataType::TsQuery,
            Self::Uuid(_) => DataType::Uuid,
            Self::Time(_) => DataType::Time,
            Self::Year(_) => DataType::Year,
            Self::TimeTz { .. } => DataType::TimeTz,
            Self::Money(_) => DataType::Money,
            Self::Range { kind, .. } => DataType::Range(*kind),
            Self::Hstore(_) => DataType::Hstore,
            Self::IntArray2D(_) => DataType::IntArray2D,
            Self::BigIntArray2D(_) => DataType::BigIntArray2D,
            Self::TextArray2D(_) => DataType::TextArray2D,
        };
        Some(ty)
    }

    /// Returns `true` only for `Value::Null`.
    pub const fn is_null(&self) -> bool {
        matches!(self, Self::Null)
    }
}
/// One table row: an ordered list of cell values.
///
/// `Table::insert` requires the number of values to equal the number of
/// schema columns.
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
/// Cell values, indexed by column position.
pub values: Vec<Value>,
}
impl Row {
/// Wraps `values` in a row without any validation.
pub const fn new(values: Vec<Value>) -> Self {
Self { values }
}
/// Number of cells in the row.
pub fn len(&self) -> usize {
self.values.len()
}
/// Returns `true` when the row has no cells.
pub fn is_empty(&self) -> bool {
self.values.is_empty()
}
}
/// Schema of a single table column.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnSchema {
/// Column name; matched exactly by `TableSchema::column_position`.
pub name: String,
/// Declared type; `Table::insert` checks incoming values against it.
pub ty: DataType,
/// When `false`, `Table::insert` rejects `Value::Null` for this column.
pub nullable: bool,
/// Constant default value, if any.
pub default: Option<Value>,
// NOTE(review): presumably the text of a default expression evaluated at
// insert time — confirm against the executor.
pub runtime_default: Option<String>,
// NOTE(review): presumably paired with `Table::next_auto_value` — confirm.
pub auto_increment: bool,
// NOTE(review): presumably names of user-defined enum / domain types
// backing this column — confirm.
pub user_enum_type: Option<String>,
pub user_domain_type: Option<String>,
// NOTE(review): presumably an expression applied on UPDATE — confirm.
pub on_update_runtime: Option<String>,
/// Text collation of the column.
pub collation: Collation,
pub is_unsigned: bool,
// NOTE(review): presumably variant lists of inline ENUM(...) / SET(...)
// column definitions — confirm.
pub inline_enum_variants: Option<Vec<String>>,
pub inline_set_variants: Option<Vec<String>>,
}
/// Text collation of a column. `Binary` is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Collation {
    /// The default collation.
    #[default]
    Binary,
    CaseInsensitive,
}

impl Collation {
    /// Byte tag associated with [`Collation::Binary`].
    pub const TAG_BINARY: u8 = 0;
    /// Byte tag associated with [`Collation::CaseInsensitive`].
    pub const TAG_CASE_INSENSITIVE: u8 = 1;
}
/// Schema of a table: its name, columns and declared constraints.
#[derive(Debug, Clone, PartialEq)]
pub struct TableSchema {
pub name: String,
/// Columns in positional order; row values are matched to them by index.
pub columns: Vec<ColumnSchema>,
// NOTE(review): presumably a byte budget for the in-memory (hot) tier —
// confirm against the freeze logic.
pub hot_tier_bytes: Option<u64>,
pub foreign_keys: Vec<ForeignKeyConstraint>,
pub uniqueness_constraints: Vec<UniquenessConstraint>,
// NOTE(review): presumably CHECK constraint expressions as text — confirm.
pub checks: Vec<String>,
}
/// A PRIMARY KEY or UNIQUE constraint over one or more columns.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UniquenessConstraint {
/// `true` for a primary key; otherwise a plain uniqueness constraint.
pub is_primary_key: bool,
/// Positions of the constrained columns within the table schema.
pub columns: Vec<usize>,
// NOTE(review): presumably mirrors SQL `NULLS NOT DISTINCT` — confirm.
pub nulls_not_distinct: bool,
}
/// A foreign-key constraint from columns of this table to a parent table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForeignKeyConstraint {
/// Optional constraint name.
pub name: Option<String>,
/// Positions of the referencing columns in this table.
pub local_columns: Vec<usize>,
/// Name of the referenced table.
pub parent_table: String,
/// Positions of the referenced columns in the parent table.
pub parent_columns: Vec<usize>,
/// Action to take when a referenced parent row is deleted.
pub on_delete: FkAction,
/// Action to take when a referenced parent key is updated.
pub on_update: FkAction,
}
/// Referential action of a foreign key.
///
/// The explicit discriminants are the values returned by [`FkAction::tag`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FkAction {
    Restrict = 0,
    Cascade = 1,
    SetNull = 2,
    SetDefault = 3,
    NoAction = 4,
}

impl FkAction {
    /// Returns the one-byte tag of this action (0 through 4, in declaration order).
    pub const fn tag(self) -> u8 {
        // The discriminants above are pinned, so the cast yields the tag directly.
        self as u8
    }

    /// Inverse of [`FkAction::tag`]; returns `None` for any byte above 4.
    pub const fn from_tag(b: u8) -> Option<Self> {
        match b {
            0 => Some(Self::Restrict),
            1 => Some(Self::Cascade),
            2 => Some(Self::SetNull),
            3 => Some(Self::SetDefault),
            4 => Some(Self::NoAction),
            _ => None,
        }
    }
}
impl TableSchema {
    /// Returns the position of the first column whose name equals `name`
    /// exactly, or `None` when no column matches.
    pub fn column_position(&self, name: &str) -> Option<usize> {
        self.columns
            .iter()
            .enumerate()
            .find_map(|(pos, col)| (col.name == name).then_some(pos))
    }
}
/// Ordered key stored in BTree indexes.
///
/// Built from a `Value` by `IndexKey::from_value`; all integer-like values
/// collapse into `Int`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum IndexKey {
/// Any value that `from_value` widens to `i64`.
Int(i64),
Text(String),
Bool(bool),
Uuid([u8; 16]),
}
impl IndexKey {
    /// Converts a cell value into a BTree key.
    ///
    /// Integer-like values (`SmallInt`, `Int`, `BigInt`, `Date`, `Timestamp`,
    /// `Time`, `Year`, `Money`) become `Int`; `TimeTz` becomes
    /// `Int(us - offset_secs * 1_000_000)`. `Text`, `Bool` and `Uuid` map to
    /// the variant of the same name. Every other value, including `Null`,
    /// yields `None`.
    pub fn from_value(v: &Value) -> Option<Self> {
        let key = match v {
            Value::Text(s) => Self::Text(s.clone()),
            Value::Bool(b) => Self::Bool(*b),
            Value::Uuid(bytes) => Self::Uuid(*bytes),
            Value::SmallInt(n) => Self::Int(i64::from(*n)),
            // Both payloads are `i32` and widen losslessly.
            Value::Int(n) | Value::Date(n) => Self::Int(i64::from(*n)),
            Value::Year(y) => Self::Int(i64::from(*y)),
            // Already `i64`: used as-is.
            Value::BigInt(n) | Value::Timestamp(n) | Value::Time(n) | Value::Money(n) => {
                Self::Int(*n)
            }
            Value::TimeTz { us, offset_secs } => {
                Self::Int(us - i64::from(*offset_secs) * 1_000_000)
            }
            // Listed exhaustively so a new `Value` variant forces a decision here.
            Value::Null
            | Value::Float(_)
            | Value::Vector(_)
            | Value::Sq8Vector(_)
            | Value::HalfVector(_)
            | Value::Numeric { .. }
            | Value::Interval { .. }
            | Value::Json(_)
            | Value::Bytes(_)
            | Value::TextArray(_)
            | Value::IntArray(_)
            | Value::BigIntArray(_)
            | Value::TsVector(_)
            | Value::TsQuery(_)
            | Value::Range { .. }
            | Value::Hstore(_)
            | Value::IntArray2D(_)
            | Value::BigIntArray2D(_)
            | Value::TextArray2D(_) => return None,
        };
        Some(key)
    }
}
/// A secondary index on a table.
#[derive(Debug, Clone)]
pub struct Index {
/// Index name; the `Table` methods that add or restore an index reject a
/// name that is already in use.
pub name: String,
/// Position of the indexed column in the table schema.
pub column_position: usize,
/// The index structure itself.
pub kind: IndexKind,
/// Positions of additional columns carried by the index.
// NOTE(review): presumably SQL `INCLUDE (...)` columns — confirm.
pub included_columns: Vec<usize>,
// NOTE(review): presumably the WHERE predicate text of a partial index —
// confirm; `Table::insert` does not consult it.
pub partial_predicate: Option<String>,
// NOTE(review): presumably the expression text of an expression index — confirm.
pub expression: Option<String>,
pub is_unique: bool,
// NOTE(review): presumably trailing columns of a multi-column index — confirm.
pub extra_column_positions: Vec<usize>,
}
/// Default `m` for NSW indexes.
// NOTE(review): presumably the value callers pass as `m` to
// `Table::add_nsw_index` when none is specified — not referenced in the
// visible part of this file; confirm.
pub const NSW_DEFAULT_M: usize = 16;
/// Summary of a freeze operation.
// NOTE(review): the producer of this report is outside the visible part of
// the file; field meanings below are inferred from names — confirm.
#[derive(Debug, Clone)]
pub struct FreezeReport {
/// Identifier of the segment that was produced.
pub segment_id: u32,
/// Number of rows moved into the segment.
pub frozen_rows: usize,
pub bytes_freed: u64,
/// Encoded bytes of the produced segment.
pub segment_bytes: Vec<u8>,
}
/// A contiguous range of rows selected for freezing, with per-row payloads.
// NOTE(review): the producer/consumer of this type is outside the visible
// part of the file; the tuple layout below is inferred from types — confirm.
#[derive(Debug, Clone)]
pub struct FreezeSlice {
/// Range of row indices covered by this slice.
pub row_range: core::ops::Range<usize>,
// Presumably (row id, encoded row body, index key) per row — confirm.
pub rows: Vec<(u64, Vec<u8>, IndexKey)>,
}
/// Summary of a segment compaction.
// NOTE(review): the producer of this report is outside the visible part of
// the file; field meanings below are inferred from names — confirm.
#[derive(Debug, Clone)]
pub struct CompactReport {
/// Identifiers of the segments that were merged.
pub sources: Vec<u32>,
/// Identifier of the merged segment, if one was produced.
pub merged_segment_id: Option<u32>,
pub merged_segment_bytes: Vec<u8>,
pub merged_rows: usize,
pub deleted_rows_pruned: usize,
pub bytes_reclaimed_estimate: u64,
}
/// The concrete structure backing an `Index`.
#[derive(Debug, Clone)]
pub enum IndexKind {
/// Ordered map from key to the locators of rows holding that key.
BTree(PersistentBTreeMap<IndexKey, Vec<RowLocator>>),
/// Layered neighbour graph for vector search.
Nsw(NswGraph),
/// BRIN index; only records the indexed column's type here.
Brin {
column_type: DataType,
},
/// Postings keyed by tsvector lexeme word (see `Table::add_gin_index`).
Gin(PersistentBTreeMap<alloc::string::String, Vec<RowLocator>>),
/// Postings keyed by trigram (see `Table::add_gin_trgm_index`).
GinTrgm(PersistentBTreeMap<alloc::string::String, Vec<RowLocator>>),
/// Postings keyed by `fts_simple::simple_lex` output
/// (see `Table::add_gin_fulltext_index`).
GinFulltext(PersistentBTreeMap<alloc::string::String, Vec<RowLocator>>),
}
/// Layered neighbour graph backing an NSW vector index.
#[derive(Debug, Clone)]
pub struct NswGraph {
/// Neighbour cap for every layer above layer 0 (see `cap_for_layer`).
pub m: usize,
/// Neighbour cap for layer 0; `NswGraph::new` sets it to `2 * m` (saturating).
pub m_max_0: usize,
// NOTE(review): presumably the row index of the search entry point — confirm.
pub entry: Option<usize>,
// NOTE(review): presumably the top layer on which `entry` lives — confirm.
pub entry_level: u8,
// NOTE(review): presumably one assigned level per row — confirm.
pub levels: PersistentVec<u8>,
// NOTE(review): presumably `layers[l][row]` is the neighbour list of `row`
// on layer `l` — confirm. `new` starts with a single empty layer.
pub layers: Vec<PersistentVec<Vec<u32>>>,
}
impl NswGraph {
    /// Creates an empty graph with neighbour cap `m`.
    ///
    /// The layer-0 cap is `2 * m` (saturating), there is no entry point, and
    /// exactly one (empty) layer exists.
    fn new(m: usize) -> Self {
        let m_max_0 = m.saturating_mul(2);
        let base_layer = PersistentVec::new();
        Self {
            m,
            m_max_0,
            entry: None,
            entry_level: 0,
            levels: PersistentVec::new(),
            layers: alloc::vec![base_layer],
        }
    }

    /// Returns the neighbour cap for `layer`: `m_max_0` on layer 0, `m` above it.
    pub const fn cap_for_layer(&self, layer: u8) -> usize {
        match layer {
            0 => self.m_max_0,
            _ => self.m,
        }
    }
}
/// Deterministically assigns an NSW layer level (0 through 7) to a row index.
///
/// The row index is scrambled with a fixed 64-bit mixing sequence; the level
/// is the number of all-zero low nibbles of the result, capped at 7. The same
/// `row_idx` always produces the same level.
pub fn nsw_assign_level(row_idx: usize) -> u8 {
    const MAX_LEVEL: u8 = 7;

    let seeded = (row_idx as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
    let round1 = (seeded ^ (seeded >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    let round2 = (round1 ^ (round1 >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    let hash = round2 ^ (round2 >> 31);

    // Each run of four trailing zero bits is one zero nibble. A zero hash has
    // 64 trailing zeros (16 nibbles), which the cap reduces to MAX_LEVEL.
    let zero_nibbles = hash.trailing_zeros() / 4;
    u8::try_from(zero_nibbles).map_or(MAX_LEVEL, |level| level.min(MAX_LEVEL))
}
impl Index {
fn new_btree(name: String, column_position: usize) -> Self {
Self {
name,
column_position,
kind: IndexKind::BTree(PersistentBTreeMap::new()),
included_columns: Vec::new(),
partial_predicate: None,
expression: None,
is_unique: false,
extra_column_positions: Vec::new(),
}
}
fn new_nsw(name: String, column_position: usize, m: usize) -> Self {
Self {
name,
column_position,
kind: IndexKind::Nsw(NswGraph::new(m)),
included_columns: Vec::new(),
partial_predicate: None,
expression: None,
is_unique: false,
extra_column_positions: Vec::new(),
}
}
fn new_brin(name: String, column_position: usize, column_type: DataType) -> Self {
Self {
name,
column_position,
kind: IndexKind::Brin { column_type },
included_columns: Vec::new(),
partial_predicate: None,
expression: None,
is_unique: false,
extra_column_positions: Vec::new(),
}
}
fn new_gin(name: String, column_position: usize) -> Self {
Self {
name,
column_position,
kind: IndexKind::Gin(PersistentBTreeMap::new()),
included_columns: Vec::new(),
partial_predicate: None,
expression: None,
is_unique: false,
extra_column_positions: Vec::new(),
}
}
fn new_gin_trgm(name: String, column_position: usize) -> Self {
Self {
name,
column_position,
kind: IndexKind::GinTrgm(PersistentBTreeMap::new()),
included_columns: Vec::new(),
partial_predicate: None,
expression: None,
is_unique: false,
extra_column_positions: Vec::new(),
}
}
fn new_gin_fulltext(name: String, column_position: usize) -> Self {
Self {
name,
column_position,
kind: IndexKind::GinFulltext(PersistentBTreeMap::new()),
included_columns: Vec::new(),
partial_predicate: None,
expression: None,
is_unique: false,
extra_column_positions: Vec::new(),
}
}
pub fn lookup_eq(&self, key: &IndexKey) -> &[RowLocator] {
match &self.kind {
IndexKind::BTree(m) => m.get(key).map_or(&[][..], Vec::as_slice),
IndexKind::Nsw(_)
| IndexKind::Brin { .. }
| IndexKind::Gin(_)
| IndexKind::GinTrgm(_)
| IndexKind::GinFulltext(_) => &[][..],
}
}
pub fn gin_lookup_word(&self, word: &str) -> &[RowLocator] {
match &self.kind {
IndexKind::Gin(m) | IndexKind::GinFulltext(m) => {
m.get(&String::from(word)).map_or(&[][..], Vec::as_slice)
}
IndexKind::BTree(_)
| IndexKind::Nsw(_)
| IndexKind::Brin { .. }
| IndexKind::GinTrgm(_) => &[][..],
}
}
pub fn gin_trgm_lookup(&self, tri: &str) -> &[RowLocator] {
match &self.kind {
IndexKind::GinTrgm(m) => m.get(&String::from(tri)).map_or(&[][..], Vec::as_slice),
IndexKind::BTree(_)
| IndexKind::Nsw(_)
| IndexKind::Brin { .. }
| IndexKind::Gin(_)
| IndexKind::GinFulltext(_) => &[][..],
}
}
pub const fn nsw(&self) -> Option<&NswGraph> {
match &self.kind {
IndexKind::Nsw(g) => Some(g),
IndexKind::BTree(_)
| IndexKind::Brin { .. }
| IndexKind::Gin(_)
| IndexKind::GinTrgm(_)
| IndexKind::GinFulltext(_) => None,
}
}
pub const fn is_brin(&self) -> bool {
matches!(self.kind, IndexKind::Brin { .. })
}
pub const fn is_gin_trgm(&self) -> bool {
matches!(self.kind, IndexKind::GinTrgm(_))
}
pub const fn is_gin(&self) -> bool {
matches!(self.kind, IndexKind::Gin(_))
}
pub const fn is_gin_fulltext(&self) -> bool {
matches!(self.kind, IndexKind::GinFulltext(_))
}
}
/// An in-memory table: schema, hot rows, secondary indices and tier counters.
#[derive(Debug, Clone)]
pub struct Table {
schema: TableSchema,
/// Hot (in-memory) rows; `RowLocator::Hot(i)` refers to position `i` here.
rows: PersistentVec<Row>,
indices: Vec<Index>,
/// Running total that `insert` grows by `row_body_encoded_len` per row.
hot_bytes: u64,
/// Last cold-row count supplied through `set_cold_row_count`.
cold_row_count: u64,
/// Set by `mark_cold_row_count_stale`, cleared by `set_cold_row_count`.
cold_row_count_stale: bool,
}
impl Table {
/// Creates an empty table for `schema`: no rows, no indices, all counters
/// at zero and the cold-row count not marked stale.
pub fn new(schema: TableSchema) -> Self {
Self {
schema,
rows: PersistentVec::new(),
indices: Vec::new(),
hot_bytes: 0,
cold_row_count: 0,
cold_row_count_stale: false,
}
}
/// Returns the running hot-tier byte total maintained by `insert`.
#[must_use]
pub const fn hot_bytes(&self) -> u64 {
self.hot_bytes
}
/// Returns the cold-row count last stored by `set_cold_row_count`.
///
/// The value may be out of date; check `cold_row_count_stale`.
#[must_use]
pub const fn cold_row_count(&self) -> u64 {
self.cold_row_count
}
/// Stores `n` as the cold-row count and clears the stale flag.
pub fn set_cold_row_count(&mut self, n: u64) {
self.cold_row_count = n;
self.cold_row_count_stale = false;
}
/// Flags the stored cold-row count as stale; the count itself is unchanged.
pub fn mark_cold_row_count_stale(&mut self) {
self.cold_row_count_stale = true;
}
/// Returns `true` when the stored cold-row count has been marked stale
/// since the last `set_cold_row_count`.
#[must_use]
pub const fn cold_row_count_stale(&self) -> bool {
self.cold_row_count_stale
}
/// Counts cold locators by scanning the BTree indices.
///
/// For every BTree index the cold locators across all keys are counted; the
/// largest per-index total is returned. Returns 0 when the table has no BTree
/// index or none of them holds a cold locator.
#[must_use]
pub fn count_cold_locators(&self) -> u64 {
    self.indices
        .iter()
        .filter_map(|idx| {
            if let IndexKind::BTree(map) = &idx.kind {
                Some(map)
            } else {
                None
            }
        })
        .map(|map| {
            map.iter()
                .map(|(_, locs)| locs.iter().filter(|l| l.is_cold()).count() as u64)
                .sum::<u64>()
        })
        .max()
        .unwrap_or(0)
}
/// Borrows the table schema.
pub const fn schema(&self) -> &TableSchema {
&self.schema
}
/// Mutably borrows the table schema.
///
/// Changes made through this reference are not propagated to existing rows
/// or indices by this method.
pub const fn schema_mut(&mut self) -> &mut TableSchema {
&mut self.schema
}
/// Borrows the hot rows.
pub const fn rows(&self) -> &PersistentVec<Row> {
&self.rows
}
/// Number of hot rows (cold rows are tracked separately by `cold_row_count`).
pub const fn row_count(&self) -> usize {
self.rows.len()
}
/// Mutably borrows the indices as a slice; entries can be modified in place
/// but not added or removed.
pub fn indices_mut(&mut self) -> &mut [Index] {
&mut self.indices
}
/// Borrows the indices in the order they were added.
pub fn indices(&self) -> &[Index] {
&self.indices
}
/// Computes the next auto-increment value for the column at `col_pos`.
///
/// Returns `None` when `col_pos` is out of range or the column is not
/// `SMALLINT`, `INT` or `BIGINT`. Otherwise returns one more than the largest
/// integer currently stored in that column across the hot rows, or `1` when
/// no row holds an integer there (NULLs and other values are ignored).
///
/// The increment saturates at `i64::MAX` instead of overflowing, so a column
/// that already holds `i64::MAX` yields `i64::MAX` again rather than
/// panicking (debug builds) or wrapping to `i64::MIN` (release builds).
pub fn next_auto_value(&self, col_pos: usize) -> Option<i64> {
    let ty = self.schema.columns.get(col_pos)?.ty;
    if !matches!(ty, DataType::SmallInt | DataType::Int | DataType::BigInt) {
        return None;
    }
    let current_max = self
        .rows
        .iter()
        .filter_map(|row| match row.values.get(col_pos) {
            Some(Value::SmallInt(n)) => Some(i64::from(*n)),
            Some(Value::Int(n)) => Some(i64::from(*n)),
            Some(Value::BigInt(n)) => Some(*n),
            _ => None,
        })
        .max();
    Some(current_max.map_or(1, |m| m.saturating_add(1)))
}
/// Finds an index on `column_position`, preferring BTree over NSW.
///
/// Returns the first BTree index on that column if one exists, otherwise the
/// first NSW index on it, otherwise `None`. Other index kinds are never
/// returned.
pub fn index_on(&self, column_position: usize) -> Option<&Index> {
    let mut nsw_fallback: Option<&Index> = None;
    for candidate in &self.indices {
        if candidate.column_position != column_position {
            continue;
        }
        match candidate.kind {
            // A BTree match wins immediately.
            IndexKind::BTree(_) => return Some(candidate),
            // Remember only the first NSW match in case no BTree exists.
            IndexKind::Nsw(_) if nsw_fallback.is_none() => nsw_fallback = Some(candidate),
            _ => {}
        }
    }
    nsw_fallback
}
pub fn insert(&mut self, row: Row) -> Result<(), StorageError> {
if row.len() != self.schema.columns.len() {
return Err(StorageError::ArityMismatch {
expected: self.schema.columns.len(),
actual: row.len(),
});
}
for (i, (val, col)) in row.values.iter().zip(&self.schema.columns).enumerate() {
if val.is_null() {
if !col.nullable {
return Err(StorageError::NullInNotNull {
column: col.name.clone(),
});
}
continue;
}
let actual = val.data_type().expect("non-null");
let compatible = actual == col.ty
|| matches!(
(actual, col.ty),
(
DataType::Text,
DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
) | (DataType::Json | DataType::Jsonb, DataType::Text)
| (DataType::Json, DataType::Jsonb)
| (DataType::Jsonb, DataType::Json)
| (DataType::Timestamp, DataType::Timestamptz)
| (DataType::Timestamptz, DataType::Timestamp)
)
|| matches!(
(actual, col.ty),
(
DataType::Numeric { scale: a, .. },
DataType::Numeric { scale: b, .. },
) if a == b
);
if !compatible {
return Err(StorageError::TypeMismatch {
column: col.name.clone(),
expected: col.ty,
actual,
position: i,
});
}
}
let new_row_idx = self.rows.len();
for idx in &mut self.indices {
match &mut idx.kind {
IndexKind::BTree(map) => {
if let Some(key) = IndexKey::from_value(&row.values[idx.column_position]) {
let mut entries = map.get(&key).cloned().unwrap_or_default();
entries.push(RowLocator::Hot(new_row_idx));
map.insert_mut(key, entries);
}
}
IndexKind::Gin(map) => {
if let Value::TsVector(lexemes) = &row.values[idx.column_position] {
for lex in lexemes {
let mut entries = map.get(&lex.word).cloned().unwrap_or_default();
entries.push(RowLocator::Hot(new_row_idx));
map.insert_mut(lex.word.clone(), entries);
}
}
}
IndexKind::GinTrgm(map) => {
if let Value::Text(s) = &row.values[idx.column_position] {
for tri in trgm::extract_trigrams(s) {
let mut entries = map.get(&tri).cloned().unwrap_or_default();
entries.push(RowLocator::Hot(new_row_idx));
map.insert_mut(tri, entries);
}
}
}
IndexKind::GinFulltext(map) => {
let text_cell = match &row.values[idx.column_position] {
Value::Text(s) => Some(s.as_str()),
_ => None,
};
if let Some(s) = text_cell {
for lex in fts_simple::simple_lex(s) {
let mut entries = map.get(&lex).cloned().unwrap_or_default();
entries.push(RowLocator::Hot(new_row_idx));
map.insert_mut(lex, entries);
}
}
}
IndexKind::Nsw(_) | IndexKind::Brin { .. } => {}
}
}
self.hot_bytes = self
.hot_bytes
.saturating_add(row_body_encoded_len(&row, &self.schema) as u64);
self.rows.push_mut(row);
let new_row_idx = self.rows.len() - 1;
let nsw_targets: Vec<usize> = self
.indices
.iter()
.enumerate()
.filter_map(|(i, idx)| {
if matches!(idx.kind, IndexKind::Nsw(_)) {
Some(i)
} else {
None
}
})
.collect();
for idx_pos in nsw_targets {
nsw_insert_at(self, idx_pos, new_row_idx);
}
Ok(())
}
pub fn add_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
if self.indices.iter().any(|i| i.name == name) {
return Err(StorageError::DuplicateIndex { name });
}
let column_position = self.schema.column_position(column_name).ok_or_else(|| {
StorageError::ColumnNotFound {
column: column_name.into(),
}
})?;
let mut idx = Index::new_btree(name, column_position);
if let IndexKind::BTree(map) = &mut idx.kind {
for (i, row) in self.rows.iter().enumerate() {
if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
let mut entries = map.get(&key).cloned().unwrap_or_default();
entries.push(RowLocator::Hot(i));
map.insert_mut(key, entries);
}
}
}
self.indices.push(idx);
Ok(())
}
/// Adds an NSW index named `name` on `column_name` with neighbour cap `m`.
///
/// Delegates to `add_nsw_index_inner` with no pre-built graph (`None`); any
/// error it reports is returned unchanged.
pub fn add_nsw_index(
&mut self,
name: String,
column_name: &str,
m: usize,
) -> Result<(), StorageError> {
self.add_nsw_index_inner(name, column_name, m, None)
}
/// Rebuilds the NSW index `name`, optionally re-encoding its vector column.
///
/// When `new_encoding` is `Some` and differs from the column's current
/// encoding, every row's cell in that column is passed through
/// `recode_vector_cell` and the column's declared type is updated to the new
/// encoding. The index is then dropped and rebuilt with its previous `m` via
/// `add_nsw_index_inner`.
///
/// Re-encoding is all-or-nothing: every cell is recoded into a scratch buffer
/// first, and rows and schema are only modified once all of them have
/// succeeded. If any cell fails to recode, the table is left exactly as it
/// was. This costs one clone of each vector cell plus a temporary buffer
/// holding the recoded column.
///
/// # Errors
///
/// * `IndexNotFound` if no index is named `name`.
/// * `Unsupported` if the index is not NSW, or if `new_encoding` is given for
///   a non-vector column.
/// * Any error returned by `recode_vector_cell` or `add_nsw_index_inner`.
pub fn rebuild_nsw_index(
    &mut self,
    name: &str,
    new_encoding: Option<VecEncoding>,
) -> Result<(), StorageError> {
    let idx_pos = self
        .indices
        .iter()
        .position(|i| i.name == name)
        .ok_or_else(|| StorageError::IndexNotFound {
            name: String::from(name),
        })?;
    let col_pos = self.indices[idx_pos].column_position;
    let m = match &self.indices[idx_pos].kind {
        IndexKind::Nsw(g) => g.m,
        IndexKind::BTree(_)
        | IndexKind::Brin { .. }
        | IndexKind::Gin(_)
        | IndexKind::GinTrgm(_)
        | IndexKind::GinFulltext(_) => {
            return Err(StorageError::Unsupported(format!(
                "ALTER INDEX REBUILD on non-NSW index {name:?} — only NSW indexes can rebuild"
            )));
        }
    };
    let col_name = self.schema.columns[col_pos].name.clone();
    if let Some(target) = new_encoding {
        let (dim, current) = match self.schema.columns[col_pos].ty {
            DataType::Vector { dim, encoding } => (dim, encoding),
            ref other => {
                return Err(StorageError::Unsupported(format!(
                    "ALTER INDEX REBUILD WITH (encoding=…) on non-vector column type {other:?}"
                )));
            }
        };
        if target != current {
            // Phase 1: recode every cell into a scratch buffer. Nothing in
            // the table has been touched yet, so an error here is harmless.
            let recoded = self
                .rows
                .iter()
                .map(|row| recode_vector_cell(row.values[col_pos].clone(), target))
                .collect::<Result<Vec<_>, _>>()?;
            // Phase 2: every cell recoded successfully; commit rows and schema.
            for (i, cell) in recoded.into_iter().enumerate() {
                let row = self
                    .rows
                    .get_mut(i)
                    .expect("row index in bounds (one recoded cell per existing row)");
                row.values[col_pos] = cell;
            }
            self.schema.columns[col_pos].ty = DataType::Vector {
                dim,
                encoding: target,
            };
        }
    }
    self.indices.remove(idx_pos);
    self.add_nsw_index_inner(String::from(name), &col_name, m, None)?;
    Ok(())
}
/// Registers an NSW index named `name` on `column_name` from a pre-built `graph`.
///
/// Delegates to `add_nsw_index_inner`, passing the graph's own `m` and
/// `Some(graph)`; any error it reports is returned unchanged.
pub fn restore_nsw_index(
&mut self,
name: String,
column_name: &str,
graph: NswGraph,
) -> Result<(), StorageError> {
self.add_nsw_index_inner(name, column_name, graph.m, Some(graph))
}
pub fn restore_btree_index(
&mut self,
name: String,
column_name: &str,
map: PersistentBTreeMap<IndexKey, Vec<RowLocator>>,
) -> Result<(), StorageError> {
if self.indices.iter().any(|i| i.name == name) {
return Err(StorageError::DuplicateIndex { name });
}
let column_position = self.schema.column_position(column_name).ok_or_else(|| {
StorageError::ColumnNotFound {
column: column_name.into(),
}
})?;
self.indices.push(Index {
name,
column_position,
kind: IndexKind::BTree(map),
included_columns: Vec::new(),
partial_predicate: None,
expression: None,
is_unique: false,
extra_column_positions: Vec::new(),
});
Ok(())
}
/// Registers a BRIN index named `name` on `column_name`, recording the
/// caller-supplied `column_type` (the schema's current type is not consulted).
///
/// # Errors
///
/// `DuplicateIndex` if the name is taken; `ColumnNotFound` if the column does
/// not exist.
pub fn restore_brin_index(
    &mut self,
    name: String,
    column_name: &str,
    column_type: DataType,
) -> Result<(), StorageError> {
    if self.indices.iter().any(|existing| existing.name == name) {
        return Err(StorageError::DuplicateIndex { name });
    }
    let Some(column_position) = self.schema.column_position(column_name) else {
        return Err(StorageError::ColumnNotFound {
            column: column_name.into(),
        });
    };
    let restored = Index::new_brin(name, column_position, column_type);
    self.indices.push(restored);
    Ok(())
}
/// Creates a BRIN index named `name` on `column_name`, recording the column's
/// current declared type. No rows are scanned.
///
/// # Errors
///
/// `DuplicateIndex` if the name is taken; `ColumnNotFound` if the column does
/// not exist.
pub fn add_brin_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
    if self.indices.iter().any(|existing| existing.name == name) {
        return Err(StorageError::DuplicateIndex { name });
    }
    let Some(column_position) = self.schema.column_position(column_name) else {
        return Err(StorageError::ColumnNotFound {
            column: column_name.into(),
        });
    };
    let declared_type = self.schema.columns[column_position].ty;
    let created = Index::new_brin(name, column_position, declared_type);
    self.indices.push(created);
    Ok(())
}
pub fn add_gin_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
if self.indices.iter().any(|i| i.name == name) {
return Err(StorageError::DuplicateIndex { name });
}
let column_position = self.schema.column_position(column_name).ok_or_else(|| {
StorageError::ColumnNotFound {
column: column_name.into(),
}
})?;
if self.schema.columns[column_position].ty != DataType::TsVector {
return Err(StorageError::Corrupt(format!(
"GIN index {name:?} requires a tsvector column; \
{column_name:?} is {:?}",
self.schema.columns[column_position].ty
)));
}
let mut idx = Index::new_gin(name, column_position);
if let IndexKind::Gin(map) = &mut idx.kind {
for (i, row) in self.rows.iter().enumerate() {
if let Value::TsVector(lexemes) = &row.values[column_position] {
for lex in lexemes {
let mut entries = map.get(&lex.word).cloned().unwrap_or_default();
entries.push(RowLocator::Hot(i));
map.insert_mut(lex.word.clone(), entries);
}
}
}
}
self.indices.push(idx);
Ok(())
}
pub fn restore_gin_index(
&mut self,
name: String,
column_name: &str,
map: PersistentBTreeMap<String, Vec<RowLocator>>,
) -> Result<(), StorageError> {
if self.indices.iter().any(|i| i.name == name) {
return Err(StorageError::DuplicateIndex { name });
}
let column_position = self.schema.column_position(column_name).ok_or_else(|| {
StorageError::ColumnNotFound {
column: column_name.into(),
}
})?;
let mut idx = Index::new_gin(name, column_position);
idx.kind = IndexKind::Gin(map);
self.indices.push(idx);
Ok(())
}
pub fn add_gin_trgm_index(
&mut self,
name: String,
column_name: &str,
) -> Result<(), StorageError> {
if self.indices.iter().any(|i| i.name == name) {
return Err(StorageError::DuplicateIndex { name });
}
let column_position = self.schema.column_position(column_name).ok_or_else(|| {
StorageError::ColumnNotFound {
column: column_name.into(),
}
})?;
if !matches!(
self.schema.columns[column_position].ty,
DataType::Text | DataType::Varchar(_)
) {
return Err(StorageError::Corrupt(format!(
"trigram-GIN index {name:?} requires a TEXT/VARCHAR column; \
{column_name:?} is {:?}",
self.schema.columns[column_position].ty
)));
}
let mut idx = Index::new_gin_trgm(name, column_position);
if let IndexKind::GinTrgm(map) = &mut idx.kind {
for (i, row) in self.rows.iter().enumerate() {
if let Value::Text(s) = &row.values[column_position] {
for tri in trgm::extract_trigrams(s) {
let mut entries = map.get(&tri).cloned().unwrap_or_default();
entries.push(RowLocator::Hot(i));
map.insert_mut(tri, entries);
}
}
}
}
self.indices.push(idx);
Ok(())
}
pub fn restore_gin_trgm_index(
&mut self,
name: String,
column_name: &str,
map: PersistentBTreeMap<String, Vec<RowLocator>>,
) -> Result<(), StorageError> {
if self.indices.iter().any(|i| i.name == name) {
return Err(StorageError::DuplicateIndex { name });
}
let column_position = self.schema.column_position(column_name).ok_or_else(|| {
StorageError::ColumnNotFound {
column: column_name.into(),
}
})?;
let mut idx = Index::new_gin_trgm(name, column_position);
idx.kind = IndexKind::GinTrgm(map);
self.indices.push(idx);
Ok(())
}
pub fn add_gin_fulltext_index(
&mut self,
name: String,
column_name: &str,
) -> Result<(), StorageError> {
if self.indices.iter().any(|i| i.name == name) {
return Err(StorageError::DuplicateIndex { name });
}
let column_position = self.schema.column_position(column_name).ok_or_else(|| {
StorageError::ColumnNotFound {
column: column_name.into(),
}
})?;
if !matches!(
self.schema.columns[column_position].ty,
DataType::Text | DataType::Varchar(_)
) {
return Err(StorageError::Corrupt(format!(
"fulltext-GIN index {name:?} requires a TEXT/VARCHAR column; \
{column_name:?} is {:?}",
self.schema.columns[column_position].ty
)));
}
let mut idx = Index::new_gin_fulltext(name, column_position);
if let IndexKind::GinFulltext(map) = &mut idx.kind {
for (i, row) in self.rows.iter().enumerate() {
if let Value::Text(s) = &row.values[column_position] {
for lex in fts_simple::simple_lex(s) {
let mut entries = map.get(&lex).cloned().unwrap_or_default();
entries.push(RowLocator::Hot(i));
map.insert_mut(lex, entries);
}
}
}
}
self.indices.push(idx);
Ok(())
}
pub fn restore_gin_fulltext_index(
&mut self,
name: String,
column_name: &str,
map: PersistentBTreeMap<String, Vec<RowLocator>>,
) -> Result<(), StorageError> {
if self.indices.iter().any(|i| i.name == name) {
return Err(StorageError::DuplicateIndex { name });
}
let column_position = self.schema.column_position(column_name).ok_or_else(|| {
StorageError::ColumnNotFound {
column: column_name.into(),
}
})?;
let mut idx = Index::new_gin_fulltext(name, column_position);
idx.kind = IndexKind::GinFulltext(map);
self.indices.push(idx);
Ok(())
}
/// Appends the given `(key, locator)` pairs to the BTree index `index_name`.
///
/// Each locator is pushed onto the posting list of its key (creating the list
/// if needed); existing entries are kept and no de-duplication is performed.
/// Returns the number of pairs consumed.
///
/// # Errors
///
/// `Corrupt` if the index does not exist or is not a BTree index.
pub fn register_cold_locators<I>(
    &mut self,
    index_name: &str,
    locators: I,
) -> Result<usize, StorageError>
where
    I: IntoIterator<Item = (IndexKey, RowLocator)>,
{
    let Some(idx) = self.indices.iter_mut().find(|i| i.name == index_name) else {
        return Err(StorageError::Corrupt(format!(
            "index {index_name:?} not found"
        )));
    };
    let IndexKind::BTree(map) = &mut idx.kind else {
        return Err(StorageError::Corrupt(format!(
            "index {index_name:?} is not BTree; cold locators apply only to BTree indices"
        )));
    };

    let mut registered = 0usize;
    for (key, locator) in locators {
        let mut postings = map.get(&key).cloned().unwrap_or_default();
        postings.push(locator);
        map.insert_mut(key, postings);
        registered += 1;
    }
    Ok(registered)
}
/// Appends the given `(word, locator)` pairs to the GIN-family index
/// `index_name` (plain, trigram or fulltext GIN).
///
/// Each locator is pushed onto the posting list of its word (creating the
/// list if needed); no de-duplication is performed. Returns the number of
/// pairs consumed.
///
/// # Errors
///
/// `Corrupt` if the index does not exist or is not a GIN-family index.
pub fn register_gin_cold_locators<I>(
    &mut self,
    index_name: &str,
    locators: I,
) -> Result<usize, StorageError>
where
    I: IntoIterator<Item = (String, RowLocator)>,
{
    let Some(idx) = self.indices.iter_mut().find(|i| i.name == index_name) else {
        return Err(StorageError::Corrupt(format!(
            "index {index_name:?} not found"
        )));
    };
    let (IndexKind::Gin(map) | IndexKind::GinTrgm(map) | IndexKind::GinFulltext(map)) =
        &mut idx.kind
    else {
        return Err(StorageError::Corrupt(format!(
            "register_gin_cold_locators: index {index_name:?} is not GIN"
        )));
    };

    let mut registered = 0usize;
    for (word, locator) in locators {
        let mut postings = map.get(&word).cloned().unwrap_or_default();
        postings.push(locator);
        map.insert_mut(word, postings);
        registered += 1;
    }
    Ok(registered)
}
/// Drops every non-hot locator stored under `key` in the BTree index
/// `index_name`, keeping the hot ones.
///
/// Returns how many locators were removed. When the key is absent or holds
/// only hot locators the index is left untouched and `0` is returned. If all
/// locators are removed, the key stays in the map with an empty list.
///
/// # Errors
///
/// `Corrupt` if the index does not exist or is not a BTree index.
pub fn remove_cold_locators_for_key(
    &mut self,
    index_name: &str,
    key: &IndexKey,
) -> Result<usize, StorageError> {
    let Some(idx) = self.indices.iter_mut().find(|i| i.name == index_name) else {
        return Err(StorageError::Corrupt(format!(
            "remove_cold_locators_for_key: index {index_name:?} not found"
        )));
    };
    let IndexKind::BTree(map) = &mut idx.kind else {
        return Err(StorageError::Corrupt(format!(
            "remove_cold_locators_for_key: index {index_name:?} is not BTree; \
             cold locators apply only to BTree indices"
        )));
    };

    let Some(entries) = map.get(key) else {
        return Ok(0);
    };
    // Count first so the common "nothing to remove" case allocates nothing.
    let removed = entries.iter().filter(|loc| !loc.is_hot()).count();
    if removed == 0 {
        return Ok(0);
    }
    let mut kept: Vec<RowLocator> = entries.iter().copied().filter(RowLocator::is_hot).collect();
    kept.shrink_to_fit();
    map.insert_mut(key.clone(), kept);
    Ok(removed)
}
pub fn add_column(&mut self, col: ColumnSchema, fill_value: Value) {
self.schema.columns.push(col);
let mut new_rows: PersistentVec<Row> = PersistentVec::new();
for row in self.rows.iter() {
let mut values = row.values.clone();
values.push(fill_value.clone());
new_rows.push_mut(Row::new(values));
}
self.rows = new_rows;
}
/// Replaces the partial-index predicate of the index at position `idx`.
///
/// `None` clears the predicate. `idx` must be a valid position in
/// `self.indices`; this is asserted in debug builds and an out-of-range
/// value panics on the indexing below in any build.
pub fn set_partial_predicate(&mut self, idx: usize, pred: Option<String>) {
    debug_assert!(idx < self.indices.len());
    let target = &mut self.indices[idx];
    target.partial_predicate = pred;
}
/// Renames the column at `col_pos` to `new_name`.
///
/// Only the schema entry's `name` is changed. `col_pos` must be a valid
/// column position; this is asserted in debug builds and an out-of-range
/// value panics on the indexing below in any build.
pub fn rename_column(&mut self, col_pos: usize, new_name: &str) {
    debug_assert!(col_pos < self.schema.columns.len());
    let column = &mut self.schema.columns[col_pos];
    column.name = String::from(new_name);
}
pub fn drop_column(&mut self, col_pos: usize) {
debug_assert!(col_pos < self.schema.columns.len());
self.schema.columns.remove(col_pos);
let mut new_rows: PersistentVec<Row> = PersistentVec::new();
for row in self.rows.iter() {
let mut values = row.values.clone();
if col_pos < values.len() {
values.remove(col_pos);
}
new_rows.push_mut(Row::new(values));
}
self.rows = new_rows;
self.indices.retain(|idx| idx.column_position != col_pos);
for idx in &mut self.indices {
if idx.column_position > col_pos {
idx.column_position -= 1;
}
for inc in &mut idx.included_columns {
if *inc > col_pos {
*inc -= 1;
}
}
}
let mut surviving_ucs: Vec<UniquenessConstraint> = Vec::new();
for mut uc in core::mem::take(&mut self.schema.uniqueness_constraints) {
uc.columns.retain(|&c| c != col_pos);
if uc.columns.is_empty() {
continue;
}
for c in &mut uc.columns {
if *c > col_pos {
*c -= 1;
}
}
surviving_ucs.push(uc);
}
self.schema.uniqueness_constraints = surviving_ucs;
for fk in &mut self.schema.foreign_keys {
for c in &mut fk.local_columns {
if *c > col_pos {
*c -= 1;
}
}
}
self.rebuild_indices();
}
/// Removes every hot row from the table.
///
/// Resets the hot-byte counter to zero, replaces the row storage with an
/// empty vector, and rebuilds all indices against the now-empty row set.
pub fn truncate(&mut self) {
    self.hot_bytes = 0;
    self.rows = PersistentVec::new();
    self.rebuild_indices();
}
/// Deletes the rows at the given positions and rebuilds the indices.
///
/// Out-of-range and duplicate positions are ignored. Returns the number of
/// distinct rows actually removed. `hot_bytes` is reduced by the encoded
/// size of each removed row. An empty `positions` slice returns `0` without
/// touching anything; any non-empty slice rebuilds rows and indices even if
/// no position turned out to be valid.
pub fn delete_rows(&mut self, positions: &[usize]) -> usize {
    if positions.is_empty() {
        return 0;
    }
    // One flag per row; setting a flag twice or out of range is a no-op.
    let mut doomed = alloc::vec![false; self.rows.len()];
    for &pos in positions {
        if let Some(flag) = doomed.get_mut(pos) {
            *flag = true;
        }
    }
    let removed = doomed.iter().filter(|&&flag| flag).count();
    let mut survivors: PersistentVec<Row> = PersistentVec::new();
    let mut freed_bytes: u64 = 0;
    for (row, &drop_it) in self.rows.iter().zip(doomed.iter()) {
        if drop_it {
            let size = row_body_encoded_len(row, &self.schema) as u64;
            freed_bytes = freed_bytes.saturating_add(size);
        } else {
            survivors.push_mut(row.clone());
        }
    }
    self.rows = survivors;
    self.hot_bytes = self.hot_bytes.saturating_sub(freed_bytes);
    // Row positions shifted, so every hot locator must be regenerated.
    self.rebuild_indices();
    removed
}
/// Replaces the row at `position` with `new_values` and patches the indices.
///
/// Validation happens before any mutation: `position` must be in range,
/// `new_values` must have one value per schema column, NULLs are only
/// accepted in nullable columns, and every non-null value must have a type
/// compatible with its column.
///
/// After validation the row is swapped in, `hot_bytes` is adjusted by the
/// difference in encoded size, and indices whose indexed column changed are
/// fixed up: BTree indices are patched in place, while a change under any
/// other index kind triggers one full `rebuild_indices`.
///
/// # Errors
///
/// - `StorageError::Corrupt` when `position` is out of bounds.
/// - `StorageError::ArityMismatch` when the value count differs from the
///   column count.
/// - `StorageError::NullInNotNull` for a NULL in a non-nullable column.
/// - `StorageError::TypeMismatch` for an incompatible value type.
pub fn update_row(
&mut self,
position: usize,
new_values: Vec<Value>,
) -> Result<(), StorageError> {
if position >= self.rows.len() {
return Err(StorageError::Corrupt(alloc::format!(
"update_row: position {position} out of bounds (rows={})",
self.rows.len()
)));
}
if new_values.len() != self.schema.columns.len() {
return Err(StorageError::ArityMismatch {
expected: self.schema.columns.len(),
actual: new_values.len(),
});
}
for (i, (val, col)) in new_values.iter().zip(&self.schema.columns).enumerate() {
if val.is_null() {
if !col.nullable {
return Err(StorageError::NullInNotNull {
column: col.name.clone(),
});
}
continue;
}
let actual = val.data_type().expect("non-null");
// Accepted: exact type match; Text into Varchar/Char/Json/Jsonb;
// Json/Jsonb into Text; Json <-> Jsonb; Timestamp <-> Timestamptz;
// Numeric into Numeric when the scales are equal (precision ignored).
let compatible = actual == col.ty
|| matches!(
(actual, col.ty),
(
DataType::Text,
DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
) | (DataType::Json | DataType::Jsonb, DataType::Text)
| (DataType::Json, DataType::Jsonb)
| (DataType::Jsonb, DataType::Json)
| (DataType::Timestamp, DataType::Timestamptz)
| (DataType::Timestamptz, DataType::Timestamp)
)
|| matches!(
(actual, col.ty),
(
DataType::Numeric { scale: a, .. },
DataType::Numeric { scale: b, .. },
) if a == b
);
if !compatible {
return Err(StorageError::TypeMismatch {
column: col.name.clone(),
expected: col.ty,
actual,
position: i,
});
}
}
let old_row = self
.rows
.get(position)
.expect("position bounds-checked above");
let old_bytes = row_body_encoded_len(old_row, &self.schema) as u64;
let new_row = Row::new(new_values);
let new_bytes = row_body_encoded_len(&new_row, &self.schema) as u64;
// Planned index maintenance, computed while the old row is still
// readable and applied after the row has been replaced.
enum IdxFix {
BTreeMove {
idx_pos: usize,
old_key: Option<IndexKey>,
new_key: Option<IndexKey>,
},
FullRebuild,
}
let mut fixes: Vec<IdxFix> = Vec::new();
// NOTE(review): only `column_position` is compared here; an index's
// `extra_column_positions` / `expression` are not consulted - confirm that
// is intended for multi-column or expression indices.
for (idx_pos, idx) in self.indices.iter().enumerate() {
let col = idx.column_position;
let old_v = &old_row.values[col];
let new_v = &new_row.values[col];
if old_v == new_v {
continue;
}
match &idx.kind {
IndexKind::BTree(_) => fixes.push(IdxFix::BTreeMove {
idx_pos,
old_key: IndexKey::from_value(old_v),
new_key: IndexKey::from_value(new_v),
}),
IndexKind::Nsw(_)
| IndexKind::Brin { .. }
| IndexKind::Gin(_)
| IndexKind::GinTrgm(_)
| IndexKind::GinFulltext(_) => {
// A full rebuild covers the BTree indices too, so any moves
// queued so far are discarded.
fixes.clear();
fixes.push(IdxFix::FullRebuild);
break;
}
}
}
self.rows = self
.rows
.set(position, new_row)
.expect("position bounds-checked above");
self.hot_bytes = self
.hot_bytes
.saturating_sub(old_bytes)
.saturating_add(new_bytes);
for fix in fixes {
match fix {
IdxFix::FullRebuild => {
self.rebuild_indices();
break;
}
IdxFix::BTreeMove {
idx_pos,
old_key,
new_key,
} => {
let IndexKind::BTree(map) = &mut self.indices[idx_pos].kind else {
unreachable!("IdxFix::BTreeMove built from a BTree index");
};
// Remove this row's hot locator from the old key's list. The key
// is written back even when the list becomes empty.
if let Some(k) = old_key
&& let Some(locs) = map.get(&k)
{
let mut locs = locs.clone();
locs.retain(|l| *l != RowLocator::Hot(position));
map.insert_mut(k, locs);
}
// Append the hot locator under the new key (`None` means the new
// value produced no index key, so nothing is added).
if let Some(k) = new_key {
let mut entries = map.get(&k).cloned().unwrap_or_default();
entries.push(RowLocator::Hot(position));
map.insert_mut(k, entries);
}
}
}
}
Ok(())
}
/// Rebuilds every index from the current hot rows.
///
/// Procedure:
/// 1. collect the cold locators currently stored in BTree and GIN-family
///    indices so they can be re-registered afterwards;
/// 2. record a descriptor (name, column position, kind) for each index;
/// 3. clear `self.indices` and recreate each index from its descriptor,
///    populating it with `RowLocator::Hot(i)` entries for the current rows
///    (BRIN indices are recreated without scanning rows here);
/// 4. re-register the saved cold locators.
///
/// NOTE(review): only the name, column position and kind survive the
/// rebuild. `included_columns`, `partial_predicate`, `expression`,
/// `is_unique` and `extra_column_positions` are not captured in the
/// descriptors, so unless the `Index::new_*` constructors (not visible
/// here) restore them, they are reset by every rebuild - confirm.
fn rebuild_indices(&mut self) {
// Cold locators of BTree indices, grouped per index name.
let preserved_cold: Vec<(String, Vec<(IndexKey, RowLocator)>)> = self
.indices
.iter()
.filter_map(|idx| match &idx.kind {
IndexKind::BTree(map) => {
let cold: Vec<(IndexKey, RowLocator)> = map
.iter()
.flat_map(|(k, locs)| {
locs.iter()
.filter(|l| l.is_cold())
.copied()
.map(move |l| (k.clone(), l))
})
.collect();
if cold.is_empty() {
None
} else {
Some((idx.name.clone(), cold))
}
}
IndexKind::Nsw(_)
| IndexKind::Brin { .. }
| IndexKind::Gin(_)
| IndexKind::GinTrgm(_)
| IndexKind::GinFulltext(_) => None,
})
.collect();
// Cold locators of GIN-family indices, grouped per index name.
let preserved_gin_cold: Vec<(String, Vec<(String, RowLocator)>)> = self
.indices
.iter()
.filter_map(|idx| match &idx.kind {
IndexKind::Gin(map) | IndexKind::GinTrgm(map) | IndexKind::GinFulltext(map) => {
let cold: Vec<(String, RowLocator)> = map
.iter()
.flat_map(|(w, locs)| {
locs.iter()
.filter(|l| l.is_cold())
.copied()
.map(move |l| (w.clone(), l))
})
.collect();
if cold.is_empty() {
None
} else {
Some((idx.name.clone(), cold))
}
}
IndexKind::BTree(_) | IndexKind::Nsw(_) | IndexKind::Brin { .. } => None,
})
.collect();
// Payload-free description of an index kind; carries only the parameters
// needed to construct a fresh, empty index of the same kind.
#[derive(Clone)]
enum RebuildKind {
BTree,
Nsw(usize),
Brin(DataType),
Gin,
GinTrgm,
GinFulltext,
}
let descriptors: Vec<(String, usize, RebuildKind)> = self
.indices
.iter()
.map(|idx| {
let kind = match &idx.kind {
IndexKind::Nsw(g) => RebuildKind::Nsw(g.m),
IndexKind::Brin { column_type } => RebuildKind::Brin(*column_type),
IndexKind::BTree(_) => RebuildKind::BTree,
IndexKind::Gin(_) => RebuildKind::Gin,
IndexKind::GinTrgm(_) => RebuildKind::GinTrgm,
IndexKind::GinFulltext(_) => RebuildKind::GinFulltext,
};
(idx.name.clone(), idx.column_position, kind)
})
.collect();
self.indices.clear();
for (name, column_position, rebuild_kind) in descriptors {
match rebuild_kind {
RebuildKind::Nsw(m) => {
// The index must already be in `self.indices` because
// `nsw_insert_at` addresses it by position.
let idx = Index::new_nsw(name, column_position, m);
self.indices.push(idx);
let idx_pos = self.indices.len() - 1;
let row_indices: Vec<usize> = (0..self.rows.len()).collect();
for row_idx in row_indices {
nsw_insert_at(self, idx_pos, row_idx);
}
}
RebuildKind::Brin(column_type) => {
self.indices
.push(Index::new_brin(name, column_position, column_type));
}
RebuildKind::BTree => {
let mut idx = Index::new_btree(name, column_position);
if let IndexKind::BTree(map) = &mut idx.kind {
for (i, row) in self.rows.iter().enumerate() {
// Values for which `from_value` yields no key are not indexed.
if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
let mut entries = map.get(&key).cloned().unwrap_or_default();
entries.push(RowLocator::Hot(i));
map.insert_mut(key, entries);
}
}
}
self.indices.push(idx);
}
RebuildKind::Gin => {
let mut idx = Index::new_gin(name, column_position);
if let IndexKind::Gin(map) = &mut idx.kind {
for (i, row) in self.rows.iter().enumerate() {
// Only `TsVector` cells contribute; one posting per lexeme word.
if let Value::TsVector(lexemes) = &row.values[column_position] {
for lex in lexemes {
let mut entries =
map.get(&lex.word).cloned().unwrap_or_default();
entries.push(RowLocator::Hot(i));
map.insert_mut(lex.word.clone(), entries);
}
}
}
}
self.indices.push(idx);
}
RebuildKind::GinTrgm => {
let mut idx = Index::new_gin_trgm(name, column_position);
if let IndexKind::GinTrgm(map) = &mut idx.kind {
for (i, row) in self.rows.iter().enumerate() {
// Only `Text` cells contribute; one posting per extracted trigram.
if let Value::Text(s) = &row.values[column_position] {
for tri in trgm::extract_trigrams(s) {
let mut entries = map.get(&tri).cloned().unwrap_or_default();
entries.push(RowLocator::Hot(i));
map.insert_mut(tri, entries);
}
}
}
}
self.indices.push(idx);
}
RebuildKind::GinFulltext => {
let mut idx = Index::new_gin_fulltext(name, column_position);
if let IndexKind::GinFulltext(map) = &mut idx.kind {
for (i, row) in self.rows.iter().enumerate() {
// Only `Text` cells contribute; one posting per `simple_lex` token.
if let Value::Text(s) = &row.values[column_position] {
for lex in fts_simple::simple_lex(s) {
let mut entries = map.get(&lex).cloned().unwrap_or_default();
entries.push(RowLocator::Hot(i));
map.insert_mut(lex, entries);
}
}
}
}
self.indices.push(idx);
}
}
}
// Re-registration errors are deliberately ignored: the indices were just
// recreated under the same names and kinds the locators were read from.
for (idx_name, locators) in preserved_cold {
let _ = self.register_cold_locators(&idx_name, locators);
}
for (idx_name, locators) in preserved_gin_cold {
let _ = self.register_gin_cold_locators(&idx_name, locators);
}
}
/// Creates an NSW index called `name` over the vector column `column_name`.
///
/// With `restore = Some(graph)` the supplied graph is installed as-is and no
/// rows are visited. With `restore = None` a fresh graph with parameter `m`
/// is created and every current row is inserted in position order.
///
/// # Errors
///
/// - `StorageError::DuplicateIndex` when an index with this name exists.
/// - `StorageError::ColumnNotFound` when the column does not exist.
/// - `StorageError::TypeMismatch` when the column is not a `Vector` type
///   (the `expected` field carries a placeholder `Vector` with `dim: 0`).
fn add_nsw_index_inner(
    &mut self,
    name: String,
    column_name: &str,
    m: usize,
    restore: Option<NswGraph>,
) -> Result<(), StorageError> {
    let name_taken = self.indices.iter().any(|existing| existing.name == name);
    if name_taken {
        return Err(StorageError::DuplicateIndex { name });
    }
    let Some(column_position) = self.schema.column_position(column_name) else {
        return Err(StorageError::ColumnNotFound {
            column: column_name.into(),
        });
    };
    let column_ty = self.schema.columns[column_position].ty;
    if !matches!(column_ty, DataType::Vector { .. }) {
        return Err(StorageError::TypeMismatch {
            column: column_name.into(),
            expected: DataType::Vector {
                dim: 0,
                encoding: VecEncoding::F32,
            },
            actual: column_ty,
            position: column_position,
        });
    }
    match restore {
        Some(graph) => {
            self.indices.push(Index {
                name,
                column_position,
                kind: IndexKind::Nsw(graph),
                included_columns: Vec::new(),
                partial_predicate: None,
                expression: None,
                is_unique: false,
                extra_column_positions: Vec::new(),
            });
        }
        None => {
            // The index has to be registered first: insertion addresses it
            // by its position in `self.indices`.
            self.indices.push(Index::new_nsw(name, column_position, m));
            let idx_pos = self.indices.len() - 1;
            let row_total = self.rows.len();
            for row_idx in 0..row_total {
                nsw_insert_at(self, idx_pos, row_idx);
            }
        }
    }
    Ok(())
}
}
/// Converts one vector cell to the `target` encoding.
///
/// `Value::Null` is returned unchanged. Any vector variant is first expanded
/// to `f32` components (dequantizing SQ8 / widening half precision) and then
/// re-encoded, even when it already uses the target encoding.
///
/// # Errors
///
/// `StorageError::Unsupported` when the cell is neither NULL nor a vector.
fn recode_vector_cell(cell: Value, target: VecEncoding) -> Result<Value, StorageError> {
    if let Value::Null = cell {
        return Ok(cell);
    }
    let components: Vec<f32> = match &cell {
        Value::Vector(plain) => plain.clone(),
        Value::Sq8Vector(quantized) => quantize::dequantize(quantized),
        Value::HalfVector(half) => half.to_f32_vec(),
        other => {
            return Err(StorageError::Unsupported(format!(
                "ALTER INDEX REBUILD: cannot recode non-vector cell {:?}",
                other.data_type()
            )));
        }
    };
    let recoded = match target {
        VecEncoding::F32 => Value::Vector(components),
        VecEncoding::Sq8 => Value::Sq8Vector(quantize::quantize(&components)),
        VecEncoding::F16 => {
            Value::HalfVector(halfvec::HalfVector::from_f32_slice(&components))
        }
    };
    Ok(recoded)
}
/// Inserts row `new_row_idx` into the NSW graph of the index at `idx_pos`.
///
/// Rows whose cell is not a vector, or is a zero-length vector, only get an
/// empty node slot so that node ids keep lining up with row positions.
/// Otherwise the row is assigned a level by `nsw_assign_level`, the search
/// descends greedily from the graph entry point through the layers above
/// that level, and on each layer from `min(level, entry_level)` down to 0 a
/// beam search picks neighbours that are then linked in both directions.
/// Insertion always measures distances with the L2 metric.
///
/// Panics (via `unreachable!`) if the index at `idx_pos` is not an NSW index.
fn nsw_insert_at(table: &mut Table, idx_pos: usize, new_row_idx: usize) {
let col_pos = table.indices[idx_pos].column_position;
// Dimension of the stored vector, or `None` for non-vector cells.
let cell_dim: Option<usize> = match &table.rows[new_row_idx].values[col_pos] {
Value::Vector(v) => Some(v.len()),
Value::Sq8Vector(q) => Some(q.bytes.len()),
Value::HalfVector(h) => Some(h.dim()),
_ => None,
};
let Some(dim) = cell_dim else {
ensure_node_slot(table, idx_pos, new_row_idx, 0);
return;
};
if dim == 0 {
ensure_node_slot(table, idx_pos, new_row_idx, 0);
return;
}
let level = nsw_assign_level(new_row_idx);
ensure_node_slot(table, idx_pos, new_row_idx, level);
let (entry, entry_level, m) = match &table.indices[idx_pos].kind {
IndexKind::Nsw(g) => (g.entry, g.entry_level, g.m),
IndexKind::BTree(_)
| IndexKind::Brin { .. }
| IndexKind::Gin(_)
| IndexKind::GinTrgm(_)
| IndexKind::GinFulltext(_) => {
unreachable!("nsw_insert_at on a non-NSW index")
}
};
// First vector in the graph: it becomes the entry point and has no
// neighbours to connect to.
if entry.is_none() {
if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
g.entry = Some(new_row_idx);
g.entry_level = level;
*g.levels
.get_mut(new_row_idx)
.expect("levels slot padded by ensure_node_slot") = level;
}
return;
}
if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
*g.levels
.get_mut(new_row_idx)
.expect("levels slot padded by ensure_node_slot") = level;
}
// The new row's vector as f32 components; it is the query for all
// distance computations below.
let query = match &table.rows[new_row_idx].values[col_pos] {
Value::Vector(v) => v.clone(),
Value::Sq8Vector(q) => quantize::dequantize(q),
Value::HalfVector(h) => h.to_f32_vec(),
_ => return,
};
let mut current = entry.expect("entry was Some above");
let mut current_d = vec_l2_sq(table, col_pos, current, &query);
// Layers above the new node's level are only traversed, never modified.
if entry_level > level {
for layer in (level + 1..=entry_level).rev() {
(current, current_d) =
greedy_layer_walk(table, idx_pos, layer, current, current_d, &query);
}
}
let top = level.min(entry_level);
let ef = (m * 2).max(8);
for layer in (0..=top).rev() {
// Layer 0 allows twice as many links per node as the upper layers.
let cap = if layer == 0 { m * 2 } else { m };
let mut candidates = layer_beam_search(
table,
idx_pos,
layer,
current,
current_d,
&query,
ef,
NswMetric::L2,
);
// The new node must not be chosen as its own neighbour.
candidates.retain(|&(_, n)| n != new_row_idx);
// The closest candidate seeds the search on the next layer down.
if let Some(&(d, n)) = candidates.first() {
current = n;
current_d = d;
}
let peers = select_neighbours_heuristic(&candidates, cap, table, col_pos);
connect_at_layer(table, idx_pos, layer, new_row_idx, &peers);
}
// A node that reaches above the current top becomes the new entry point.
if level > entry_level
&& let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
{
g.entry = Some(new_row_idx);
g.entry_level = level;
}
}
/// Grows the NSW graph so that node `new_row_idx` exists up to `level`.
///
/// Adds empty layers until layer `level` exists, pads `levels` with `0`
/// until it covers `new_row_idx`, and pads every layer's adjacency table
/// with empty neighbour lists until it covers `new_row_idx`.
///
/// Panics (via `unreachable!`) when the index at `idx_pos` is not NSW.
fn ensure_node_slot(table: &mut Table, idx_pos: usize, new_row_idx: usize, level: u8) {
    let graph = match &mut table.indices[idx_pos].kind {
        IndexKind::Nsw(graph) => graph,
        IndexKind::BTree(_)
        | IndexKind::Brin { .. }
        | IndexKind::Gin(_)
        | IndexKind::GinTrgm(_)
        | IndexKind::GinFulltext(_) => unreachable!("ensure_node_slot on a BTree index"),
    };
    // Each range is empty when the structure is already large enough.
    for _ in graph.layers.len()..=usize::from(level) {
        graph.layers.push(PersistentVec::new());
    }
    for _ in graph.levels.len()..=new_row_idx {
        graph.levels.push_mut(0);
    }
    for adjacency in &mut graph.layers {
        for _ in adjacency.len()..=new_row_idx {
            adjacency.push_mut(Vec::new());
        }
    }
}
/// Greedy descent on one graph layer: repeatedly hops to the neighbour that
/// is strictly closest to `query` (squared L2) until no neighbour improves
/// on the current node.
///
/// Returns the final node and its distance. For a non-NSW index the inputs
/// are returned unchanged. A missing layer or node is treated as having no
/// neighbours.
fn greedy_layer_walk(
    table: &Table,
    idx_pos: usize,
    layer: u8,
    mut current: usize,
    mut current_d: f32,
    query: &[f32],
) -> (usize, f32) {
    let graph = match &table.indices[idx_pos].kind {
        IndexKind::Nsw(graph) => graph,
        IndexKind::BTree(_)
        | IndexKind::Brin { .. }
        | IndexKind::Gin(_)
        | IndexKind::GinTrgm(_)
        | IndexKind::GinFulltext(_) => {
            return (current, current_d);
        }
    };
    let col_pos = table.indices[idx_pos].column_position;
    let layer_slot = layer as usize;
    loop {
        let adjacency: &[u32] = match graph
            .layers
            .get(layer_slot)
            .and_then(|nodes| nodes.get(current))
        {
            Some(list) => list.as_slice(),
            None => &[],
        };
        // Strict `<` keeps the earliest neighbour among equally close ones.
        let (winner, winner_d) =
            adjacency
                .iter()
                .fold((current, current_d), |(best, best_d), &raw| {
                    let candidate = raw as usize;
                    let d = vec_l2_sq(table, col_pos, candidate, query);
                    if d < best_d {
                        (candidate, d)
                    } else {
                        (best, best_d)
                    }
                });
        if winner == current {
            return (current, current_d);
        }
        current = winner;
        current_d = winner_d;
    }
}
/// Beam search on one graph layer, starting from `entry_node`.
///
/// Keeps up to `ef` best nodes found so far and returns them as
/// `(distance, node)` pairs sorted by ascending distance. Distances are
/// measured with `metric`; for `NswMetric::L2` the supplied `entry_d` is
/// reused for the entry node, for other metrics it is recomputed.
///
/// Returns an empty vector when the index at `idx_pos` is not NSW.
/// Neighbours that are out of range, already visited, or at a non-finite
/// distance are skipped; the entry node itself is always part of the result.
#[allow(clippy::too_many_arguments)] fn layer_beam_search(
table: &Table,
idx_pos: usize,
layer: u8,
entry_node: usize,
entry_d: f32,
query: &[f32],
ef: usize,
metric: NswMetric,
) -> Vec<(f32, usize)> {
let g = match &table.indices[idx_pos].kind {
IndexKind::Nsw(g) => g,
IndexKind::BTree(_)
| IndexKind::Brin { .. }
| IndexKind::Gin(_)
| IndexKind::GinTrgm(_)
| IndexKind::GinFulltext(_) => return Vec::new(),
};
let col_pos = table.indices[idx_pos].column_position;
let d0 = if matches!(metric, NswMetric::L2) {
entry_d
} else {
cell_to_query_metric_distance(table, col_pos, entry_node, query, metric)
};
let row_count = table.rows.len();
// One visited flag per table row, allocated on every call.
let mut visited: Vec<bool> = alloc::vec![false; row_count];
if entry_node < row_count {
visited[entry_node] = true;
}
// `candidates` pops the closest unexpanded node first; `results` keeps the
// furthest accepted node on top so it can be evicted when the beam is full.
let mut candidates: alloc::collections::BinaryHeap<NodeClosest> =
alloc::collections::BinaryHeap::with_capacity(ef);
let mut results: alloc::collections::BinaryHeap<NodeFurthest> =
alloc::collections::BinaryHeap::with_capacity(ef);
candidates.push(NodeClosest {
dist: d0,
node: entry_node,
});
results.push(NodeFurthest {
dist: d0,
node: entry_node,
});
while let Some(cur) = candidates.pop() {
let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
// Stop once the best remaining candidate is worse than the worst kept
// result and the beam is already full.
if cur.dist > worst && results.len() >= ef {
break;
}
let neighbours: &[u32] = g
.layers
.get(layer as usize)
.and_then(|layer_v| layer_v.get(cur.node))
.map_or(&[][..], Vec::as_slice);
for &n in neighbours {
let n = n as usize;
if n >= row_count || visited[n] {
continue;
}
visited[n] = true;
let dn = cell_to_query_metric_distance(table, col_pos, n, query, metric);
if !dn.is_finite() {
continue;
}
let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
if results.len() < ef || dn < worst {
results.push(NodeFurthest { dist: dn, node: n });
// Keep at most `ef` results by evicting the furthest one.
if results.len() > ef {
results.pop();
}
candidates.push(NodeClosest { dist: dn, node: n });
}
}
}
let mut out: Vec<(f32, usize)> = results.into_iter().map(|c| (c.dist, c.node)).collect();
out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
out
}
/// Heap entry ordered so that a max-heap (`BinaryHeap`) yields the entry
/// with the *smallest* `dist` first.
///
/// Ordering and equality consider only `dist`; `node` is payload. Keeping
/// `==` in agreement with `cmp` satisfies the `Ord`/`PartialEq` contract,
/// and NaN distances are ranked with `f32::total_cmp` instead of being
/// treated as equal to everything, so the order is total.
#[derive(Debug, Clone, Copy)]
struct NodeClosest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeClosest {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeClosest {}
impl PartialOrd for NodeClosest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeClosest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        // Operands are swapped on purpose: smaller distance == greater entry.
        // `partial_cmp` decides every ordered pair exactly as before; only
        // the unordered (NaN) case falls back to the IEEE total order.
        other
            .dist
            .partial_cmp(&self.dist)
            .unwrap_or_else(|| other.dist.total_cmp(&self.dist))
    }
}
/// Heap entry ordered so that a max-heap (`BinaryHeap`) yields the entry
/// with the *largest* `dist` first.
///
/// Ordering and equality consider only `dist`; `node` is payload. Keeping
/// `==` in agreement with `cmp` satisfies the `Ord`/`PartialEq` contract,
/// and NaN distances are ranked with `f32::total_cmp` instead of being
/// treated as equal to everything, so the order is total.
#[derive(Debug, Clone, Copy)]
struct NodeFurthest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeFurthest {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeFurthest {}
impl PartialOrd for NodeFurthest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeFurthest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        // `partial_cmp` decides every ordered pair exactly as before; only
        // the unordered (NaN) case falls back to the IEEE total order.
        self.dist
            .partial_cmp(&other.dist)
            .unwrap_or_else(|| self.dist.total_cmp(&other.dist))
    }
}
/// Picks at most `m` neighbours from `candidates`, visited in slice order.
///
/// `candidates` holds `(distance_to_query, row)` pairs. A candidate is
/// skipped when its cell is not a vector, or when some already-picked
/// neighbour is strictly closer to it (cell-to-cell squared L2) than the
/// candidate is to the query. Selection stops as soon as `m` neighbours
/// have been picked.
fn select_neighbours_heuristic(
    candidates: &[(f32, usize)],
    m: usize,
    table: &Table,
    col_pos: usize,
) -> Vec<usize> {
    let mut picked: Vec<usize> = Vec::with_capacity(m);
    for &(dist_to_query, node) in candidates {
        if picked.len() >= m {
            break;
        }
        let Some(Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_)) =
            table.rows.get(node).and_then(|row| row.values.get(col_pos))
        else {
            continue;
        };
        let shadowed = picked
            .iter()
            .any(|&kept| cell_l2_sq(table, col_pos, node, kept) < dist_to_query);
        if !shadowed {
            picked.push(node);
        }
    }
    picked
}
/// Links `new_row_idx` with `peers` on one graph layer.
///
/// The new node's neighbour list on `layer` is replaced by `peers`. Each
/// peer that holds a vector then gets a back-link to the new node (unless
/// already present); if that pushes the peer over the layer's capacity
/// (`cap_for_layer`), its neighbour list is re-selected with
/// `select_neighbours_heuristic`, ranked by distance to the peer.
///
/// Does nothing for a non-NSW index. Panics if a row index does not fit in
/// `u32`, or if `layer` / a peer is out of range for the graph or the rows.
fn connect_at_layer(
table: &mut Table,
idx_pos: usize,
layer: u8,
new_row_idx: usize,
peers: &[usize],
) {
let col_pos = table.indices[idx_pos].column_position;
let cap = match &table.indices[idx_pos].kind {
IndexKind::Nsw(g) => g.cap_for_layer(layer),
IndexKind::BTree(_)
| IndexKind::Brin { .. }
| IndexKind::Gin(_)
| IndexKind::GinTrgm(_)
| IndexKind::GinFulltext(_) => return,
};
let new_row_u32 = u32::try_from(new_row_idx).expect("row index fits in u32");
// Forward links: new node -> peers.
if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
let layer_v = &mut g.layers[layer as usize];
if let Some(slot) = layer_v.get_mut(new_row_idx) {
*slot = peers
.iter()
.map(|&p| u32::try_from(p).expect("row index fits in u32"))
.collect();
}
}
// Back links: peer -> new node, trimming peers that exceed `cap`.
for &peer in peers {
if !matches!(
&table.rows[peer].values[col_pos],
Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_)
) {
continue;
}
if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
let layer_v = &mut g.layers[layer as usize];
if let Some(slot) = layer_v.get_mut(peer)
&& !slot.contains(&new_row_u32)
{
slot.push(new_row_u32);
}
}
let needs_trim = match &table.indices[idx_pos].kind {
IndexKind::Nsw(g) => g.layers[layer as usize][peer].len() > cap,
IndexKind::BTree(_)
| IndexKind::Brin { .. }
| IndexKind::Gin(_)
| IndexKind::GinTrgm(_)
| IndexKind::GinFulltext(_) => false,
};
if needs_trim {
// Snapshot the peer's neighbours so the graph borrow is released
// before distances are computed against the table.
let current_peers: Vec<usize> = match &table.indices[idx_pos].kind {
IndexKind::Nsw(g) => g.layers[layer as usize][peer]
.iter()
.map(|&n| n as usize)
.collect(),
IndexKind::BTree(_)
| IndexKind::Brin { .. }
| IndexKind::Gin(_)
| IndexKind::GinTrgm(_)
| IndexKind::GinFulltext(_) => continue,
};
// Rank the peer's neighbours by their distance to the peer itself.
let mut tagged: Vec<(f32, usize)> = current_peers
.iter()
.map(|&p| (cell_l2_sq(table, col_pos, peer, p), p))
.collect();
tagged.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
let kept = select_neighbours_heuristic(&tagged, cap, table, col_pos);
if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
&& let Some(slot) = g.layers[layer as usize].get_mut(peer)
{
*slot = kept
.into_iter()
.map(|p| u32::try_from(p).expect("row index fits in u32"))
.collect();
}
}
}
}
/// Squared L2 distance between the vector stored at (`row`, `col_pos`) and
/// `query`.
///
/// Returns `f32::INFINITY` when the row or column does not exist, the cell
/// is not a vector, or its dimension differs from `query.len()`.
fn vec_l2_sq(table: &Table, col_pos: usize, row: usize, query: &[f32]) -> f32 {
    let Some(cell) = table.rows.get(row).and_then(|r| r.values.get(col_pos)) else {
        return f32::INFINITY;
    };
    let dims = query.len();
    match cell {
        Value::Vector(plain) if plain.len() == dims => l2_distance_sq(plain, query),
        Value::Sq8Vector(code) if code.bytes.len() == dims => {
            quantize::sq8_l2_distance_sq_asymmetric(code, query)
        }
        Value::HalfVector(half) if half.dim() == dims => {
            halfvec::half_l2_distance_sq_asymmetric(half, query)
        }
        _ => f32::INFINITY,
    }
}
/// Squared L2 distance between the vectors stored in rows `row_a` and
/// `row_b` of column `col_pos`.
///
/// Both cells must exist, use the same vector encoding, and have the same
/// dimension; otherwise `f32::INFINITY` is returned.
fn cell_l2_sq(table: &Table, col_pos: usize, row_a: usize, row_b: usize) -> f32 {
    let left = table.rows.get(row_a).and_then(|r| r.values.get(col_pos));
    let right = table.rows.get(row_b).and_then(|r| r.values.get(col_pos));
    match (left, right) {
        (Some(Value::Vector(a)), Some(Value::Vector(b))) if a.len() == b.len() => {
            l2_distance_sq(a, b)
        }
        (Some(Value::Sq8Vector(a)), Some(Value::Sq8Vector(b)))
            if a.bytes.len() == b.bytes.len() =>
        {
            quantize::sq8_l2_distance_sq(a, b)
        }
        (Some(Value::HalfVector(a)), Some(Value::HalfVector(b))) if a.dim() == b.dim() => {
            halfvec::half_l2_distance_sq(a, b)
        }
        _ => f32::INFINITY,
    }
}
/// Distance under `metric` between the vector stored at (`row`, `col_pos`)
/// and `query`.
///
/// Plain `f32` vectors go through `metric_distance`; SQ8 and half-precision
/// cells use their asymmetric kernels. Returns `f32::INFINITY` when the cell
/// is missing, is not a vector, or has a dimension other than `query.len()`.
fn cell_to_query_metric_distance(
    table: &Table,
    col_pos: usize,
    row: usize,
    query: &[f32],
    metric: NswMetric,
) -> f32 {
    let Some(cell) = table.rows.get(row).and_then(|r| r.values.get(col_pos)) else {
        return f32::INFINITY;
    };
    // Reject non-vectors and dimension mismatches up front.
    let stored_dims = match cell {
        Value::Vector(plain) => plain.len(),
        Value::Sq8Vector(code) => code.bytes.len(),
        Value::HalfVector(half) => half.dim(),
        _ => return f32::INFINITY,
    };
    if stored_dims != query.len() {
        return f32::INFINITY;
    }
    match cell {
        Value::Vector(plain) => metric_distance(metric, plain, query),
        Value::Sq8Vector(code) => match metric {
            NswMetric::L2 => quantize::sq8_l2_distance_sq_asymmetric(code, query),
            NswMetric::InnerProduct => quantize::sq8_inner_product_asymmetric(code, query),
            NswMetric::Cosine => quantize::sq8_cosine_distance_asymmetric(code, query),
        },
        Value::HalfVector(half) => match metric {
            NswMetric::L2 => halfvec::half_l2_distance_sq_asymmetric(half, query),
            NswMetric::InnerProduct => halfvec::half_inner_product_asymmetric(half, query),
            NswMetric::Cosine => halfvec::half_cosine_distance_asymmetric(half, query),
        },
        _ => f32::INFINITY,
    }
}
/// Distance function used when searching an NSW index.
///
/// All variants are expressed so that a smaller value means "closer"
/// (see `metric_distance`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NswMetric {
/// Squared Euclidean distance (no square root is taken).
L2,
/// Negated inner product, so larger dot products rank first.
InnerProduct,
/// `1 - cosine similarity`; infinite when either vector has zero norm.
Cosine,
}
/// Approximate k-nearest-neighbour search over the NSW index at `idx_pos`.
///
/// Descends greedily (L2) from the graph entry point through the upper
/// layers, then runs a beam search on layer 0 under `metric`. The beam
/// width is at least `k`; for SQ8-encoded columns it is widened to at least
/// `k * SQ8_RERANK_OVER_FETCH` and the candidates are re-scored by
/// `sq8_rerank` before truncation.
///
/// Returns up to `k` `(distance, row)` pairs, closest first. Empty when the
/// index is not NSW or the graph has no entry point.
fn nsw_search(
    table: &Table,
    idx_pos: usize,
    query: &[f32],
    k: usize,
    ef: usize,
    metric: NswMetric,
) -> Vec<(f32, usize)> {
    let (start, top_level) = match &table.indices[idx_pos].kind {
        IndexKind::Nsw(graph) => match graph.entry {
            Some(node) => (node, graph.entry_level),
            None => return Vec::new(),
        },
        IndexKind::BTree(_)
        | IndexKind::Brin { .. }
        | IndexKind::Gin(_)
        | IndexKind::GinTrgm(_)
        | IndexKind::GinFulltext(_) => return Vec::new(),
    };
    let col_pos = table.indices[idx_pos].column_position;
    let is_sq8 = match table.schema.columns.get(col_pos) {
        Some(column) => matches!(
            column.ty,
            DataType::Vector {
                encoding: VecEncoding::Sq8,
                ..
            }
        ),
        None => false,
    };
    let beam = if is_sq8 {
        ef.max(k).max(k * SQ8_RERANK_OVER_FETCH)
    } else {
        ef.max(k)
    };
    // (node, distance) pair carried down through the upper layers.
    let mut cursor = (start, vec_l2_sq(table, col_pos, start, query));
    for layer in (1..=top_level).rev() {
        cursor = greedy_layer_walk(table, idx_pos, layer, cursor.0, cursor.1, query);
    }
    let coarse = layer_beam_search(table, idx_pos, 0, cursor.0, cursor.1, query, beam, metric);
    let mut hits = if is_sq8 {
        sq8_rerank(table, col_pos, &coarse, query, metric)
    } else {
        coarse
    };
    hits.truncate(k);
    hits
}
/// Re-scores SQ8 candidates against their dequantized vectors.
///
/// For each `(distance, row)` candidate: SQ8 cells are dequantized and
/// measured with `metric_distance` (dropped when the dimension differs from
/// `query`); non-SQ8 cells keep their incoming distance; rows or columns
/// that no longer exist are dropped. The result is sorted by ascending
/// distance.
fn sq8_rerank(
    table: &Table,
    col_pos: usize,
    candidates: &[(f32, usize)],
    query: &[f32],
    metric: NswMetric,
) -> Vec<(f32, usize)> {
    let mut rescored: Vec<(f32, usize)> = Vec::new();
    for &(approx_d, row) in candidates {
        let Some(cell) = table.rows.get(row).and_then(|r| r.values.get(col_pos)) else {
            continue;
        };
        match cell {
            Value::Sq8Vector(code) => {
                let decoded = quantize::dequantize(code);
                if decoded.len() == query.len() {
                    rescored.push((metric_distance(metric, &decoded, query), row));
                }
            }
            _ => rescored.push((approx_d, row)),
        }
    }
    rescored.sort_by(|lhs, rhs| {
        lhs.0
            .partial_cmp(&rhs.0)
            .unwrap_or(core::cmp::Ordering::Equal)
    });
    rescored
}
/// Over-fetch factor for SQ8 columns: `nsw_search` widens its beam to at
/// least `k * SQ8_RERANK_OVER_FETCH` candidates before `sq8_rerank`
/// re-scores them and the list is cut back to `k`.
const SQ8_RERANK_OVER_FETCH: usize = 3;
/// Distance between two `f32` vectors under `metric`; smaller is closer.
///
/// - `L2`: squared Euclidean distance.
/// - `InnerProduct`: the negated dot product.
/// - `Cosine`: `1 - dot / (|a| * |b|)`, or `f32::INFINITY` when either
///   squared norm is zero.
fn metric_distance(metric: NswMetric, a: &[f32], b: &[f32]) -> f32 {
    match metric {
        NswMetric::L2 => l2_distance_sq(a, b),
        NswMetric::InnerProduct => {
            let dot = inner_product_f32(a, b);
            -dot
        }
        NswMetric::Cosine => {
            let (dot, norm_a_sq, norm_b_sq) = cosine_dot_norms_f32(a, b);
            if norm_a_sq == 0.0 || norm_b_sq == 0.0 {
                f32::INFINITY
            } else {
                let magnitude = sqrt_newton_f32(norm_a_sq) * sqrt_newton_f32(norm_b_sq);
                1.0 - dot / magnitude
            }
        }
    }
}
/// Dot product of `a` and `b`.
///
/// On aarch64 the NEON kernel is used when both slices have the same length
/// and that length is a non-zero multiple of 4; every other case uses the
/// scalar loop, which stops at the shorter slice.
#[doc(hidden)]
#[inline]
pub fn inner_product_f32(a: &[f32], b: &[f32]) -> f32 {
#[cfg(target_arch = "aarch64")]
{
if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
// SAFETY: the lengths are equal and a multiple of 4, so every
// 4-lane load in the kernel stays inside both slices. NEON is
// assumed to be available on this aarch64 target - TODO confirm
// for targets built without it.
return unsafe { inner_product_neon(a, b) };
}
}
inner_product_scalar(a, b)
}
/// Portable dot product: sums `x * y` over paired elements, left to right,
/// stopping at the end of the shorter slice.
fn inner_product_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter()
        .zip(b.iter())
        .fold(0.0_f32, |acc, (x, y)| acc + x * y)
}
/// NEON dot product, processing 8 then 4 lanes per step with two
/// accumulators that are combined at the end.
///
/// Elements beyond the last full group of 4 in `a` are not read.
///
/// # Safety
///
/// The caller must ensure NEON is available and that `b` has at least
/// `a.len()` elements: `b` is read at the same offsets as `a`, bounded only
/// by `a.len()`.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names)] unsafe fn inner_product_neon(a: &[f32], b: &[f32]) -> f32 {
use core::arch::aarch64::{
float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32,
};
// SAFETY: every load reads 4 floats starting at offset `i` (or `i + 4`)
// and is guarded by `i + 8 <= n` / `i + 4 <= n` with `n == a.len()`; the
// caller guarantees `b` is at least that long.
unsafe {
let zero: float32x4_t = vdupq_n_f32(0.0);
let mut acc0 = zero;
let mut acc1 = zero;
let n = a.len();
let mut i = 0usize;
// Main loop: two independent fused multiply-add chains per iteration.
while i + 8 <= n {
let av0 = vld1q_f32(a.as_ptr().add(i));
let bv0 = vld1q_f32(b.as_ptr().add(i));
acc0 = vfmaq_f32(acc0, av0, bv0);
let av1 = vld1q_f32(a.as_ptr().add(i + 4));
let bv1 = vld1q_f32(b.as_ptr().add(i + 4));
acc1 = vfmaq_f32(acc1, av1, bv1);
i += 8;
}
// At most one remaining group of 4.
while i + 4 <= n {
let av = vld1q_f32(a.as_ptr().add(i));
let bv = vld1q_f32(b.as_ptr().add(i));
acc0 = vfmaq_f32(acc0, av, bv);
i += 4;
}
vaddvq_f32(vaddq_f32(acc0, acc1))
}
}
/// Returns `(a·b, |a|², |b|²)` computed in a single pass.
///
/// On aarch64 the NEON kernel is used when both slices have the same length
/// and that length is a non-zero multiple of 4; every other case uses the
/// scalar loop, which stops at the shorter slice.
#[doc(hidden)]
#[inline]
pub fn cosine_dot_norms_f32(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
#[cfg(target_arch = "aarch64")]
{
if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
// SAFETY: the lengths are equal and a multiple of 4, so every
// 4-lane load in the kernel stays inside both slices. NEON is
// assumed to be available on this aarch64 target - TODO confirm
// for targets built without it.
return unsafe { cosine_dot_norms_neon(a, b) };
}
}
cosine_dot_norms_scalar(a, b)
}
/// Portable single-pass computation of `(a·b, |a|², |b|²)` over paired
/// elements, stopping at the end of the shorter slice.
fn cosine_dot_norms_scalar(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    a.iter().zip(b.iter()).fold(
        (0.0_f32, 0.0_f32, 0.0_f32),
        |(dot, norm_a, norm_b), (x, y)| (dot + x * y, norm_a + x * x, norm_b + y * y),
    )
}
/// NEON single-pass computation of `(a·b, |a|², |b|²)`, 4 lanes per step.
///
/// Elements beyond the last full group of 4 in `a` are not read.
///
/// # Safety
///
/// The caller must ensure NEON is available and that `b` has at least
/// `a.len()` elements: `b` is read at the same offsets as `a`, bounded only
/// by `a.len()`.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names, clippy::similar_names)]
unsafe fn cosine_dot_norms_neon(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
use core::arch::aarch64::{float32x4_t, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32};
// SAFETY: every load reads 4 floats starting at offset `i` and is guarded
// by `i + 4 <= n` with `n == a.len()`; the caller guarantees `b` is at
// least that long.
unsafe {
let zero: float32x4_t = vdupq_n_f32(0.0);
let mut acc_dot = zero;
let mut acc_na = zero;
let mut acc_nb = zero;
let n = a.len();
let mut i = 0usize;
while i + 4 <= n {
let av = vld1q_f32(a.as_ptr().add(i));
let bv = vld1q_f32(b.as_ptr().add(i));
acc_dot = vfmaq_f32(acc_dot, av, bv);
acc_na = vfmaq_f32(acc_na, av, av);
acc_nb = vfmaq_f32(acc_nb, bv, bv);
i += 4;
}
// Horizontal sums collapse each 4-lane accumulator to a scalar.
(vaddvq_f32(acc_dot), vaddvq_f32(acc_na), vaddvq_f32(acc_nb))
}
}
/// Square root for `no_std` builds, via Newton-Raphson iteration.
///
/// Returns `0.0` for zero and negative inputs. NaN and infinite inputs
/// yield NaN.
///
/// The iteration is seeded from the bit pattern of `x` (halving the biased
/// exponent), which lands within a few percent of the root for normal
/// floats. Seeding with `x` itself and running a fixed 10 steps - the
/// previous approach - only halves the estimate per step while it is far
/// from the root, so inputs such as `1e6` or `1e-10` came back wrong by
/// 30 % or more and distorted cosine distances for vectors whose squared
/// norm was not close to 1.
fn sqrt_newton_f32(x: f32) -> f32 {
    /// Upper bound on refinement steps; normal inputs converge in about 4,
    /// subnormal inputs need more because their seed is further off.
    const MAX_STEPS: usize = 24;
    /// `0x3f80_0000 >> 1`: re-biases the exponent after the bit shift.
    const SEED_BIAS: u32 = 0x1fc0_0000;

    if x <= 0.0 {
        return 0.0;
    }
    // For positive `x` the seed is a positive normal float, so the division
    // below never divides by zero.
    let mut g = f32::from_bits((x.to_bits() >> 1) + SEED_BIAS);
    for _ in 0..MAX_STEPS {
        let next = 0.5 * (g + x / g);
        if next == g {
            break;
        }
        g = next;
    }
    g
}
/// Squared Euclidean distance between `a` and `b`.
///
/// On aarch64 the NEON kernel is used when both slices have the same length
/// and that length is a non-zero multiple of 4; every other case uses the
/// scalar loop, which stops at the shorter slice.
#[inline]
fn l2_distance_sq(a: &[f32], b: &[f32]) -> f32 {
#[cfg(target_arch = "aarch64")]
{
if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
// SAFETY: the lengths are equal and a multiple of 4, so every
// 4-lane load in the kernel stays inside both slices. NEON is
// assumed to be available on this aarch64 target - TODO confirm
// for targets built without it.
return unsafe { l2_distance_sq_neon(a, b) };
}
}
l2_distance_sq_scalar(a, b)
}
/// Portable squared Euclidean distance: sums `(x - y)²` over paired
/// elements, left to right, stopping at the end of the shorter slice.
fn l2_distance_sq_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter()
        .zip(b.iter())
        .map(|(x, y)| {
            let delta = *x - *y;
            delta * delta
        })
        .fold(0.0_f32, |total, term| total + term)
}
/// NEON squared Euclidean distance, processing 8 then 4 lanes per step with
/// two accumulators that are combined at the end.
///
/// Elements beyond the last full group of 4 in `a` are not read.
///
/// # Safety
///
/// The caller must ensure NEON is available and that `b` has at least
/// `a.len()` elements: `b` is read at the same offsets as `a`, bounded only
/// by `a.len()`.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names)] unsafe fn l2_distance_sq_neon(a: &[f32], b: &[f32]) -> f32 {
use core::arch::aarch64::{
float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32, vsubq_f32,
};
// SAFETY: every load reads 4 floats starting at offset `i` (or `i + 4`)
// and is guarded by `i + 8 <= n` / `i + 4 <= n` with `n == a.len()`; the
// caller guarantees `b` is at least that long.
unsafe {
let zero: float32x4_t = vdupq_n_f32(0.0);
let mut acc0 = zero;
let mut acc1 = zero;
let n = a.len();
let mut i = 0usize;
// Main loop: two independent difference-square-accumulate chains.
while i + 8 <= n {
let d0 = vsubq_f32(vld1q_f32(a.as_ptr().add(i)), vld1q_f32(b.as_ptr().add(i)));
acc0 = vfmaq_f32(acc0, d0, d0);
let d1 = vsubq_f32(
vld1q_f32(a.as_ptr().add(i + 4)),
vld1q_f32(b.as_ptr().add(i + 4)),
);
acc1 = vfmaq_f32(acc1, d1, d1);
i += 8;
}
// At most one remaining group of 4.
while i + 4 <= n {
let d = vsubq_f32(vld1q_f32(a.as_ptr().add(i)), vld1q_f32(b.as_ptr().add(i)));
acc0 = vfmaq_f32(acc0, d, d);
i += 4;
}
vaddvq_f32(vaddq_f32(acc0, acc1))
}
}
/// Looks up the index called `idx_name` and returns the row positions of up
/// to `k` approximate nearest neighbours of `query`, closest first.
///
/// The beam width passed to the search is `max(2 * k, NSW_DEFAULT_M)`.
/// Returns an empty vector when no index has that name (or, via
/// `nsw_search`, when the index is not NSW or is empty).
pub fn nsw_query(
    table: &Table,
    idx_name: &str,
    query: &[f32],
    k: usize,
    metric: NswMetric,
) -> Vec<usize> {
    let idx_pos = match table
        .indices
        .iter()
        .position(|candidate| candidate.name == idx_name)
    {
        Some(found) => found,
        None => return Vec::new(),
    };
    let ef = core::cmp::max(k * 2, NSW_DEFAULT_M);
    let ranked = nsw_search(table, idx_pos, query, k, ef, metric);
    ranked.into_iter().take(k).map(|(_, row)| row).collect()
}
/// Returns the first NSW index whose indexed column is `column_position`,
/// or `None` when the table has no such index.
pub fn nsw_index_on(table: &Table, column_position: usize) -> Option<&Index> {
    for candidate in &table.indices {
        if candidate.column_position != column_position {
            continue;
        }
        if let IndexKind::Nsw(_) = candidate.kind {
            return Some(candidate);
        }
    }
    None
}
/// In-memory catalog: tables plus the other schema-level objects
/// (functions, triggers, sequences, views, user-defined types, schemas).
///
/// All fields are private; the accessors are not visible in this part of
/// the file.
#[derive(Debug, Clone, Default)]
pub struct Catalog {
// Table storage.
tables: Vec<Table>,
// Presumably maps a table name to its slot in `tables` - TODO confirm.
by_name: BTreeMap<String, usize>,
// Optional shared cold segments; presumably one slot per table, parallel
// to `tables` - TODO confirm.
cold_segments: Vec<Option<Arc<OwnedSegment>>>,
// Function definitions keyed by a string (presumably the function name).
functions: BTreeMap<String, FunctionDef>,
// Trigger definitions.
triggers: Vec<TriggerDef>,
// Sequence definitions keyed by a string (presumably the sequence name).
sequences: BTreeMap<String, SequenceDef>,
// View definitions keyed by a string (presumably the view name).
views: BTreeMap<String, ViewDef>,
// String-to-string map for materialized views; the value is presumably
// the defining query text - TODO confirm.
materialized_views: BTreeMap<String, String>,
// Enum type definitions keyed by a string (presumably the type name).
enum_types: BTreeMap<String, EnumDef>,
// Domain type definitions keyed by a string (presumably the type name).
domain_types: BTreeMap<String, DomainDef>,
// Set of schema names.
schemas: alloc::collections::BTreeSet<String>,
}
/// Stored definition of a user-defined function. Every part is kept as
/// text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FunctionDef {
/// Function name.
pub name: String,
/// Textual representation of the argument list.
pub args_repr: String,
/// Return type, as text.
pub returns: String,
/// Implementation language, as text.
pub language: String,
/// Function body source.
pub body: String,
}
/// Stored definition of a trigger. Timing, events and granularity are kept
/// as text rather than enums.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TriggerDef {
/// Trigger name.
pub name: String,
/// Name of the table the trigger is attached to.
pub table: String,
/// When the trigger fires (presumably BEFORE / AFTER / INSTEAD OF -
/// TODO confirm the accepted spellings).
pub timing: String,
/// Events that fire the trigger (presumably INSERT / UPDATE / DELETE ...).
pub events: Vec<String>,
/// Firing granularity (presumably ROW or STATEMENT).
pub for_each: String,
/// Name of the function the trigger invokes.
pub function: String,
/// Column names associated with update events (presumably the
/// `UPDATE OF ...` list - TODO confirm).
pub update_columns: Vec<String>,
/// Whether the trigger is enabled.
pub enabled: bool,
}
/// Stored definition and current state of a sequence.
///
/// The option fields presumably mirror the SQL `CREATE SEQUENCE` options of
/// the same names - TODO confirm against the code that consumes them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SequenceDef {
/// Sequence name.
pub name: String,
/// Integer width of the sequence.
pub data_type: SequenceDataType,
/// Start value.
pub start: i64,
/// Step between consecutive values.
pub increment: i64,
/// Lower bound.
pub min_value: i64,
/// Upper bound.
pub max_value: i64,
/// Cache size option.
pub cache: i64,
/// Whether the sequence wraps around when a bound is reached.
pub cycle: bool,
/// Optional owner as a pair of strings (presumably table name and column
/// name - TODO confirm the order).
pub owned_by: Option<(String, String)>,
/// Most recent value recorded for the sequence.
pub last_value: i64,
/// Presumably whether `last_value` has already been handed out -
/// TODO confirm.
pub is_called: bool,
}
/// Integer width backing a sequence; selects its default bounds (see
/// `SequenceDataType::default_bounds`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SequenceDataType {
/// Bounded by the `i16` range.
SmallInt,
/// Bounded by the `i32` range.
Int,
/// Bounded by the `i64` range.
BigInt,
}
/// Reports whether `name` is one of the built-in schemas (`public`,
/// `pg_catalog`, `information_schema`), compared ASCII-case-insensitively.
#[must_use]
pub fn is_builtin_schema(name: &str) -> bool {
    const BUILTIN: [&str; 3] = ["public", "pg_catalog", "information_schema"];
    BUILTIN
        .iter()
        .any(|known| name.eq_ignore_ascii_case(known))
}
/// Parses a textual UUID into its 16 bytes.
///
/// Surrounding whitespace is trimmed and one enclosing `{...}` pair is
/// removed. Two layouts are accepted, in either letter case:
/// 32 hex digits, or the 36-character `8-4-4-4-12` hyphenated form.
/// Anything else yields `None`.
#[must_use]
pub fn parse_uuid_str(input: &str) -> Option<[u8; 16]> {
    /// Byte ranges of the five hex groups inside the hyphenated form.
    const GROUPS: [(usize, usize); 5] = [(0, 8), (9, 13), (14, 18), (19, 23), (24, 36)];

    let trimmed = input.trim();
    let body = trimmed
        .strip_prefix('{')
        .and_then(|rest| rest.strip_suffix('}'))
        .unwrap_or(trimmed);
    let raw = body.as_bytes();
    // The 32 hex digits, gathered on the stack with hyphens removed.
    let mut digits = [0u8; 32];
    match raw.len() {
        32 => digits.copy_from_slice(raw),
        36 => {
            if raw[8] != b'-' || raw[13] != b'-' || raw[18] != b'-' || raw[23] != b'-' {
                return None;
            }
            let mut filled = 0usize;
            for &(start, end) in GROUPS.iter() {
                let width = end - start;
                digits[filled..filled + width].copy_from_slice(&raw[start..end]);
                filled += width;
            }
        }
        _ => return None,
    }
    let mut out = [0u8; 16];
    for (slot, pair) in out.iter_mut().zip(digits.chunks_exact(2)) {
        let hi = hex_nibble(pair[0])?;
        let lo = hex_nibble(pair[1])?;
        *slot = (hi << 4) | lo;
    }
    Some(out)
}
/// Value of one ASCII hex digit (`0-9`, `a-f`, `A-F`); `None` for any other
/// byte.
fn hex_nibble(b: u8) -> Option<u8> {
    char::from(b)
        .to_digit(16)
        .and_then(|digit| u8::try_from(digit).ok())
}
/// Renders 16 raw UUID bytes in the canonical lowercase `8-4-4-4-12`
/// hexadecimal layout (36 characters).
#[must_use]
pub fn format_uuid(b: &[u8; 16]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    // Number of bytes in each dash-separated group (two hex digits per byte).
    const GROUP_BYTES: [usize; 5] = [4, 2, 2, 2, 6];

    let mut text = String::with_capacity(36);
    let mut rest: &[u8] = b;
    for (group_no, width) in GROUP_BYTES.iter().enumerate() {
        if group_no != 0 {
            text.push('-');
        }
        let (group, tail) = rest.split_at(*width);
        for &octet in group {
            text.push(char::from(DIGITS[usize::from(octet >> 4)]));
            text.push(char::from(DIGITS[usize::from(octet & 0x0f)]));
        }
        rest = tail;
    }
    text
}
/// Catalog entry describing one user-defined domain type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DomainDef {
/// Domain name; used as the key in the catalog's domain map.
pub name: String,
/// Underlying data type the domain is built on.
pub base_type: DataType,
/// Whether values of the domain may be NULL.
pub nullable: bool,
/// Optional default, stored as text (presumably an unparsed expression).
pub default: Option<String>,
/// Check constraints, stored as text (presumably unparsed expressions).
pub checks: Vec<String>,
}
/// Catalog entry describing one user-defined enum type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EnumDef {
/// Type name; used as the key in the catalog's enum map.
pub name: String,
/// The enum's labels (presumably in declaration order — TODO confirm).
pub labels: Vec<String>,
}
/// Catalog entry describing one (non-materialized) view.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ViewDef {
/// View name; used as the key in the catalog's view map.
pub name: String,
/// Column names associated with the view (presumably an explicit output
/// column list — TODO confirm).
pub columns: Vec<String>,
/// View body, stored as text.
pub body: String,
}
impl SequenceDataType {
    /// Default `(min_value, max_value)` pair for a sequence of this width.
    ///
    /// An ascending sequence (`increment_positive == true`) runs from `1` to
    /// the type's maximum; a descending one runs from the type's minimum to
    /// `-1`.
    pub fn default_bounds(self, increment_positive: bool) -> (i64, i64) {
        let (type_min, type_max) = match self {
            Self::SmallInt => (i64::from(i16::MIN), i64::from(i16::MAX)),
            Self::Int => (i64::from(i32::MIN), i64::from(i32::MAX)),
            Self::BigInt => (i64::MIN, i64::MAX),
        };
        if increment_positive {
            (1, type_max)
        } else {
            (type_min, -1)
        }
    }
}
impl Catalog {
/// Creates an empty catalog: no tables, no cold segments and no schema
/// objects. Usable in `const` contexts.
pub const fn new() -> Self {
Self {
tables: Vec::new(),
by_name: BTreeMap::new(),
cold_segments: Vec::new(),
functions: BTreeMap::new(),
triggers: Vec::new(),
sequences: BTreeMap::new(),
views: BTreeMap::new(),
materialized_views: BTreeMap::new(),
enum_types: BTreeMap::new(),
domain_types: BTreeMap::new(),
schemas: alloc::collections::BTreeSet::new(),
}
}
/// Read-only view of every registered function, keyed by function name.
pub const fn functions(&self) -> &BTreeMap<String, FunctionDef> {
&self.functions
}
/// Registers `def` under its name.
///
/// When a function with the same name already exists it is overwritten if
/// `or_replace` is set; otherwise the call fails.
///
/// # Errors
/// `StorageError::Corrupt` when the name is taken and `or_replace` is false.
pub fn create_function(
    &mut self,
    def: FunctionDef,
    or_replace: bool,
) -> Result<(), StorageError> {
    let already_defined = self.functions.contains_key(&def.name);
    if already_defined && !or_replace {
        return Err(StorageError::Corrupt(format!(
            "function {:?} already exists (drop or use CREATE OR REPLACE)",
            def.name
        )));
    }
    let key = def.name.clone();
    self.functions.insert(key, def);
    Ok(())
}
/// Removes the function called `name`; returns `true` if one was removed.
pub fn drop_function(&mut self, name: &str) -> bool {
self.functions.remove(name).is_some()
}
/// Read-only view of every sequence, keyed by sequence name.
pub const fn sequences(&self) -> &BTreeMap<String, SequenceDef> {
&self.sequences
}
/// Registers the sequence `def` under its name.
///
/// If the name is already taken the existing sequence is left untouched:
/// the call succeeds silently when `if_not_exists` is set and fails
/// otherwise.
///
/// # Errors
/// `StorageError::Corrupt` when the name is taken and `if_not_exists` is
/// false.
pub fn create_sequence(
    &mut self,
    def: SequenceDef,
    if_not_exists: bool,
) -> Result<(), StorageError> {
    match (self.sequences.contains_key(&def.name), if_not_exists) {
        (true, true) => Ok(()),
        (true, false) => Err(StorageError::Corrupt(format!(
            "sequence {:?} already exists",
            def.name
        ))),
        (false, _) => {
            let key = def.name.clone();
            self.sequences.insert(key, def);
            Ok(())
        }
    }
}
/// Removes the sequence called `name`; returns `true` if one was removed.
pub fn drop_sequence(&mut self, name: &str) -> bool {
self.sequences.remove(name).is_some()
}
/// Advances the sequence `name` and returns the value handed out.
///
/// * If the sequence has not been called yet, `last_value` is returned
///   unchanged and the sequence is marked as called.
/// * Otherwise `increment` is added to `last_value`. A result beyond
///   `max_value` (ascending) or below `min_value` (descending) wraps to the
///   opposite bound when `cycle` is set and is an error otherwise.
///
/// An `i64` overflow of `last_value + increment` necessarily lies outside
/// the bounds, so a cycling sequence wraps in that case as well instead of
/// failing; a non-cycling sequence still reports the overflow.
///
/// # Errors
/// `StorageError::Corrupt` when the sequence does not exist, when a
/// non-cycling sequence is exhausted, or when the addition overflows on a
/// non-cycling sequence.
pub fn sequence_next_value(&mut self, name: &str) -> Result<i64, StorageError> {
    let Some(seq) = self.sequences.get_mut(name) else {
        return Err(StorageError::Corrupt(format!(
            "sequence {name:?} does not exist"
        )));
    };
    let candidate = if seq.is_called {
        let ascending = seq.increment > 0;
        // `None` means the addition left the i64 range, which is always
        // past the bound in the direction of travel.
        let stepped = seq.last_value.checked_add(seq.increment);
        let in_range = stepped.filter(|next| {
            if ascending {
                *next <= seq.max_value
            } else {
                *next >= seq.min_value
            }
        });
        match in_range {
            Some(next) => next,
            None if seq.cycle => {
                if ascending {
                    seq.min_value
                } else {
                    seq.max_value
                }
            }
            None if stepped.is_none() => {
                return Err(StorageError::Corrupt(format!(
                    "sequence {name:?} arithmetic overflow"
                )));
            }
            None if ascending => {
                return Err(StorageError::Corrupt(format!(
                    "sequence {name:?} reached MAXVALUE ({})",
                    seq.max_value
                )));
            }
            None => {
                return Err(StorageError::Corrupt(format!(
                    "sequence {name:?} reached MINVALUE ({})",
                    seq.min_value
                )));
            }
        }
    } else {
        seq.last_value
    };
    seq.last_value = candidate;
    seq.is_called = true;
    Ok(candidate)
}
/// Returns the value most recently handed out by the sequence `name`
/// without advancing it.
///
/// # Errors
/// `StorageError::Corrupt` when the sequence does not exist or has not
/// handed out a value yet.
pub fn sequence_current_value(&self, name: &str) -> Result<i64, StorageError> {
    match self.sequences.get(name) {
        None => Err(StorageError::Corrupt(format!(
            "sequence {name:?} does not exist"
        ))),
        Some(seq) if seq.is_called => Ok(seq.last_value),
        Some(_) => Err(StorageError::Corrupt(format!(
            "currval of sequence {name:?} is not yet defined in this session"
        ))),
    }
}
/// Overwrites the counter of the sequence `name` and returns `value`.
///
/// `is_called` is stored as given: with `false`, the next advance hands out
/// `value` itself; with `true`, it hands out `value + increment`. The value
/// is not checked against the sequence bounds.
///
/// # Errors
/// `StorageError::Corrupt` when the sequence does not exist.
pub fn sequence_set_value(
    &mut self,
    name: &str,
    value: i64,
    is_called: bool,
) -> Result<i64, StorageError> {
    let seq = self
        .sequences
        .get_mut(name)
        .ok_or_else(|| StorageError::Corrupt(format!("sequence {name:?} does not exist")))?;
    seq.is_called = is_called;
    seq.last_value = value;
    Ok(value)
}
/// Read-only view of every registered view definition, keyed by view name.
pub const fn views(&self) -> &BTreeMap<String, ViewDef> {
&self.views
}
/// Registers the view `def` under its name.
///
/// When a view with that name already exists: `or_replace` overwrites it,
/// otherwise `if_not_exists` turns the call into a no-op, otherwise the call
/// fails. A new view is rejected if its name is already used by a table or
/// a sequence.
///
/// # Errors
/// `StorageError::Corrupt` for a duplicate view (without either flag) or a
/// name clash with a table or sequence.
pub fn create_view(
    &mut self,
    def: ViewDef,
    or_replace: bool,
    if_not_exists: bool,
) -> Result<(), StorageError> {
    let exists = self.views.contains_key(&def.name);
    if exists && !or_replace {
        return if if_not_exists {
            Ok(())
        } else {
            Err(StorageError::Corrupt(format!(
                "view {:?} already exists",
                def.name
            )))
        };
    }
    // Name-clash checks apply only to brand-new views; a replacement keeps
    // the name it already holds.
    if !exists {
        if self.by_name.contains_key(&def.name) {
            return Err(StorageError::Corrupt(format!(
                "view {:?} would shadow an existing table",
                def.name
            )));
        }
        if self.sequences.contains_key(&def.name) {
            return Err(StorageError::Corrupt(format!(
                "view {:?} would shadow an existing sequence",
                def.name
            )));
        }
    }
    let key = def.name.clone();
    self.views.insert(key, def);
    Ok(())
}
/// Removes the view called `name`; returns `true` if one was removed.
pub fn drop_view(&mut self, name: &str) -> bool {
self.views.remove(name).is_some()
}
/// Read-only map from materialized-view name to its registered body string.
pub const fn materialized_views(&self) -> &BTreeMap<String, String> {
&self.materialized_views
}
/// Records `body` as the source of the materialized view `name`, silently
/// replacing any body previously registered under that name.
pub fn register_materialized_view(&mut self, name: String, body: String) {
self.materialized_views.insert(name, body);
}
/// Forgets the body registered for the materialized view `name`; returns
/// `true` if an entry was removed. Only the name -> body entry is touched.
pub fn drop_materialized_view_source(&mut self, name: &str) -> bool {
self.materialized_views.remove(name).is_some()
}
/// Read-only view of every user-defined enum type, keyed by type name.
pub const fn enum_types(&self) -> &BTreeMap<String, EnumDef> {
&self.enum_types
}
/// Registers the enum type `def` under its name.
///
/// # Errors
/// `StorageError::Corrupt` when an enum type with that name already exists.
pub fn create_enum_type(&mut self, def: EnumDef) -> Result<(), StorageError> {
    if self.enum_types.contains_key(&def.name) {
        Err(StorageError::Corrupt(format!(
            "type {:?} already exists",
            def.name
        )))
    } else {
        let key = def.name.clone();
        self.enum_types.insert(key, def);
        Ok(())
    }
}
/// Removes the enum type called `name`; returns `true` if one was removed.
pub fn drop_enum_type(&mut self, name: &str) -> bool {
self.enum_types.remove(name).is_some()
}
/// Read-only view of every user-defined domain type, keyed by domain name.
pub const fn domain_types(&self) -> &BTreeMap<String, DomainDef> {
&self.domain_types
}
/// Registers the domain type `def` under its name.
///
/// # Errors
/// `StorageError::Corrupt` when a domain with that name already exists.
pub fn create_domain_type(&mut self, def: DomainDef) -> Result<(), StorageError> {
    let registry = &mut self.domain_types;
    let taken = registry.contains_key(&def.name);
    if taken {
        return Err(StorageError::Corrupt(format!(
            "domain {:?} already exists",
            def.name
        )));
    }
    let key = def.name.clone();
    registry.insert(key, def);
    Ok(())
}
/// Removes the domain type called `name`; returns `true` if one was removed.
pub fn drop_domain_type(&mut self, name: &str) -> bool {
self.domain_types.remove(name).is_some()
}
/// Names of the user-created schemas. Built-in schemas are not included.
pub const fn user_schemas(&self) -> &alloc::collections::BTreeSet<String> {
&self.schemas
}
/// Reports whether `name` is a built-in schema or a user-created one.
/// Built-in names match case-insensitively (see `is_builtin_schema`);
/// user-created names must match exactly.
pub fn schema_exists(&self, name: &str) -> bool {
is_builtin_schema(name) || self.schemas.contains(name)
}
/// Registers a user schema called `name`.
///
/// If the name belongs to a built-in schema or an existing user schema the
/// call is a no-op when `if_not_exists` is set and an error otherwise.
///
/// # Errors
/// `StorageError::Corrupt` when the name is built-in or already registered
/// and `if_not_exists` is false.
pub fn create_schema(&mut self, name: String, if_not_exists: bool) -> Result<(), StorageError> {
    let builtin = is_builtin_schema(&name);
    if builtin || self.schemas.contains(&name) {
        if if_not_exists {
            return Ok(());
        }
        let detail = if builtin {
            format!("schema {name:?} is built-in and cannot be redeclared")
        } else {
            format!("schema {name:?} already exists")
        };
        return Err(StorageError::Corrupt(detail));
    }
    self.schemas.insert(name);
    Ok(())
}
/// Removes the user schema called `name`.
///
/// Returns `Ok(true)` if the schema was registered and has been removed,
/// `Ok(false)` if no such user schema existed.
///
/// # Errors
/// `StorageError::Corrupt` when `name` is a built-in schema.
pub fn drop_schema(&mut self, name: &str) -> Result<bool, StorageError> {
    if !is_builtin_schema(name) {
        return Ok(self.schemas.remove(name));
    }
    Err(StorageError::Corrupt(format!(
        "schema {name:?} is built-in and cannot be dropped"
    )))
}
/// Applies the given changes to the sequence `name`; every `None` argument
/// leaves the corresponding setting untouched.
///
/// `restart` is two-level: `Some(Some(v))` restarts the counter at `v`,
/// `Some(None)` restarts it at the sequence's start value (after any new
/// `start` from this same call has been applied). A restart also clears the
/// "called" flag so the next advance returns the restart value itself.
/// `owned_by` is two-level in the same way: `Some(None)` clears the owner.
/// No validation of the new values is performed.
///
/// # Errors
/// `StorageError::Corrupt` when the sequence does not exist.
#[allow(clippy::too_many_arguments)]
pub fn alter_sequence(
    &mut self,
    name: &str,
    increment: Option<i64>,
    min_value: Option<i64>,
    max_value: Option<i64>,
    start: Option<i64>,
    restart: Option<Option<i64>>,
    cache: Option<i64>,
    cycle: Option<bool>,
    owned_by: Option<Option<(String, String)>>,
) -> Result<(), StorageError> {
    let seq = self
        .sequences
        .get_mut(name)
        .ok_or_else(|| StorageError::Corrupt(format!("sequence {name:?} does not exist")))?;
    seq.increment = increment.unwrap_or(seq.increment);
    seq.min_value = min_value.unwrap_or(seq.min_value);
    seq.max_value = max_value.unwrap_or(seq.max_value);
    // `start` must be settled before the restart below reads it.
    seq.start = start.unwrap_or(seq.start);
    if let Some(target) = restart {
        seq.last_value = match target {
            Some(explicit) => explicit,
            None => seq.start,
        };
        seq.is_called = false;
    }
    seq.cache = cache.unwrap_or(seq.cache);
    seq.cycle = cycle.unwrap_or(seq.cycle);
    if let Some(owner) = owned_by {
        seq.owned_by = owner;
    }
    Ok(())
}
/// Read-only slice of every registered trigger, in storage order.
pub fn triggers(&self) -> &[TriggerDef] {
&self.triggers
}
/// Mutable access to the trigger list. Changes made through this handle
/// bypass the table / function / duplicate checks of `create_trigger`.
pub fn triggers_mut(&mut self) -> &mut Vec<TriggerDef> {
&mut self.triggers
}
/// Registers the trigger `def`.
///
/// The trigger's table and function must already exist. A trigger with the
/// same name on the same table is overwritten in place when `or_replace` is
/// set and is an error otherwise.
///
/// # Errors
/// * `StorageError::TableNotFound` when `def.table` is unknown.
/// * `StorageError::Corrupt` when `def.function` is unknown, or when the
///   trigger already exists and `or_replace` is false.
pub fn create_trigger(
    &mut self,
    def: TriggerDef,
    or_replace: bool,
) -> Result<(), StorageError> {
    if !self.by_name.contains_key(&def.table) {
        return Err(StorageError::TableNotFound {
            name: def.table.clone(),
        });
    }
    if !self.functions.contains_key(&def.function) {
        return Err(StorageError::Corrupt(format!(
            "trigger {:?} references unknown function {:?}",
            def.name, def.function
        )));
    }
    let existing = self
        .triggers
        .iter()
        .position(|t| t.name == def.name && t.table == def.table);
    if let Some(slot) = existing {
        if !or_replace {
            return Err(StorageError::Corrupt(format!(
                "trigger {:?} already exists on table {:?}",
                def.name, def.table
            )));
        }
        self.triggers[slot] = def;
    } else {
        self.triggers.push(def);
    }
    Ok(())
}
/// Removes every trigger called `name` on `table`; returns `true` if at
/// least one trigger was removed.
pub fn drop_trigger(&mut self, name: &str, table: &str) -> bool {
    let mut removed_any = false;
    self.triggers.retain(|t| {
        let doomed = t.name == name && t.table == table;
        removed_any |= doomed;
        !doomed
    });
    removed_any
}
/// Creates an empty table from `schema` and registers it under the schema's
/// name.
///
/// # Errors
/// `StorageError::DuplicateTable` when a table with that name already
/// exists.
pub fn create_table(&mut self, schema: TableSchema) -> Result<(), StorageError> {
    if self.by_name.contains_key(&schema.name) {
        return Err(StorageError::DuplicateTable {
            name: schema.name.clone(),
        });
    }
    // The new table goes at the end of `tables`, so its position is the
    // current length.
    let slot = self.tables.len();
    self.by_name.insert(schema.name.clone(), slot);
    self.tables.push(Table::new(schema));
    Ok(())
}
/// Looks up the table called `name`.
pub fn get(&self, name: &str) -> Option<&Table> {
    self.by_name
        .get(name)
        .and_then(|&slot| self.tables.get(slot))
}
/// Looks up the table called `name` for modification.
pub fn get_mut(&mut self, name: &str) -> Option<&mut Table> {
    match self.by_name.get(name) {
        Some(&slot) => self.tables.get_mut(slot),
        None => None,
    }
}
/// Number of tables currently in the catalog.
pub fn table_count(&self) -> usize {
self.tables.len()
}
/// Removes the table called `name`; returns `true` if it existed.
///
/// The table is removed with `swap_remove`, so the table that used to be
/// last may move into the vacated position; its `by_name` entry is updated
/// to match. Only `tables` and `by_name` are touched here.
///
/// NOTE(review): triggers whose `table` is `name` are not removed by this
/// method — confirm the caller cleans them up.
pub fn drop_table(&mut self, name: &str) -> bool {
    match self.by_name.remove(name) {
        None => false,
        Some(slot) => {
            self.tables.swap_remove(slot);
            // If a table was moved into `slot`, re-point its name at it.
            if let Some(relocated) = self.tables.get(slot) {
                self.by_name.insert(relocated.schema.name.clone(), slot);
            }
            true
        }
    }
}
/// Renames the table `old` to `new`.
///
/// Besides the table itself, every foreign key whose parent table is `old`
/// and every trigger attached to `old` is re-pointed at `new`. Renaming a
/// table to its current name is a no-op (even if the table does not exist).
///
/// NOTE(review): `SequenceDef::owned_by` is not rewritten here — confirm
/// whether it can hold a table name that should follow the rename.
///
/// # Errors
/// * `StorageError::Corrupt` when a table called `new` already exists.
/// * `StorageError::TableNotFound` when `old` does not exist.
pub fn rename_table(&mut self, old: &str, new: &str) -> Result<(), StorageError> {
    if old == new {
        return Ok(());
    }
    if self.by_name.contains_key(new) {
        return Err(StorageError::Corrupt(format!(
            "rename_table: target name {new:?} already exists"
        )));
    }
    let Some(slot) = self.by_name.remove(old) else {
        return Err(StorageError::TableNotFound { name: old.into() });
    };
    let renamed = new.to_string();
    self.tables[slot].schema.name = renamed.clone();
    self.by_name.insert(renamed, slot);

    self.tables
        .iter_mut()
        .flat_map(|t| t.schema.foreign_keys.iter_mut())
        .filter(|fk| fk.parent_table == old)
        .for_each(|fk| fk.parent_table = new.to_string());
    self.triggers
        .iter_mut()
        .filter(|trig| trig.table == old)
        .for_each(|trig| trig.table = new.to_string());
    Ok(())
}
/// Renames the index `old` to `new`, searching every table.
///
/// Index names are treated as catalog-wide: the rename is refused if any
/// table already has an index called `new`. Only the first index called
/// `old` (in table order) is renamed.
///
/// # Errors
/// * `StorageError::Corrupt` when an index called `new` already exists.
/// * `StorageError::IndexNotFound` when no index is called `old`.
pub fn rename_index(&mut self, old: &str, new: &str) -> Result<(), StorageError> {
    if old == new {
        return Ok(());
    }
    let taken = self
        .tables
        .iter()
        .any(|t| t.indices.iter().any(|ix| ix.name == new));
    if taken {
        return Err(StorageError::Corrupt(format!(
            "rename_index: target name {new:?} already exists"
        )));
    }
    let target = self
        .tables
        .iter_mut()
        .flat_map(|t| t.indices.iter_mut())
        .find(|ix| ix.name == old);
    match target {
        Some(ix) => {
            ix.name = new.to_string();
            Ok(())
        }
        None => Err(StorageError::IndexNotFound { name: old.into() }),
    }
}
/// Drops the index called `name` from the first table that has one.
///
/// Tables are visited in storage order and the search stops at the first
/// table from which at least one index was removed. Returns `true` if
/// anything was removed.
pub fn drop_named_index(&mut self, name: &str) -> bool {
    self.tables.iter_mut().any(|t| {
        let before = t.indices.len();
        t.indices.retain(|ix| ix.name != name);
        t.indices.len() != before
    })
}
/// Names of all tables, in storage order (not sorted).
pub fn table_names(&self) -> Vec<String> {
    let mut names = Vec::with_capacity(self.tables.len());
    for table in &self.tables {
        names.push(table.schema.name.clone());
    }
    names
}
/// Parses `bytes` as a cold segment, appends it as a new slot and returns
/// the segment id assigned to it (the slot position).
///
/// # Errors
/// `StorageError::Corrupt` when the slot count no longer fits in a `u32` or
/// when the bytes fail to parse as a segment.
pub fn load_segment_bytes(&mut self, bytes: Vec<u8>) -> Result<u32, StorageError> {
    let Ok(id) = u32::try_from(self.cold_segments.len()) else {
        return Err(StorageError::Corrupt(
            "cold segment count would exceed u32::MAX".into(),
        ));
    };
    let parsed = match OwnedSegment::from_bytes(bytes) {
        Ok(seg) => seg,
        Err(e) => {
            return Err(StorageError::Corrupt(format!(
                "cold segment parse failed: {e}"
            )));
        }
    };
    self.cold_segments.push(Some(Arc::new(parsed)));
    Ok(id)
}
/// Parses `bytes` as a cold segment and installs it at the explicit slot
/// `target_id`.
///
/// Slots between the current end and `target_id` are created empty. The
/// target slot itself must be empty.
///
/// # Errors
/// `StorageError::Corrupt` when the bytes fail to parse or the slot is
/// already occupied.
pub fn load_segment_bytes_at(
    &mut self,
    target_id: u32,
    bytes: Vec<u8>,
) -> Result<(), StorageError> {
    let seg = match OwnedSegment::from_bytes(bytes) {
        Ok(seg) => seg,
        Err(e) => {
            return Err(StorageError::Corrupt(format!(
                "cold segment parse failed: {e}"
            )));
        }
    };
    let slot = target_id as usize;
    // Pad with empty slots until `slot` is addressable.
    for _ in self.cold_segments.len()..=slot {
        self.cold_segments.push(None);
    }
    let entry = &mut self.cold_segments[slot];
    if entry.is_none() {
        *entry = Some(Arc::new(seg));
        return Ok(());
    }
    Err(StorageError::Corrupt(format!(
        "load_segment_bytes_at: segment_id {target_id} already occupied"
    )))
}
/// Empties the slot of segment `segment_id` while keeping the slot itself,
/// so the ids of all other segments stay valid. Tombstoning an already
/// empty slot succeeds.
///
/// # Errors
/// `StorageError::Corrupt` when `segment_id` is past the last slot.
pub fn tombstone_segment(&mut self, segment_id: u32) -> Result<(), StorageError> {
    let slot_count = self.cold_segments.len();
    match self.cold_segments.get_mut(segment_id as usize) {
        Some(entry) => {
            *entry = None;
            Ok(())
        }
        None => Err(StorageError::Corrupt(format!(
            "tombstone_segment: segment_id {segment_id} out of bounds (len={})",
            slot_count
        ))),
    }
}
/// Number of occupied cold-segment slots (empty / tombstoned slots are not
/// counted).
#[must_use]
pub fn cold_segment_count(&self) -> usize {
    self.cold_segments.iter().flatten().count()
}
/// Total number of cold-segment slots, including empty / tombstoned ones.
/// This is also the id the next `load_segment_bytes` call would assign.
#[must_use]
pub fn cold_segment_slot_count(&self) -> usize {
self.cold_segments.len()
}
/// Ids of all occupied cold-segment slots, in ascending order.
#[must_use]
pub fn cold_segment_ids_global(&self) -> Vec<u32> {
    let mut ids = Vec::new();
    for (slot, entry) in self.cold_segments.iter().enumerate() {
        if entry.is_some() {
            ids.push(slot as u32);
        }
    }
    ids
}
#[must_use]
pub fn hot_tier_bytes(&self) -> u64 {
self.tables
.iter()
.map(Table::hot_bytes)
.fold(0u64, u64::saturating_add)
}
/// Moves the first `max_rows` rows of `table_name` out of the hot tier into
/// a newly encoded cold segment.
///
/// Steps:
/// 1. Validate the request: the table and the BTree index `index_name` must
///    exist and `1 <= max_rows <= row count`.
/// 2. Encode rows `0..max_rows`, keyed by the integer value of the indexed
///    column, sort them by key and reject duplicate keys.
/// 3. Encode the segment and register it on the catalog.
/// 4. Delete the rows from the hot tier and register a cold locator per key
///    on the index.
///
/// The segment is registered *before* the hot rows are deleted, so a failure
/// while loading the segment can no longer lose rows.
///
/// NOTE(review): the "is NSW" error text is emitted for every non-BTree
/// index kind; it is left unchanged in case callers match on it.
///
/// # Errors
/// `StorageError::Corrupt` for any validation failure listed above, for a
/// NULL / non-integer key, for duplicate keys, or when encoding / loading
/// the segment fails; errors from registering cold locators are passed
/// through.
pub fn freeze_oldest_to_cold(
    &mut self,
    table_name: &str,
    index_name: &str,
    max_rows: usize,
) -> Result<FreezeReport, StorageError> {
    if max_rows == 0 {
        return Err(StorageError::Corrupt(
            "freeze_oldest_to_cold: max_rows must be > 0".into(),
        ));
    }
    let table = self.get(table_name).ok_or_else(|| {
        StorageError::Corrupt(format!(
            "freeze_oldest_to_cold: table {table_name:?} not found"
        ))
    })?;
    if max_rows > table.rows.len() {
        return Err(StorageError::Corrupt(format!(
            "freeze_oldest_to_cold: max_rows {max_rows} > row_count {}",
            table.rows.len()
        )));
    }
    let idx = table
        .indices
        .iter()
        .find(|i| i.name == index_name)
        .ok_or_else(|| {
            StorageError::Corrupt(format!(
                "freeze_oldest_to_cold: index {index_name:?} not found on {table_name:?}"
            ))
        })?;
    if !matches!(idx.kind, IndexKind::BTree(_)) {
        return Err(StorageError::Corrupt(format!(
            "freeze_oldest_to_cold: index {index_name:?} is NSW; only BTree indices may freeze"
        )));
    }
    let column_position = idx.column_position;

    // Encode every row to freeze as (segment key, row body, index key).
    let mut to_freeze: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(max_rows);
    for row_idx in 0..max_rows {
        let row = table.rows.get(row_idx).expect("bounds-checked above");
        let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
            StorageError::Corrupt(format!(
                "freeze_oldest_to_cold: row {row_idx} has NULL / non-key value in index column"
            ))
        })?;
        let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
            StorageError::Corrupt(format!(
                "freeze_oldest_to_cold: index {index_name:?} column type is non-integer; \
v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
            ))
        })?;
        to_freeze.push((pk_u64, encode_row_body_dense(row, &table.schema), key));
    }
    to_freeze.sort_by_key(|(k, _, _)| *k);
    // After sorting, equal keys are adjacent.
    for w in to_freeze.windows(2) {
        if w[0].0 == w[1].0 {
            return Err(StorageError::Corrupt(format!(
                "freeze_oldest_to_cold: duplicate PK {} in freeze batch",
                w[0].0
            )));
        }
    }

    // Split into the segment payload and the index keys without cloning.
    let (seg_rows, cold_keys): (Vec<(u64, Vec<u8>)>, Vec<IndexKey>) = to_freeze
        .into_iter()
        .map(|(pk, body, key)| ((pk, body), key))
        .unzip();
    let frozen_rows = seg_rows.len();
    let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
        .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: encode: {e}")))?;

    // Register the segment first: if this fails, the hot rows are untouched.
    let segment_id = self
        .load_segment_bytes(seg_bytes.clone())
        .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: load: {e}")))?;

    let positions: Vec<usize> = (0..max_rows).collect();
    let t_mut = self
        .get_mut(table_name)
        .expect("just validated; still present");
    let bytes_before = t_mut.hot_bytes();
    let removed = t_mut.delete_rows(&positions);
    debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
    let bytes_freed = bytes_before.saturating_sub(t_mut.hot_bytes());

    let new_cold = cold_keys.into_iter().map(|k| {
        (
            k,
            RowLocator::Cold {
                segment_id,
                page_offset: 0,
            },
        )
    });
    t_mut.register_cold_locators(index_name, new_cold)?;
    Ok(FreezeReport {
        segment_id,
        frozen_rows,
        bytes_freed,
        segment_bytes: seg_bytes,
    })
}
/// Returns the cold segment stored under `segment_id`, or `None` when the
/// id is out of range or its slot is empty.
#[must_use]
pub fn cold_segment(&self, segment_id: u32) -> Option<&OwnedSegment> {
    match self.cold_segments.get(segment_id as usize) {
        Some(Some(seg)) => Some(&**seg),
        _ => None,
    }
}
/// Fetches and decodes the row stored for `key` in cold segment
/// `segment_id`, using the schema of `table_name`.
///
/// Returns `None` when the table or segment is missing, the key is not an
/// integer key, the segment holds no row for the key, or the row body fails
/// to decode.
pub fn resolve_cold_locator(
    &self,
    table_name: &str,
    segment_id: u32,
    key: &IndexKey,
) -> Option<Row> {
    let schema = &self.get(table_name)?.schema;
    let raw_key = index_key_as_u64(key)?;
    let seg = self.cold_segments.get(segment_id as usize)?.as_ref()?;
    let payload = seg.lookup(raw_key)?;
    decode_row_body_dense(&payload, schema, seg.codec_version())
        .ok()
        .map(|(row, _)| row)
}
/// Looks up the row for `key` through the index `index_name` of `table`,
/// transparently following hot and cold locators.
///
/// Locators returned by the index are tried in order and the first one that
/// yields a row wins. Returns `None` when the table or index is missing or
/// no locator can be resolved.
pub fn lookup_by_pk(&self, table: &str, index_name: &str, key: &IndexKey) -> Option<Row> {
let t = self.get(table)?;
let idx = t.indices.iter().find(|i| i.name == index_name)?;
let locators = idx.lookup_eq(key);
// Cold segments are keyed by u64; this is `None` for non-integer keys, in
// which case every cold locator is skipped below.
let cold_u64_key = index_key_as_u64(key);
for loc in locators {
match *loc {
RowLocator::Hot(i) => {
// A hot locator pointing past the row store is skipped, not an error.
if let Some(row) = t.rows.get(i) {
return Some(row.clone());
}
}
RowLocator::Cold {
segment_id,
page_offset: _,
} => {
let Some(u64_key) = cold_u64_key else {
continue;
};
// Missing or tombstoned segment: try the next locator.
let Some(seg) = self
.cold_segments
.get(segment_id as usize)
.and_then(|s| s.as_deref())
else {
continue;
};
let Some(payload) = seg.lookup(u64_key) else {
continue;
};
// NOTE(review): unlike the misses above, a decode failure ends the whole
// lookup with `None` instead of moving on to the next locator.
let (row, _) =
decode_row_body_dense(&payload, &t.schema, seg.codec_version()).ok()?;
return Some(row);
}
}
}
None
}
/// Copies the cold row for `key` back into the hot tier of `table_name` and
/// removes the key's cold locators from the index `index_name`.
///
/// Returns `Ok(None)` when the key has no cold locator on that index, and
/// `Ok(Some(position))` with the last row position of the table after the
/// insert otherwise.
///
/// # Errors
/// `StorageError::Corrupt` when the table or index is missing, the index is
/// not a BTree, the key is not an integer key, the referenced segment is
/// not registered, or the segment has no row for the key. Errors from
/// decoding the row, inserting it and removing the locators are passed
/// through.
pub fn promote_cold_row(
&mut self,
table_name: &str,
index_name: &str,
key: &IndexKey,
) -> Result<Option<usize>, StorageError> {
// Only the first cold locator for the key is considered.
let cold_loc = self.find_cold_locator(table_name, index_name, key)?;
let Some((segment_id, _page_offset)) = cold_loc else {
return Ok(None);
};
let u64_key = index_key_as_u64(key).ok_or_else(|| {
StorageError::Corrupt(
"promote_cold_row: key type not coercible to u64 (cold tier requires integer PK)"
.into(),
)
})?;
// The schema is cloned so the table can be borrowed mutably further down.
let schema = self
.get(table_name)
.ok_or_else(|| {
StorageError::Corrupt(format!("promote_cold_row: table {table_name:?} not found"))
})?
.schema
.clone();
let seg = self
.cold_segments
.get(segment_id as usize)
.and_then(|s| s.as_ref())
.ok_or_else(|| {
StorageError::Corrupt(format!(
"promote_cold_row: segment {segment_id} not registered on catalog"
))
})?;
let payload = seg.lookup(u64_key).ok_or_else(|| {
StorageError::Corrupt(format!(
"promote_cold_row: key {u64_key} resolves to segment {segment_id} \
but the segment's bloom/page lookup didn't return a row"
))
})?;
let (row, _consumed) = decode_row_body_dense(&payload, &schema, seg.codec_version())?;
let t = self
.get_mut(table_name)
.expect("table existed at lookup time");
// Insert first, then drop the cold locators: the row is never unreachable.
t.insert(row)?;
// Position of the last row after the insert.
let new_hot_idx =
t.rows.len().checked_sub(1).ok_or_else(|| {
StorageError::Corrupt("promote_cold_row: empty after insert".into())
})?;
t.remove_cold_locators_for_key(index_name, key)?;
Ok(Some(new_hot_idx))
}
/// Removes the cold locators registered for `key` on the index `index_name`
/// of `table_name`, without touching the cold segment or the hot rows.
/// Returns whatever `Table::remove_cold_locators_for_key` reports
/// (presumably the number of locators removed).
///
/// # Errors
/// `StorageError::Corrupt` when the table does not exist; errors from the
/// table-level removal are passed through.
pub fn shadow_cold_row(
    &mut self,
    table_name: &str,
    index_name: &str,
    key: &IndexKey,
) -> Result<usize, StorageError> {
    match self.get_mut(table_name) {
        Some(table) => table.remove_cold_locators_for_key(index_name, key),
        None => Err(StorageError::Corrupt(format!(
            "shadow_cold_row: table {table_name:?} not found"
        ))),
    }
}
/// Read-only first phase of a sliced freeze: encodes the rows in `row_range`
/// of `table_name` and returns them, sorted by key, as a `FreezeSlice`.
///
/// Nothing in the catalog is modified; the returned slices are meant to be
/// handed to `commit_freeze_slices`.
///
/// NOTE(review): the "is NSW" error text is emitted for every non-BTree
/// index kind.
///
/// # Errors
/// `StorageError::Corrupt` when the table or index is missing, the index is
/// not a BTree, `row_range.end` exceeds the row count, or a row's index
/// column holds a NULL / non-integer value.
pub fn prepare_freeze_slice(
&self,
table_name: &str,
index_name: &str,
row_range: core::ops::Range<usize>,
) -> Result<FreezeSlice, StorageError> {
let table = self.get(table_name).ok_or_else(|| {
StorageError::Corrupt(format!(
"prepare_freeze_slice: table {table_name:?} not found"
))
})?;
let idx = table
.indices
.iter()
.find(|i| i.name == index_name)
.ok_or_else(|| {
StorageError::Corrupt(format!(
"prepare_freeze_slice: index {index_name:?} not found on {table_name:?}"
))
})?;
if !matches!(idx.kind, IndexKind::BTree(_)) {
return Err(StorageError::Corrupt(format!(
"prepare_freeze_slice: index {index_name:?} is NSW; only BTree indices may freeze"
)));
}
if row_range.end > table.rows.len() {
return Err(StorageError::Corrupt(format!(
"prepare_freeze_slice: row_range end {} > row_count {}",
row_range.end,
table.rows.len()
)));
}
let column_position = idx.column_position;
let schema = table.schema.clone();
// Each entry is (segment key, encoded row body, original index key).
let mut rows: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(row_range.len());
for row_idx in row_range.clone() {
let row = table.rows.get(row_idx).expect("bounds-checked above");
let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
StorageError::Corrupt(format!(
"prepare_freeze_slice: row {row_idx} has NULL / non-key value in index column"
))
})?;
let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
StorageError::Corrupt(format!(
"prepare_freeze_slice: index {index_name:?} column type is non-integer; \
v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
))
})?;
rows.push((pk_u64, encode_row_body_dense(row, &schema), key));
}
// Slices are returned key-sorted; duplicate keys are not checked here.
rows.sort_by_key(|(k, _, _)| *k);
Ok(FreezeSlice { row_range, rows })
}
/// Second phase of a sliced freeze: combines `slices` into one cold segment
/// and swaps the covered rows out of the hot tier of `table_name`.
///
/// The slices (in any order) must tile the row range `0..N` exactly, with no
/// gap or overlap, and together hold exactly `N` rows. An empty tiling
/// returns a report with `segment_id == u32::MAX` and no changes.
///
/// All slice rows are moved into one vector and stably sorted by key. For
/// slices whose rows are already key-sorted (as produced by
/// `prepare_freeze_slice`) this yields the same order as a k-way merge that
/// prefers the earlier slice on ties, without cloning any row body.
///
/// The segment is registered *before* the hot rows are deleted, so a failure
/// while loading the segment cannot lose rows.
///
/// NOTE(review): the "is NSW" error text is emitted for every non-BTree
/// index kind; it is left unchanged in case callers match on it.
///
/// # Errors
/// `StorageError::Corrupt` when the table or index is missing, the index is
/// not a BTree, the slices do not tile `0..N`, `N` exceeds the row count,
/// the slice row total differs from `N`, two rows share a key, or encoding /
/// loading the segment fails. Errors from registering cold locators are
/// passed through.
pub fn commit_freeze_slices(
    &mut self,
    table_name: &str,
    index_name: &str,
    slices: Vec<FreezeSlice>,
) -> Result<FreezeReport, StorageError> {
    let table = self.get(table_name).ok_or_else(|| {
        StorageError::Corrupt(format!(
            "commit_freeze_slices: table {table_name:?} not found"
        ))
    })?;
    let idx = table
        .indices
        .iter()
        .find(|i| i.name == index_name)
        .ok_or_else(|| {
            StorageError::Corrupt(format!(
                "commit_freeze_slices: index {index_name:?} not found on {table_name:?}"
            ))
        })?;
    if !matches!(idx.kind, IndexKind::BTree(_)) {
        return Err(StorageError::Corrupt(format!(
            "commit_freeze_slices: index {index_name:?} is NSW; only BTree indices may freeze"
        )));
    }

    // Verify that the slices, ordered by start, tile 0..max_rows exactly.
    let mut ordered = slices;
    ordered.sort_by_key(|s| s.row_range.start);
    let mut expected_start = 0usize;
    for s in &ordered {
        if s.row_range.start != expected_start {
            return Err(StorageError::Corrupt(format!(
                "commit_freeze_slices: gap/overlap at row {}; expected start {}",
                s.row_range.start, expected_start
            )));
        }
        expected_start = s.row_range.end;
    }
    let max_rows = expected_start;
    if max_rows > table.rows.len() {
        return Err(StorageError::Corrupt(format!(
            "commit_freeze_slices: total row range {} exceeds row_count {}",
            max_rows,
            table.rows.len()
        )));
    }
    if max_rows == 0 {
        return Ok(FreezeReport {
            segment_id: u32::MAX,
            frozen_rows: 0,
            bytes_freed: 0,
            segment_bytes: Vec::new(),
        });
    }
    let total_rows: usize = ordered.iter().map(|s| s.rows.len()).sum();
    if total_rows != max_rows {
        return Err(StorageError::Corrupt(format!(
            "commit_freeze_slices: total slice rows {total_rows} ≠ row_range coverage {max_rows}"
        )));
    }

    // Move (not clone) every slice's rows into one vector, in slice order,
    // then stable-sort by key so ties keep the earlier slice first.
    let mut merged: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(total_rows);
    for slice in &mut ordered {
        merged.append(&mut slice.rows);
    }
    merged.sort_by_key(|(k, _, _)| *k);
    for w in merged.windows(2) {
        if w[0].0 == w[1].0 {
            return Err(StorageError::Corrupt(format!(
                "commit_freeze_slices: duplicate PK {} across slices",
                w[0].0
            )));
        }
    }

    // Split into the segment payload and the index keys without cloning.
    let (seg_rows, cold_keys): (Vec<(u64, Vec<u8>)>, Vec<IndexKey>) = merged
        .into_iter()
        .map(|(pk, body, key)| ((pk, body), key))
        .unzip();
    let frozen_rows = seg_rows.len();
    let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
        .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: encode: {e}")))?;

    // Register the segment first: if this fails, the hot rows are untouched.
    let segment_id = self
        .load_segment_bytes(seg_bytes.clone())
        .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: load: {e}")))?;

    let positions: Vec<usize> = (0..max_rows).collect();
    let t_mut = self
        .get_mut(table_name)
        .expect("just validated; still present");
    let bytes_before = t_mut.hot_bytes();
    let removed = t_mut.delete_rows(&positions);
    debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
    let bytes_freed = bytes_before.saturating_sub(t_mut.hot_bytes());

    let new_cold = cold_keys.into_iter().map(|k| {
        (
            k,
            RowLocator::Cold {
                segment_id,
                page_offset: 0,
            },
        )
    });
    t_mut.register_cold_locators(index_name, new_cold)?;
    Ok(FreezeReport {
        segment_id,
        frozen_rows,
        bytes_freed,
        segment_bytes: seg_bytes,
    })
}
/// Merges the small cold segments referenced by the BTree index
/// `index_name` of `table_name` into one new segment.
///
/// A segment is a merge candidate when the index references it and its
/// encoded size is below `target_segment_bytes`. With fewer than two
/// candidates nothing happens and an empty report is returned. Otherwise:
///
/// 1. every row the index still points at in a candidate segment is
///    collected (rows no longer referenced are thereby dropped),
/// 2. the collected rows are encoded into a new segment, which is
///    registered on the catalog,
/// 3. all cold locators that pointed at a candidate are rewritten to the new
///    segment (collapsed to one locator per key), and
/// 4. the candidate segments are tombstoned.
///
/// The index entries that need rewriting are captured during step 1, so the
/// index is walked once and only affected entries are copied.
///
/// # Errors
/// `StorageError::Corrupt` when the table or index is missing, the index is
/// not a BTree, a cold key is not an integer key, a candidate segment has no
/// row for a key the index points at, or encoding / loading the merged
/// segment fails. Errors from tombstoning are passed through.
pub fn compact_cold_segments(
    &mut self,
    table_name: &str,
    index_name: &str,
    target_segment_bytes: u64,
) -> Result<CompactReport, StorageError> {
    let t = self.get(table_name).ok_or_else(|| {
        StorageError::Corrupt(format!(
            "compact_cold_segments: table {table_name:?} not found"
        ))
    })?;
    let idx = t
        .indices
        .iter()
        .find(|i| i.name == index_name)
        .ok_or_else(|| {
            StorageError::Corrupt(format!(
                "compact_cold_segments: index {index_name:?} not found on {table_name:?}"
            ))
        })?;
    let map = match &idx.kind {
        IndexKind::BTree(m) => m,
        IndexKind::Nsw(_)
        | IndexKind::Brin { .. }
        | IndexKind::Gin(_)
        | IndexKind::GinTrgm(_)
        | IndexKind::GinFulltext(_) => {
            return Err(StorageError::Corrupt(format!(
                "compact_cold_segments: index {index_name:?} is not BTree; \
compaction applies only to BTree cold-tier indices"
            )));
        }
    };

    // Segments the index references at all.
    let mut referenced_ids: BTreeSet<u32> = BTreeSet::new();
    for (_key, locators) in map.iter() {
        for loc in locators {
            if let RowLocator::Cold { segment_id, .. } = loc {
                referenced_ids.insert(*segment_id);
            }
        }
    }
    // Of those, the ones that are present and smaller than the target size.
    let candidate_set: BTreeSet<u32> = referenced_ids
        .into_iter()
        .filter(|id| {
            self.cold_segments
                .get(*id as usize)
                .and_then(|s| s.as_deref())
                .is_some_and(|s| (s.bytes().len() as u64) < target_segment_bytes)
        })
        .collect();
    if candidate_set.len() < 2 {
        return Ok(CompactReport {
            sources: Vec::new(),
            merged_segment_id: None,
            merged_segment_bytes: Vec::new(),
            merged_rows: 0,
            deleted_rows_pruned: 0,
            bytes_reclaimed_estimate: 0,
        });
    }

    // Totals over the source segments, used only for the report.
    let mut source_row_count: usize = 0;
    let mut source_byte_total: u64 = 0;
    for &id in &candidate_set {
        let seg = self.cold_segments[id as usize]
            .as_ref()
            .expect("candidate selected only when slot is Some");
        source_row_count = source_row_count.saturating_add(seg.meta().num_rows as usize);
        source_byte_total = source_byte_total.saturating_add(seg.bytes().len() as u64);
    }

    // One pass over the index: gather the live row bodies (keyed by their
    // u64 segment key) and remember each affected index entry so it can be
    // rewritten once the merged segment has an id.
    let mut collected: BTreeMap<u64, Vec<u8>> = BTreeMap::new();
    let mut entries: Vec<(IndexKey, Vec<RowLocator>)> = Vec::new();
    for (key, locators) in map.iter() {
        for loc in locators {
            let RowLocator::Cold { segment_id, .. } = loc else {
                continue;
            };
            if !candidate_set.contains(segment_id) {
                continue;
            }
            let u64_key = index_key_as_u64(key).ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "compact_cold_segments: index {index_name:?} has non-integer Cold key; \
cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
                ))
            })?;
            let seg = self.cold_segments[*segment_id as usize]
                .as_ref()
                .expect("candidate slot guaranteed Some above");
            let payload = seg.lookup(u64_key).ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "compact_cold_segments: BTree {index_name:?} points key={u64_key} \
at segment {segment_id} but the segment lookup missed"
                ))
            })?;
            collected.insert(u64_key, payload);
            entries.push((key.clone(), locators.clone()));
            // The first candidate locator of a key supplies its row.
            break;
        }
    }

    let merged_rows = collected.len();
    let deleted_rows_pruned = source_row_count.saturating_sub(merged_rows);
    // BTreeMap iteration is key-ordered, so the rows arrive sorted.
    let seg_rows: Vec<(u64, Vec<u8>)> = collected.into_iter().collect();
    let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
        .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: encode: {e}")))?;
    let merged_bytes_len = seg_bytes.len() as u64;
    let merged_segment_id = self
        .load_segment_bytes(seg_bytes.clone())
        .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: load: {e}")))?;

    let t_mut = self
        .get_mut(table_name)
        .expect("table existed at the start of this fn");
    let idx_mut = t_mut
        .indices
        .iter_mut()
        .find(|i| i.name == index_name)
        .expect("index existed at the start of this fn");
    let IndexKind::BTree(map_mut) = &mut idx_mut.kind else {
        unreachable!("validated above");
    };
    // Rewrite each affected entry: candidate locators collapse into a single
    // locator for the merged segment; all other locators are kept as-is.
    for (key, locators) in entries {
        let mut new_locs: Vec<RowLocator> = Vec::with_capacity(locators.len());
        let mut changed = false;
        for loc in &locators {
            match *loc {
                RowLocator::Cold {
                    segment_id,
                    page_offset: _,
                } if candidate_set.contains(&segment_id) => {
                    let replacement = RowLocator::Cold {
                        segment_id: merged_segment_id,
                        page_offset: 0,
                    };
                    if !new_locs.contains(&replacement) {
                        new_locs.push(replacement);
                    }
                    changed = true;
                }
                other => new_locs.push(other),
            }
        }
        if changed {
            map_mut.insert_mut(key, new_locs);
        }
    }

    for &id in &candidate_set {
        self.tombstone_segment(id)?;
    }
    let bytes_reclaimed_estimate = source_byte_total.saturating_sub(merged_bytes_len);
    Ok(CompactReport {
        sources: candidate_set.into_iter().collect(),
        merged_segment_id: Some(merged_segment_id),
        merged_segment_bytes: seg_bytes,
        merged_rows,
        deleted_rows_pruned,
        bytes_reclaimed_estimate,
    })
}
/// Finds the first cold locator registered for `key` on the BTree index
/// `index_name` of `table_name`.
///
/// Returns `Ok(Some((segment_id, page_offset)))` for the first cold locator,
/// or `Ok(None)` when the key has none.
///
/// # Errors
/// `StorageError::Corrupt` when the table or index is missing or the index
/// is not a BTree.
fn find_cold_locator(
    &self,
    table_name: &str,
    index_name: &str,
    key: &IndexKey,
) -> Result<Option<(u32, u32)>, StorageError> {
    let Some(table) = self.get(table_name) else {
        return Err(StorageError::Corrupt(format!(
            "find_cold_locator: table {table_name:?} not found"
        )));
    };
    let Some(index) = table.indices.iter().find(|i| i.name == index_name) else {
        return Err(StorageError::Corrupt(format!(
            "find_cold_locator: index {index_name:?} not found on {table_name:?}"
        )));
    };
    if !matches!(index.kind, IndexKind::BTree(_)) {
        return Err(StorageError::Corrupt(format!(
            "find_cold_locator: index {index_name:?} is NSW; promote-on-write only applies to BTree indices"
        )));
    }
    for loc in index.lookup_eq(key) {
        match *loc {
            RowLocator::Cold {
                segment_id,
                page_offset,
            } => return Ok(Some((segment_id, page_offset))),
            _ => continue,
        }
    }
    Ok(None)
}
}
/// Converts an index key into the `u64` key used by cold segments.
///
/// Only integer keys are convertible; the integer is reinterpreted as
/// unsigned via `cast_unsigned`, so negative values map to large `u64`
/// values. Every other key kind yields `None`.
fn index_key_as_u64(key: &IndexKey) -> Option<u64> {
match key {
IndexKey::Int(n) => Some(n.cast_unsigned()),
// Listed explicitly (no wildcard) so a new key kind forces a decision here.
IndexKey::Text(_) | IndexKey::Bool(_) | IndexKey::Uuid(_) => None,
}
}
/// Errors reported by the storage layer. Marked `#[non_exhaustive]`, so
/// downstream matches need a wildcard arm.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum StorageError {
/// A table with this name already exists.
DuplicateTable {
name: String,
},
/// No table with this name exists.
TableNotFound {
name: String,
},
/// A row has a different number of values than expected.
ArityMismatch {
expected: usize,
actual: usize,
},
/// A value's type differs from the type of the column at `position`.
TypeMismatch {
column: String,
expected: DataType,
actual: DataType,
position: usize,
},
/// NULL was supplied for a NOT NULL column.
NullInNotNull {
column: String,
},
/// An index with this name already exists.
DuplicateIndex {
name: String,
},
/// No column with this name exists.
ColumnNotFound {
column: String,
},
/// Free-form failure; displayed with a "corrupt on-disk format" prefix. The
/// catalog methods in this file also use it for general validation
/// failures such as missing or duplicate objects.
Corrupt(String),
/// No index with this name exists.
IndexNotFound {
name: String,
},
/// The requested operation is not supported; the payload says what.
Unsupported(String),
}
/// Human-readable, single-line rendering of each error variant.
impl fmt::Display for StorageError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::DuplicateTable { name } => write!(f, "table already exists: {name}"),
Self::TableNotFound { name } => write!(f, "table not found: {name}"),
Self::ArityMismatch { expected, actual } => write!(
f,
"row arity mismatch: expected {expected} columns, got {actual}"
),
// The column name is shown quoted (Debug form); the types use Display.
Self::TypeMismatch {
column,
expected,
actual,
position,
} => write!(
f,
"type mismatch in column {column:?} (position {position}): expected {expected}, got {actual}"
),
Self::NullInNotNull { column } => {
write!(f, "NULL value in NOT NULL column {column:?}")
}
Self::DuplicateIndex { name } => write!(f, "index already exists: {name}"),
Self::ColumnNotFound { column } => write!(f, "column not found: {column}"),
Self::Corrupt(detail) => write!(f, "corrupt on-disk format: {detail}"),
Self::IndexNotFound { name } => write!(f, "index not found: {name}"),
Self::Unsupported(detail) => write!(f, "unsupported: {detail}"),
}
}
}
impl ColumnSchema {
    /// Builds a column with the given name, type and nullability. Every
    /// optional attribute starts unset: no defaults, no auto-increment, no
    /// enum/domain binding, binary collation, signed, no inline variants.
    pub fn new(name: impl Into<String>, ty: DataType, nullable: bool) -> Self {
        let name: String = name.into();
        Self {
            name,
            ty,
            nullable,
            default: None,
            runtime_default: None,
            auto_increment: false,
            user_enum_type: None,
            user_domain_type: None,
            on_update_runtime: None,
            collation: Collation::Binary,
            is_unsigned: false,
            inline_enum_variants: None,
            inline_set_variants: None,
        }
    }

    /// Returns the column with its literal default set to `default`.
    #[must_use]
    pub fn with_default(self, default: Value) -> Self {
        Self {
            default: Some(default),
            ..self
        }
    }

    /// Returns the column with its runtime default expression set to `expr`.
    #[must_use]
    pub fn with_runtime_default(self, expr: impl Into<String>) -> Self {
        Self {
            runtime_default: Some(expr.into()),
            ..self
        }
    }

    /// Returns the column with the auto-increment flag turned on.
    #[must_use]
    pub const fn with_auto_increment(mut self) -> Self {
        self.auto_increment = true;
        self
    }
}
impl TableSchema {
    /// Builds a schema from a name and its columns, with no hot-tier byte
    /// budget and empty foreign-key, uniqueness and CHECK constraint lists.
    pub fn new(name: impl Into<String>, columns: Vec<ColumnSchema>) -> Self {
        let name: String = name.into();
        Self {
            name,
            columns,
            hot_tier_bytes: None,
            checks: Vec::new(),
            foreign_keys: Vec::new(),
            uniqueness_constraints: Vec::new(),
        }
    }
}
// Eight-byte signature written at the very start of a serialized catalog
// and verified first by `Catalog::deserialize`.
const FILE_MAGIC: &[u8; 8] = b"SPGDB001";
// Format version byte emitted by `Catalog::serialize`; also the newest
// version `Catalog::deserialize` accepts.
const FILE_VERSION: u8 = 47;
// Oldest format version `Catalog::deserialize` accepts.
const MIN_SUPPORTED_FILE_VERSION: u8 = 8;
// One-byte discriminants for encoded index keys.
// NOTE(review): presumably consumed by `write_index_key` / `read_index_key`,
// which are defined elsewhere in this file — confirm.
const INDEX_KEY_TAG_INT: u8 = 0;
const INDEX_KEY_TAG_TEXT: u8 = 1;
const INDEX_KEY_TAG_BOOL: u8 = 2;
const INDEX_KEY_TAG_UUID: u8 = 3;
impl Catalog {
/// Encodes the whole catalog into a byte buffer in the current
/// `FILE_VERSION` layout.
///
/// Layout: `FILE_MAGIC`, the version byte, a `u32` table count, one record
/// per table, then the catalog-wide sections in this order: functions,
/// triggers, sequences, views, materialized views, enum types, domain types
/// and schemas. Each table record holds the column definitions, the rows,
/// the indices and a series of per-table appendices.
///
/// # Panics
///
/// Panics when a count does not fit the fixed-width integer used to encode
/// it (see the `expect` messages), and when a column or domain uses
/// `DataType::Interval`, for which `write_data_type` has no encoding.
pub fn serialize(&self) -> Vec<u8> {
let mut out = Vec::with_capacity(64);
// File header: magic, version, table count.
out.extend_from_slice(FILE_MAGIC);
out.push(FILE_VERSION);
write_u32(
&mut out,
u32::try_from(self.tables.len()).expect("≤ 4G tables"),
);
for t in &self.tables {
// Table header: name, then column definitions.
write_str(&mut out, &t.schema.name);
write_u16(
&mut out,
u16::try_from(t.schema.columns.len()).expect("≤ 65k columns/table"),
);
for c in &t.schema.columns {
write_str(&mut out, &c.name);
write_data_type(&mut out, c.ty);
out.push(u8::from(c.nullable));
// Literal default: presence byte, then the value when present.
match &c.default {
None => out.push(0),
Some(v) => {
out.push(1);
write_value(&mut out, v);
}
}
out.push(u8::from(c.auto_increment));
}
// Rows: count, then each row body in the dense encoding.
write_u32(
&mut out,
u32::try_from(t.rows.len()).expect("≤ 4G rows/table"),
);
for row in &t.rows {
out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
}
// Indices: count, then per index its name, key column position, a kind
// tag with kind-specific payload, and the per-index trailer.
write_u16(
&mut out,
u16::try_from(t.indices.len()).expect("≤ 65k indices/table"),
);
for idx in &t.indices {
write_str(&mut out, &idx.name);
write_u16(
&mut out,
u16::try_from(idx.column_position).expect("≤ 65k columns/table"),
);
match &idx.kind {
// Kind tag 0: BTree — entry count, then key + locator list per entry.
IndexKind::BTree(map) => {
out.push(0);
write_u32(
&mut out,
u32::try_from(map.len()).expect("≤ 4G index entries/index"),
);
for (key, locators) in map {
write_index_key(&mut out, key);
write_u32(
&mut out,
u32::try_from(locators.len()).expect("≤ 4G locators/key"),
);
for loc in locators {
loc.write_le(&mut out);
}
}
}
// Kind tag 1: NSW graph — `m` as u16, then the graph body.
IndexKind::Nsw(g) => {
out.push(1);
write_u16(&mut out, u16::try_from(g.m).expect("≤ 65k NSW neighbours"));
write_nsw_graph(&mut out, g);
}
// Kind tag 2: BRIN — only the column type is stored.
IndexKind::Brin { column_type } => {
out.push(2);
write_data_type(&mut out, *column_type);
}
// Kind tags 3, 4, 5: GIN variants — all share the same posting-list
// layout (entry count, then string key + locator list per entry).
IndexKind::Gin(map) => {
out.push(3);
write_u32(
&mut out,
u32::try_from(map.len()).expect("≤ 4G GIN posting lists"),
);
for (word, locators) in map {
write_str(&mut out, word);
write_u32(
&mut out,
u32::try_from(locators.len()).expect("≤ 4G locators/posting list"),
);
for loc in locators {
loc.write_le(&mut out);
}
}
}
IndexKind::GinTrgm(map) => {
out.push(4);
write_u32(
&mut out,
u32::try_from(map.len()).expect("≤ 4G trigram-GIN posting lists"),
);
for (tri, locators) in map {
write_str(&mut out, tri);
write_u32(
&mut out,
u32::try_from(locators.len()).expect("≤ 4G locators/posting list"),
);
for loc in locators {
loc.write_le(&mut out);
}
}
}
IndexKind::GinFulltext(map) => {
out.push(5);
write_u32(
&mut out,
u32::try_from(map.len()).expect("≤ 4G fulltext-GIN posting lists"),
);
for (lex, locators) in map {
write_str(&mut out, lex);
write_u32(
&mut out,
u32::try_from(locators.len()).expect("≤ 4G locators/posting list"),
);
for loc in locators {
loc.write_le(&mut out);
}
}
}
}
// Per-index trailer: INCLUDE columns, optional partial predicate,
// optional expression, uniqueness flag, extra key column positions.
write_u16(
&mut out,
u16::try_from(idx.included_columns.len()).expect("≤ 65k INCLUDE columns/index"),
);
for col_pos in &idx.included_columns {
write_u16(
&mut out,
u16::try_from(*col_pos).expect("≤ 65k columns/table"),
);
}
match &idx.partial_predicate {
None => out.push(0),
Some(pred) => {
out.push(1);
write_str(&mut out, pred);
}
}
match &idx.expression {
None => out.push(0),
Some(expr) => {
out.push(1);
write_str(&mut out, expr);
}
}
out.push(u8::from(idx.is_unique));
write_u16(
&mut out,
u16::try_from(idx.extra_column_positions.len())
.expect("≤ 65k extra cols / index"),
);
for cp in &idx.extra_column_positions {
write_u16(&mut out, u16::try_from(*cp).expect("≤ 65k columns/table"));
}
}
// Appendix: hot-tier byte budget (presence byte + u64 LE).
match t.schema.hot_tier_bytes {
None => out.push(0),
Some(n) => {
out.push(1);
out.extend_from_slice(&n.to_le_bytes());
}
}
// Appendix: foreign keys.
write_u16(
&mut out,
u16::try_from(t.schema.foreign_keys.len()).expect("≤ 65k FKs/table"),
);
for fk in &t.schema.foreign_keys {
match &fk.name {
None => out.push(0),
Some(n) => {
out.push(1);
write_str(&mut out, n);
}
}
write_u16(
&mut out,
u16::try_from(fk.local_columns.len()).expect("≤ 65k FK columns"),
);
for &p in &fk.local_columns {
write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
}
write_str(&mut out, &fk.parent_table);
write_u16(
&mut out,
u16::try_from(fk.parent_columns.len()).expect("≤ 65k FK parent columns"),
);
for &p in &fk.parent_columns {
write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
}
out.push(fk.on_delete.tag());
out.push(fk.on_update.tag());
}
// Appendix: uniqueness constraints (primary-key flag, columns,
// NULLS NOT DISTINCT flag).
write_u16(
&mut out,
u16::try_from(t.schema.uniqueness_constraints.len())
.expect("≤ 65k uniqueness constraints/table"),
);
for uc in &t.schema.uniqueness_constraints {
out.push(u8::from(uc.is_primary_key));
write_u16(
&mut out,
u16::try_from(uc.columns.len()).expect("≤ 65k cols in uniqueness constraint"),
);
for &p in &uc.columns {
write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
}
out.push(u8::from(uc.nulls_not_distinct));
}
// Appendix: runtime default expressions, stored sparsely as
// (column position, expression) pairs for the columns that have one.
let mut rt_defaults: Vec<(usize, &str)> = Vec::new();
for (i, c) in t.schema.columns.iter().enumerate() {
if let Some(e) = &c.runtime_default {
rt_defaults.push((i, e.as_str()));
}
}
write_u16(
&mut out,
u16::try_from(rt_defaults.len()).expect("≤ 65k runtime defaults/table"),
);
for (pos, expr) in rt_defaults {
write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
write_str(&mut out, expr);
}
// Appendix: CHECK constraint sources.
write_u16(
&mut out,
u16::try_from(t.schema.checks.len()).expect("≤ 65k CHECK constraints/table"),
);
for c in &t.schema.checks {
write_str(&mut out, c.as_str());
}
// Appendix: user enum type bindings, sparse (position, type name).
let mut enum_bindings: Vec<(usize, &str)> = Vec::new();
for (i, c) in t.schema.columns.iter().enumerate() {
if let Some(e) = &c.user_enum_type {
enum_bindings.push((i, e.as_str()));
}
}
write_u16(
&mut out,
u16::try_from(enum_bindings.len()).expect("≤ 65k enum-typed columns/table"),
);
for (pos, ename) in enum_bindings {
write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
write_str(&mut out, ename);
}
// Appendix: user domain type bindings, sparse (position, type name).
let mut domain_bindings: Vec<(usize, &str)> = Vec::new();
for (i, c) in t.schema.columns.iter().enumerate() {
if let Some(d) = &c.user_domain_type {
domain_bindings.push((i, d.as_str()));
}
}
write_u16(
&mut out,
u16::try_from(domain_bindings.len()).expect("≤ 65k domain-typed columns/table"),
);
for (pos, dname) in domain_bindings {
write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
write_str(&mut out, dname);
}
// Appendix: ON UPDATE runtime expressions, sparse (position, source).
let mut on_update_bindings: Vec<(usize, &str)> = Vec::new();
for (i, c) in t.schema.columns.iter().enumerate() {
if let Some(e) = &c.on_update_runtime {
on_update_bindings.push((i, e.as_str()));
}
}
write_u16(
&mut out,
u16::try_from(on_update_bindings.len()).expect("≤ 65k ON UPDATE columns/table"),
);
for (pos, expr_src) in on_update_bindings {
write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
write_str(&mut out, expr_src);
}
// Appendix: collations, sparse (position, tag). Binary is the implicit
// default and is never written.
let mut coll_bindings: Vec<(usize, u8)> = Vec::new();
for (i, c) in t.schema.columns.iter().enumerate() {
let tag = match c.collation {
Collation::Binary => continue,
Collation::CaseInsensitive => Collation::TAG_CASE_INSENSITIVE,
};
coll_bindings.push((i, tag));
}
write_u16(
&mut out,
u16::try_from(coll_bindings.len()).expect("≤ 65k collation bindings/table"),
);
for (pos, tag) in coll_bindings {
write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
out.push(tag);
}
// Appendix: positions of UNSIGNED columns.
let mut unsigned_bindings: Vec<usize> = Vec::new();
for (i, c) in t.schema.columns.iter().enumerate() {
if c.is_unsigned {
unsigned_bindings.push(i);
}
}
write_u16(
&mut out,
u16::try_from(unsigned_bindings.len()).expect("≤ 65k UNSIGNED columns/table"),
);
for pos in unsigned_bindings {
write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
}
// Appendix: inline ENUM variant lists, sparse (position, variants).
let mut enum_inline_bindings: Vec<(usize, &[String])> = Vec::new();
for (i, c) in t.schema.columns.iter().enumerate() {
if let Some(vs) = &c.inline_enum_variants {
enum_inline_bindings.push((i, vs.as_slice()));
}
}
write_u16(
&mut out,
u16::try_from(enum_inline_bindings.len()).expect("≤ 65k inline-ENUM columns/table"),
);
for (pos, variants) in enum_inline_bindings {
write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
write_u16(
&mut out,
u16::try_from(variants.len()).expect("≤ 65k variants/ENUM"),
);
for v in variants {
write_str(&mut out, v.as_str());
}
}
// Appendix: inline SET variant lists, sparse (position, variants).
let mut set_inline_bindings: Vec<(usize, &[String])> = Vec::new();
for (i, c) in t.schema.columns.iter().enumerate() {
if let Some(vs) = &c.inline_set_variants {
set_inline_bindings.push((i, vs.as_slice()));
}
}
write_u16(
&mut out,
u16::try_from(set_inline_bindings.len()).expect("≤ 65k inline-SET columns/table"),
);
for (pos, variants) in set_inline_bindings {
write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
write_u16(
&mut out,
u16::try_from(variants.len()).expect("≤ 65k variants/SET"),
);
for v in variants {
write_str(&mut out, v.as_str());
}
}
}
// Catalog-wide section: functions. The body uses the long-string encoding.
write_u32(
&mut out,
u32::try_from(self.functions.len()).expect("≤ 4G functions"),
);
for fd in self.functions.values() {
write_str(&mut out, &fd.name);
write_str(&mut out, &fd.args_repr);
write_str(&mut out, &fd.returns);
write_str(&mut out, &fd.language);
write_str_long(&mut out, &fd.body);
}
// Catalog-wide section: triggers.
write_u32(
&mut out,
u32::try_from(self.triggers.len()).expect("≤ 4G triggers"),
);
for td in &self.triggers {
write_str(&mut out, &td.name);
write_str(&mut out, &td.table);
write_str(&mut out, &td.timing);
write_u16(
&mut out,
u16::try_from(td.events.len()).expect("≤ 65k events / trigger"),
);
for ev in &td.events {
write_str(&mut out, ev);
}
write_str(&mut out, &td.for_each);
write_str(&mut out, &td.function);
write_u16(
&mut out,
u16::try_from(td.update_columns.len()).expect("≤ 65k cols / trigger"),
);
for c in &td.update_columns {
write_str(&mut out, c);
}
out.push(u8::from(td.enabled));
}
// Catalog-wide section: sequences (definition plus current state).
write_u32(
&mut out,
u32::try_from(self.sequences.len()).expect("≤ 4G sequences"),
);
for seq in self.sequences.values() {
write_str(&mut out, &seq.name);
out.push(match seq.data_type {
SequenceDataType::SmallInt => 0,
SequenceDataType::Int => 1,
SequenceDataType::BigInt => 2,
});
out.extend_from_slice(&seq.start.to_le_bytes());
out.extend_from_slice(&seq.increment.to_le_bytes());
out.extend_from_slice(&seq.min_value.to_le_bytes());
out.extend_from_slice(&seq.max_value.to_le_bytes());
out.extend_from_slice(&seq.cache.to_le_bytes());
out.push(u8::from(seq.cycle));
match &seq.owned_by {
None => out.push(0),
Some((table, column)) => {
out.push(1);
write_str(&mut out, table);
write_str(&mut out, column);
}
}
out.extend_from_slice(&seq.last_value.to_le_bytes());
out.push(u8::from(seq.is_called));
}
// Catalog-wide section: views.
write_u32(
&mut out,
u32::try_from(self.views.len()).expect("≤ 4G views"),
);
for view in self.views.values() {
write_str(&mut out, &view.name);
write_u16(
&mut out,
u16::try_from(view.columns.len()).expect("≤ 65k cols / view"),
);
for c in &view.columns {
write_str(&mut out, c);
}
write_str_long(&mut out, &view.body);
}
// Catalog-wide section: materialized views (name + body).
write_u32(
&mut out,
u32::try_from(self.materialized_views.len()).expect("≤ 4G materialized views"),
);
for (name, body) in &self.materialized_views {
write_str(&mut out, name);
write_str_long(&mut out, body);
}
// Catalog-wide section: user enum types.
write_u32(
&mut out,
u32::try_from(self.enum_types.len()).expect("≤ 4G enum types"),
);
for e in self.enum_types.values() {
write_str(&mut out, &e.name);
write_u16(
&mut out,
u16::try_from(e.labels.len()).expect("≤ 65k labels / enum"),
);
for l in &e.labels {
write_str(&mut out, l);
}
}
// Catalog-wide section: user domain types.
write_u32(
&mut out,
u32::try_from(self.domain_types.len()).expect("≤ 4G domain types"),
);
for d in self.domain_types.values() {
write_str(&mut out, &d.name);
write_data_type(&mut out, d.base_type);
out.push(u8::from(d.nullable));
match &d.default {
None => out.push(0),
Some(s) => {
out.push(1);
write_str(&mut out, s);
}
}
write_u16(
&mut out,
u16::try_from(d.checks.len()).expect("≤ 65k CHECKs / domain"),
);
for c in &d.checks {
write_str(&mut out, c);
}
}
// Catalog-wide section: schema names.
write_u32(
&mut out,
u32::try_from(self.schemas.len()).expect("≤ 4G schemas"),
);
for name in &self.schemas {
write_str(&mut out, name);
}
out
}
/// Decodes a catalog from `buf`, accepting any format version in
/// `MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION`.
///
/// Sections added in later versions are read only when the file's version
/// byte is at least the version gating them; for older files the
/// corresponding catalog collections are left as `Self::new()` made them.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` on a bad magic, an out-of-range version,
/// an unknown tag byte, or bytes left over after the last section. Errors
/// from the cursor reads and from `deserialize_table` are propagated.
pub fn deserialize(buf: &[u8]) -> Result<Self, StorageError> {
let mut cur = Cursor::new(buf);
// Header: magic, then version byte.
let magic = cur.take(8)?;
if magic != FILE_MAGIC {
return Err(StorageError::Corrupt(format!(
"bad magic: expected SPGDB001, got {magic:?}"
)));
}
let version = cur.read_u8()?;
if !(MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION).contains(&version) {
return Err(StorageError::Corrupt(format!(
"unsupported file version: {version} (supported: {MIN_SUPPORTED_FILE_VERSION}..={FILE_VERSION})"
)));
}
// Recorded on the cursor; `deserialize_rows` passes it to the row decoder.
cur.codec_version = version;
let table_count = cur.read_u32()? as usize;
let mut cat = Self::new();
for _ in 0..table_count {
deserialize_table(&mut cur, &mut cat, version)?;
}
// Version 22+: functions followed by triggers.
if version >= 22 {
let fn_count = cur.read_u32()? as usize;
for _ in 0..fn_count {
let name = cur.read_str()?;
let args_repr = cur.read_str()?;
let returns = cur.read_str()?;
let language = cur.read_str()?;
let body = cur.read_str_long()?;
cat.functions.insert(
name.clone(),
FunctionDef {
name,
args_repr,
returns,
language,
body,
},
);
}
let trg_count = cur.read_u32()? as usize;
for _ in 0..trg_count {
let name = cur.read_str()?;
let table = cur.read_str()?;
let timing = cur.read_str()?;
let ev_count = cur.read_u16()? as usize;
let mut events = Vec::with_capacity(ev_count);
for _ in 0..ev_count {
events.push(cur.read_str()?);
}
let for_each = cur.read_str()?;
let function = cur.read_str()?;
// Version 23+ stores the UPDATE OF column list; older files have none.
let update_columns = if version >= 23 {
let n = cur.read_u16()? as usize;
let mut cols = Vec::with_capacity(n);
for _ in 0..n {
cols.push(cur.read_str()?);
}
cols
} else {
Vec::new()
};
// Version 25+ stores the enabled flag; older triggers load as enabled.
let enabled = if version >= 25 {
cur.read_u8()? != 0
} else {
true
};
cat.triggers.push(TriggerDef {
name,
table,
timing,
events,
for_each,
function,
update_columns,
enabled,
});
}
}
// Version 26+: sequences.
if version >= 26 {
let seq_count = cur.read_u32()? as usize;
for _ in 0..seq_count {
let name = cur.read_str()?;
let data_type = match cur.read_u8()? {
0 => SequenceDataType::SmallInt,
1 => SequenceDataType::Int,
2 => SequenceDataType::BigInt,
other => {
return Err(StorageError::Corrupt(format!(
"unknown SEQUENCE data-type tag {other}"
)));
}
};
let start = cur.read_i64()?;
let increment = cur.read_i64()?;
let min_value = cur.read_i64()?;
let max_value = cur.read_i64()?;
let cache = cur.read_i64()?;
let cycle = cur.read_u8()? != 0;
let owned_by = match cur.read_u8()? {
0 => None,
1 => {
let t = cur.read_str()?;
let c = cur.read_str()?;
Some((t, c))
}
other => {
return Err(StorageError::Corrupt(format!(
"unknown SEQUENCE owned-by tag {other}"
)));
}
};
let last_value = cur.read_i64()?;
let is_called = cur.read_u8()? != 0;
cat.sequences.insert(
name.clone(),
SequenceDef {
name,
data_type,
start,
increment,
min_value,
max_value,
cache,
cycle,
owned_by,
last_value,
is_called,
},
);
}
}
// Version 27+: views.
if version >= 27 {
let view_count = cur.read_u32()? as usize;
for _ in 0..view_count {
let name = cur.read_str()?;
let col_count = cur.read_u16()? as usize;
let mut columns = Vec::with_capacity(col_count);
for _ in 0..col_count {
columns.push(cur.read_str()?);
}
let body = cur.read_str_long()?;
cat.views.insert(
name.clone(),
ViewDef {
name,
columns,
body,
},
);
}
}
// Version 28+: materialized views.
if version >= 28 {
let mv_count = cur.read_u32()? as usize;
for _ in 0..mv_count {
let name = cur.read_str()?;
let body = cur.read_str_long()?;
cat.materialized_views.insert(name, body);
}
}
// Version 29+: user enum types.
if version >= 29 {
let etype_count = cur.read_u32()? as usize;
for _ in 0..etype_count {
let name = cur.read_str()?;
let label_count = cur.read_u16()? as usize;
let mut labels = Vec::with_capacity(label_count);
for _ in 0..label_count {
labels.push(cur.read_str()?);
}
cat.enum_types
.insert(name.clone(), EnumDef { name, labels });
}
}
// Version 30+: user domain types.
if version >= 30 {
let dtype_count = cur.read_u32()? as usize;
for _ in 0..dtype_count {
let name = cur.read_str()?;
let base_type = cur.read_data_type()?;
let nullable = cur.read_u8()? != 0;
let default = match cur.read_u8()? {
0 => None,
1 => Some(cur.read_str()?),
other => {
return Err(StorageError::Corrupt(format!(
"unknown DOMAIN default tag {other}"
)));
}
};
let check_count = cur.read_u16()? as usize;
let mut checks = Vec::with_capacity(check_count);
for _ in 0..check_count {
checks.push(cur.read_str()?);
}
cat.domain_types.insert(
name.clone(),
DomainDef {
name,
base_type,
nullable,
default,
checks,
},
);
}
}
// Version 31+: schema names.
if version >= 31 {
let sch_count = cur.read_u32()? as usize;
for _ in 0..sch_count {
let name = cur.read_str()?;
cat.schemas.insert(name);
}
}
// Every byte must have been consumed; leftovers mean a malformed file.
if cur.pos < buf.len() {
return Err(StorageError::Corrupt(format!(
"trailing bytes: {} unread",
buf.len() - cur.pos
)));
}
Ok(cat)
}
}
/// Decodes one table record from `cur` and registers it in `cat`.
///
/// Reads the table name and column definitions, creates the table, then
/// loads its rows and indices and finally the version-gated per-table
/// appendices (hot-tier budget, foreign keys, uniqueness constraints,
/// runtime defaults, CHECKs, enum/domain bindings, ON UPDATE expressions,
/// collations, UNSIGNED flags and inline ENUM/SET variant lists).
///
/// Appendix entries that name a column position outside the schema are
/// consumed from the stream but otherwise ignored.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` for unknown tag bytes and for a foreign
/// key whose local and parent column counts differ; propagates errors from
/// the cursor reads, `Catalog::create_table`, `deserialize_rows` and
/// `deserialize_indices`.
fn deserialize_table(
    cur: &mut Cursor<'_>,
    cat: &mut Catalog,
    version: u8,
) -> Result<(), StorageError> {
    // The name is only needed to build the schema, so it is moved straight
    // into `TableSchema::new` without an intermediate clone.
    let name = cur.read_str()?;
    let col_count = cur.read_u16()? as usize;
    let mut cols = Vec::with_capacity(col_count);
    for _ in 0..col_count {
        let c_name = cur.read_str()?;
        let ty = cur.read_data_type()?;
        let nullable = cur.read_u8()? != 0;
        let default = match cur.read_u8()? {
            0 => None,
            1 => Some(cur.read_value()?),
            other => {
                return Err(StorageError::Corrupt(format!(
                    "unknown default tag: {other}"
                )));
            }
        };
        let auto_increment = cur.read_u8()? != 0;
        // The remaining attributes live in appendices read further down.
        cols.push(ColumnSchema {
            name: c_name,
            ty,
            nullable,
            default,
            runtime_default: None,
            auto_increment,
            user_enum_type: None,
            user_domain_type: None,
            on_update_runtime: None,
            collation: Collation::Binary,
            is_unsigned: false,
            inline_enum_variants: None,
            inline_set_variants: None,
        });
    }
    let n_cols = cols.len();
    cat.create_table(TableSchema::new(name, cols))?;
    let t = cat.tables.last_mut().expect("create_table just pushed");
    deserialize_rows(cur, t, n_cols)?;
    deserialize_indices(cur, t, version)?;
    // Version 11+: hot-tier byte budget.
    if version >= 11 {
        let has = cur.read_u8()?;
        let hot_tier_bytes = match has {
            0 => None,
            1 => Some(cur.read_u64()?),
            other => {
                return Err(StorageError::Corrupt(format!(
                    "hot_tier_bytes appendix: unknown has-value byte {other}"
                )));
            }
        };
        t.schema_mut().hot_tier_bytes = hot_tier_bytes;
    }
    // Version 13+: foreign keys.
    if version >= 13 {
        let fk_count = cur.read_u16()? as usize;
        let mut fks = Vec::with_capacity(fk_count);
        for _ in 0..fk_count {
            let name = match cur.read_u8()? {
                0 => None,
                1 => Some(cur.read_str()?),
                other => {
                    return Err(StorageError::Corrupt(format!(
                        "FK appendix: unknown has-name byte {other}"
                    )));
                }
            };
            let local_arity = cur.read_u16()? as usize;
            let mut local_columns = Vec::with_capacity(local_arity);
            for _ in 0..local_arity {
                local_columns.push(cur.read_u16()? as usize);
            }
            let parent_table = cur.read_str()?;
            let parent_arity = cur.read_u16()? as usize;
            if parent_arity != local_arity {
                return Err(StorageError::Corrupt(format!(
                    "FK arity mismatch in catalog: local {local_arity} vs parent {parent_arity}"
                )));
            }
            let mut parent_columns = Vec::with_capacity(parent_arity);
            for _ in 0..parent_arity {
                parent_columns.push(cur.read_u16()? as usize);
            }
            let on_delete = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
                StorageError::Corrupt("FK appendix: unknown on_delete tag".into())
            })?;
            let on_update = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
                StorageError::Corrupt("FK appendix: unknown on_update tag".into())
            })?;
            fks.push(ForeignKeyConstraint {
                name,
                local_columns,
                parent_table,
                parent_columns,
                on_delete,
                on_update,
            });
        }
        t.schema_mut().foreign_keys = fks;
    }
    // Version 15+: uniqueness constraints, then runtime default expressions.
    if version >= 15 {
        let uc_count = cur.read_u16()? as usize;
        let mut ucs = Vec::with_capacity(uc_count);
        for _ in 0..uc_count {
            let is_pk = cur.read_u8()? != 0;
            let arity = cur.read_u16()? as usize;
            let mut cols = Vec::with_capacity(arity);
            for _ in 0..arity {
                cols.push(cur.read_u16()? as usize);
            }
            // The NULLS NOT DISTINCT flag is only stored from version 23 on.
            let nulls_not_distinct = if version >= 23 {
                cur.read_u8()? != 0
            } else {
                false
            };
            ucs.push(UniquenessConstraint {
                is_primary_key: is_pk,
                columns: cols,
                nulls_not_distinct,
            });
        }
        t.schema_mut().uniqueness_constraints = ucs;
        let rt_count = cur.read_u16()? as usize;
        for _ in 0..rt_count {
            let pos = cur.read_u16()? as usize;
            let expr = cur.read_str()?;
            if let Some(col) = t.schema_mut().columns.get_mut(pos) {
                col.runtime_default = Some(expr);
            }
        }
    }
    // Version 23+: CHECK constraint sources.
    if version >= 23 {
        let check_count = cur.read_u16()? as usize;
        let mut checks = Vec::with_capacity(check_count);
        for _ in 0..check_count {
            checks.push(cur.read_str()?);
        }
        t.schema_mut().checks = checks;
    }
    // Version 29+: user enum type bindings.
    if version >= 29 {
        let binding_count = cur.read_u16()? as usize;
        for _ in 0..binding_count {
            let col_pos = cur.read_u16()? as usize;
            let ename = cur.read_str()?;
            if let Some(col) = t.schema_mut().columns.get_mut(col_pos) {
                col.user_enum_type = Some(ename);
            }
        }
    }
    // Version 30+: user domain type bindings.
    if version >= 30 {
        let binding_count = cur.read_u16()? as usize;
        for _ in 0..binding_count {
            let col_pos = cur.read_u16()? as usize;
            let dname = cur.read_str()?;
            if let Some(col) = t.schema_mut().columns.get_mut(col_pos) {
                col.user_domain_type = Some(dname);
            }
        }
    }
    // Version 32+: ON UPDATE runtime expressions.
    if version >= 32 {
        let binding_count = cur.read_u16()? as usize;
        for _ in 0..binding_count {
            let col_pos = cur.read_u16()? as usize;
            let expr_src = cur.read_str()?;
            if let Some(col) = t.schema_mut().columns.get_mut(col_pos) {
                col.on_update_runtime = Some(expr_src);
            }
        }
    }
    // Version 34+: collations. Any tag other than the case-insensitive one
    // is read as binary collation.
    if version >= 34 {
        let binding_count = cur.read_u16()? as usize;
        for _ in 0..binding_count {
            let col_pos = cur.read_u16()? as usize;
            let tag = cur.read_u8()?;
            let collation = match tag {
                Collation::TAG_CASE_INSENSITIVE => Collation::CaseInsensitive,
                _ => Collation::Binary,
            };
            if let Some(col) = t.schema_mut().columns.get_mut(col_pos) {
                col.collation = collation;
            }
        }
    }
    // Version 35+: UNSIGNED column positions.
    if version >= 35 {
        let binding_count = cur.read_u16()? as usize;
        for _ in 0..binding_count {
            let col_pos = cur.read_u16()? as usize;
            if let Some(col) = t.schema_mut().columns.get_mut(col_pos) {
                col.is_unsigned = true;
            }
        }
    }
    // Version 41+: inline ENUM variant lists.
    if version >= 41 {
        let binding_count = cur.read_u16()? as usize;
        for _ in 0..binding_count {
            let col_pos = cur.read_u16()? as usize;
            let variant_count = cur.read_u16()? as usize;
            let mut variants = Vec::with_capacity(variant_count);
            for _ in 0..variant_count {
                variants.push(cur.read_str()?);
            }
            if let Some(col) = t.schema_mut().columns.get_mut(col_pos) {
                col.inline_enum_variants = Some(variants);
            }
        }
    }
    // Version 42+: inline SET variant lists.
    if version >= 42 {
        let binding_count = cur.read_u16()? as usize;
        for _ in 0..binding_count {
            let col_pos = cur.read_u16()? as usize;
            let variant_count = cur.read_u16()? as usize;
            let mut variants = Vec::with_capacity(variant_count);
            for _ in 0..variant_count {
                variants.push(cur.read_str()?);
            }
            if let Some(col) = t.schema_mut().columns.get_mut(col_pos) {
                col.inline_set_variants = Some(variants);
            }
        }
    }
    Ok(())
}
/// Reads the `u32` row count followed by that many densely encoded row
/// bodies, appending each decoded row to `t.rows`.
///
/// Once every row has decoded, `t.hot_bytes` is set to the saturating sum
/// of the rows' encoded lengths. If a row fails to decode, the error is
/// returned and `t.hot_bytes` is left untouched.
fn deserialize_rows(
    cur: &mut Cursor<'_>,
    t: &mut Table,
    _n_cols: usize,
) -> Result<(), StorageError> {
    let total_rows = cur.read_u32()? as usize;
    let mut encoded_total: u64 = 0;
    let mut decoded = 0usize;
    while decoded < total_rows {
        let (row, used) =
            decode_row_body_dense(&cur.buf[cur.pos..], &t.schema, cur.codec_version)?;
        cur.pos += used;
        let row_len = row_body_encoded_len(&row, &t.schema) as u64;
        encoded_total = encoded_total.saturating_add(row_len);
        t.rows.push_mut(row);
        decoded += 1;
    }
    t.hot_bytes = encoded_total;
    Ok(())
}
/// Reads the `u16` index count and restores that many indices onto `t`.
///
/// Each record holds the index name, the key column position, a kind tag
/// with kind-specific payload, and (from version 12 on) a trailer with
/// INCLUDE columns, partial predicate and expression; from version 16 on
/// the trailer also carries the uniqueness flag and extra key columns.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` for a key, INCLUDE or extra column
/// position outside the schema, an unknown kind or presence tag, and a
/// kind tag that the file's version predates. Errors from the cursor reads
/// and from the table's restore/add methods are propagated.
fn deserialize_indices(
cur: &mut Cursor<'_>,
t: &mut Table,
version: u8,
) -> Result<(), StorageError> {
let index_count = cur.read_u16()? as usize;
for _ in 0..index_count {
let idx_name = cur.read_str()?;
let col_pos = cur.read_u16()? as usize;
// The restore methods take the column by name, so resolve the position.
let column_name = t
.schema
.columns
.get(col_pos)
.ok_or_else(|| {
StorageError::Corrupt(format!(
"index {idx_name:?} points at non-existent column position {col_pos}"
))
})?
.name
.clone();
let kind_tag = cur.read_u8()?;
match kind_tag {
// Tag 0: BTree. Files older than version 9 carry no entry payload, so
// the index is created through `add_index` instead of restored.
0 => {
if version >= 9 {
let map = read_btree_map(cur)?;
t.restore_btree_index(idx_name, &column_name, map)?;
} else {
t.add_index(idx_name, &column_name)?;
}
}
// Tag 1: NSW graph, prefixed by `m` as u16.
1 => {
let m = cur.read_u16()? as usize;
let graph = cur.read_nsw_graph(m)?;
t.restore_nsw_index(idx_name, &column_name, graph)?;
}
// Tag 2: BRIN, only the column type is stored.
2 => {
let column_type = cur.read_data_type()?;
t.restore_brin_index(idx_name, &column_name, column_type)?;
}
// Tag 3: GIN posting lists.
3 => {
let map = read_gin_map(cur)?;
t.restore_gin_index(idx_name, &column_name, map)?;
}
// Tag 4: trigram GIN, rejected in files older than version 24.
4 => {
if version < 24 {
return Err(StorageError::Corrupt(format!(
"trigram-GIN index tag 4 found in catalog FILE_VERSION {version}; \
FILE_VERSION 24+ required (v7.15.0 introduced this tag)"
)));
}
let map = read_gin_map(cur)?;
t.restore_gin_trgm_index(idx_name, &column_name, map)?;
}
// Tag 5: fulltext GIN, rejected in files older than version 33.
5 => {
if version < 33 {
return Err(StorageError::Corrupt(format!(
"fulltext-GIN index tag 5 found in catalog FILE_VERSION {version}; \
FILE_VERSION 33+ required (v7.17.0 Phase 2.2 introduced this tag)"
)));
}
let map = read_gin_map(cur)?;
t.restore_gin_fulltext_index(idx_name, &column_name, map)?;
}
other => {
return Err(StorageError::Corrupt(format!(
"unknown index kind tag: {other}"
)));
}
}
// Version 12+ trailer. Each attribute is applied to `t.indices.last_mut()`.
// NOTE(review): this assumes the restore/add call above appended the new
// index at the end of `t.indices` — confirm.
if version >= 12 {
let num_included = cur.read_u16()? as usize;
if num_included > 0 {
let mut included: Vec<usize> = Vec::with_capacity(num_included);
for _ in 0..num_included {
let cp = cur.read_u16()? as usize;
if cp >= t.schema.columns.len() {
return Err(StorageError::Corrupt(format!(
"INCLUDE column position {cp} out of range \
({} schema columns)",
t.schema.columns.len()
)));
}
included.push(cp);
}
if let Some(last) = t.indices.last_mut() {
last.included_columns = included;
}
}
// Optional partial-index predicate: presence byte, then the source text.
match cur.read_u8()? {
0 => {}
1 => {
let pred = cur.read_str()?;
if let Some(last) = t.indices.last_mut() {
last.partial_predicate = Some(pred);
}
}
other => {
return Err(StorageError::Corrupt(format!(
"partial_predicate tag: unknown byte {other}"
)));
}
}
// Optional index expression: presence byte, then the source text.
match cur.read_u8()? {
0 => {}
1 => {
let expr = cur.read_str()?;
if let Some(last) = t.indices.last_mut() {
last.expression = Some(expr);
}
}
other => {
return Err(StorageError::Corrupt(format!(
"expression tag: unknown byte {other}"
)));
}
}
// Version 16+: uniqueness flag and extra key column positions.
if version >= 16 {
match cur.read_u8()? {
0 => {}
1 => {
if let Some(last) = t.indices.last_mut() {
last.is_unique = true;
}
}
other => {
return Err(StorageError::Corrupt(format!(
"is_unique tag: unknown byte {other}"
)));
}
}
let n = cur.read_u16()? as usize;
if n > 0 {
let mut extras: Vec<usize> = Vec::with_capacity(n);
for _ in 0..n {
let cp = cur.read_u16()? as usize;
if cp >= t.schema.columns.len() {
return Err(StorageError::Corrupt(format!(
"extra column position {cp} out of range \
({} schema columns)",
t.schema.columns.len()
)));
}
extras.push(cp);
}
if let Some(last) = t.indices.last_mut() {
last.extra_column_positions = extras;
}
}
}
}
}
Ok(())
}
/// Reads a BTree index payload: a `u32` entry count, then for each entry an
/// index key, a `u32` locator count and that many encoded row locators.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` when a row locator fails to decode, and
/// propagates errors from the cursor reads.
fn read_btree_map(
    cur: &mut Cursor<'_>,
) -> Result<PersistentBTreeMap<IndexKey, Vec<RowLocator>>, StorageError> {
    let entry_count = cur.read_u32()? as usize;
    let mut map = PersistentBTreeMap::new();
    for _ in 0..entry_count {
        let key = cur.read_index_key()?;
        let locator_count = cur.read_u32()? as usize;
        // `locator_count` comes straight from the file, so a corrupt value
        // must not be able to request a multi-gigabyte allocation up front.
        // The number of bytes left in the buffer bounds the preallocation;
        // this is only a capacity hint, the vector still grows on demand.
        let remaining = cur.buf.len().saturating_sub(cur.pos);
        let mut locators = Vec::with_capacity(locator_count.min(remaining));
        for _ in 0..locator_count {
            let tail = &cur.buf[cur.pos..];
            let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
                StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
            })?;
            cur.pos += consumed;
            locators.push(loc);
        }
        map.insert_mut(key, locators);
    }
    Ok(map)
}
/// Reads a GIN posting-list payload: a `u32` entry count, then for each
/// entry a string key, a `u32` locator count and that many encoded row
/// locators.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` when a row locator fails to decode, and
/// propagates errors from the cursor reads.
fn read_gin_map(
    cur: &mut Cursor<'_>,
) -> Result<PersistentBTreeMap<String, Vec<RowLocator>>, StorageError> {
    let entry_count = cur.read_u32()? as usize;
    let mut map = PersistentBTreeMap::new();
    for _ in 0..entry_count {
        let word = cur.read_str()?;
        let locator_count = cur.read_u32()? as usize;
        // `locator_count` comes straight from the file, so a corrupt value
        // must not be able to request a multi-gigabyte allocation up front.
        // The number of bytes left in the buffer bounds the preallocation;
        // this is only a capacity hint, the vector still grows on demand.
        let remaining = cur.buf.len().saturating_sub(cur.pos);
        let mut locators = Vec::with_capacity(locator_count.min(remaining));
        for _ in 0..locator_count {
            let tail = &cur.buf[cur.pos..];
            let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
                StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
            })?;
            cur.pos += consumed;
            locators.push(loc);
        }
        map.insert_mut(word, locators);
    }
    Ok(map)
}
/// Appends the body of an NSW/HNSW graph to `out`.
///
/// Layout: `m_max_0` (u16), entry node (u32 LE, `u32::MAX` when the graph
/// has no entry), entry level (u8), node count (u32), one level byte per
/// node, layer count (u8), then per layer a node count (u32) and per node a
/// neighbour count (u16) followed by the neighbour ids (u32 each).
///
/// # Panics
///
/// Panics when a count or index does not fit its fixed-width field.
fn write_nsw_graph(out: &mut Vec<u8>, g: &NswGraph) {
    let entry_word = match g.entry {
        Some(e) => u32::try_from(e).expect("NSW entry fits in u32"),
        None => u32::MAX,
    };
    let m_max_0 = u16::try_from(g.m_max_0).expect("HNSW m_max_0 fits in u16");
    write_u16(out, m_max_0);
    out.extend_from_slice(&entry_word.to_le_bytes());
    out.push(g.entry_level);

    // Per-node level table.
    let nodes = u32::try_from(g.levels.len()).expect("HNSW node count fits in u32");
    write_u32(out, nodes);
    for &level in &g.levels {
        out.push(level);
    }

    // Adjacency lists, one layer after another.
    out.push(u8::try_from(g.layers.len()).expect("HNSW layer count ≤ 255"));
    for layer in &g.layers {
        let layer_nodes =
            u32::try_from(layer.len()).expect("HNSW per-layer node count fits in u32");
        write_u32(out, layer_nodes);
        for adjacency in layer {
            let degree =
                u16::try_from(adjacency.len()).expect("HNSW neighbour list fits in u16");
            write_u16(out, degree);
            for &peer in adjacency {
                write_u32(out, peer);
            }
        }
    }
}
/// Appends the encoding of `t` to `out`: a one-byte tag, followed by a
/// payload for the parameterised types (vector dimension, VARCHAR/CHAR
/// length, NUMERIC precision and scale, range kind).
///
/// # Panics
///
/// Panics for `DataType::Interval`, which has no encoding.
fn write_data_type(out: &mut Vec<u8>, t: DataType) {
    // Phase 1: pick the tag byte. This match stays exhaustive so that a new
    // variant cannot be added without assigning it a tag.
    let tag: u8 = match t {
        DataType::Int => 1,
        DataType::BigInt => 2,
        DataType::Float => 3,
        DataType::Text => 4,
        DataType::Bool => 5,
        DataType::Vector { encoding, .. } => match encoding {
            VecEncoding::F32 => 6,
            VecEncoding::Sq8 => 14,
            VecEncoding::F16 => 15,
        },
        DataType::SmallInt => 7,
        DataType::Varchar(_) => 8,
        DataType::Char(_) => 9,
        DataType::Numeric { .. } => 10,
        DataType::Date => 11,
        DataType::Timestamp => 12,
        DataType::Json => 13,
        DataType::Jsonb => 16,
        DataType::Timestamptz => 17,
        DataType::Bytes => 18,
        DataType::TextArray => 19,
        DataType::IntArray => 20,
        DataType::BigIntArray => 21,
        DataType::TsVector => 22,
        DataType::TsQuery => 23,
        DataType::Uuid => 24,
        DataType::Time => 25,
        DataType::Year => 26,
        DataType::TimeTz => 27,
        DataType::Money => 28,
        DataType::Range(_) => 29,
        DataType::Hstore => 30,
        DataType::IntArray2D => 31,
        DataType::BigIntArray2D => 32,
        DataType::TextArray2D => 33,
        DataType::Interval => {
            unreachable!("DataType::Interval has no on-disk encoding in v2.11")
        }
    };
    out.push(tag);

    // Phase 2: payload for the parameterised types; all others are tag-only.
    match t {
        DataType::Vector { dim, .. } => out.extend_from_slice(&dim.to_le_bytes()),
        DataType::Varchar(len) | DataType::Char(len) => {
            out.extend_from_slice(&len.to_le_bytes());
        }
        DataType::Numeric { precision, scale } => out.extend_from_slice(&[precision, scale]),
        DataType::Range(kind) => out.push(kind.tag()),
        _ => {}
    }
}
impl Cursor<'_> {
    /// Decodes a `DataType`: a one-byte tag, followed by a payload for the
    /// parameterised types (vector dimension, VARCHAR/CHAR length, NUMERIC
    /// precision and scale, range kind).
    ///
    /// # Errors
    ///
    /// Returns `StorageError::Corrupt` for an unknown type tag or an unknown
    /// range-kind tag, and propagates errors from the underlying reads.
    fn read_data_type(&mut self) -> Result<DataType, StorageError> {
        let decoded = match self.read_u8()? {
            1 => DataType::Int,
            2 => DataType::BigInt,
            3 => DataType::Float,
            4 => DataType::Text,
            5 => DataType::Bool,
            6 => {
                let dim = self.read_u32()?;
                DataType::Vector {
                    dim,
                    encoding: VecEncoding::F32,
                }
            }
            7 => DataType::SmallInt,
            8 => DataType::Varchar(self.read_u32()?),
            9 => DataType::Char(self.read_u32()?),
            10 => {
                let precision = self.read_u8()?;
                let scale = self.read_u8()?;
                DataType::Numeric { precision, scale }
            }
            11 => DataType::Date,
            12 => DataType::Timestamp,
            13 => DataType::Json,
            14 => {
                let dim = self.read_u32()?;
                DataType::Vector {
                    dim,
                    encoding: VecEncoding::Sq8,
                }
            }
            15 => {
                let dim = self.read_u32()?;
                DataType::Vector {
                    dim,
                    encoding: VecEncoding::F16,
                }
            }
            16 => DataType::Jsonb,
            17 => DataType::Timestamptz,
            18 => DataType::Bytes,
            19 => DataType::TextArray,
            20 => DataType::IntArray,
            21 => DataType::BigIntArray,
            22 => DataType::TsVector,
            23 => DataType::TsQuery,
            24 => DataType::Uuid,
            25 => DataType::Time,
            26 => DataType::Year,
            27 => DataType::TimeTz,
            28 => DataType::Money,
            29 => {
                let kt = self.read_u8()?;
                match RangeKind::from_tag(kt) {
                    Some(kind) => DataType::Range(kind),
                    None => {
                        return Err(StorageError::Corrupt(format!(
                            "unknown RangeKind tag: {kt}"
                        )));
                    }
                }
            }
            30 => DataType::Hstore,
            31 => DataType::IntArray2D,
            32 => DataType::BigIntArray2D,
            33 => DataType::TextArray2D,
            other => {
                return Err(StorageError::Corrupt(format!(
                    "unknown data type tag: {other}"
                )));
            }
        };
        Ok(decoded)
    }
}
/// Returns the size in bytes of the dense encoding of `row` under `schema`:
/// the NULL bitmap (one bit per schema column, rounded up to whole bytes)
/// plus the body length of every non-NULL value.
///
/// # Panics
///
/// Debug builds assert that the row arity equals the schema arity. Indexing
/// `schema.columns` panics if a non-NULL value has no matching column.
pub fn row_body_encoded_len(row: &Row, schema: &TableSchema) -> usize {
    debug_assert_eq!(
        row.values.len(),
        schema.columns.len(),
        "row_body_encoded_len: row arity must match schema"
    );
    let null_bitmap_len = schema.columns.len().div_ceil(8);
    row.values
        .iter()
        .enumerate()
        // NULLs live only in the bitmap and contribute no body bytes.
        .filter(|(_, cell)| !matches!(**cell, Value::Null))
        .map(|(idx, cell)| value_body_encoded_len(cell, schema.columns[idx].ty))
        .fold(null_bitmap_len, |total, len| total + len)
}
/// Returns the number of bytes `write_value_body` emits for `v`.
///
/// The column type is not consulted (`_ty` is unused): every size is derived
/// from the value alone. `Value::Null` reports 0 because NULLs are stored only
/// in the row bitmap.
///
/// Sizes follow the writers exactly:
/// * payloads written through `write_str` / `write_bytes_escaped` (text,
///   JSON, bytes, TEXT[] elements, tsvector words) use a 2-byte length prefix,
///   or a 6-byte prefix once the payload reaches `STR_LEN_ESCAPE` bytes;
/// * 2D arrays are an 8-byte header plus one flag byte per cell, with the
///   payload added only for non-NULL cells — exactly what the 2D writers
///   emit, including for ragged input.
///
/// # Panics
///
/// Panics on `Value::Interval`, which has no on-disk encoding.
fn value_body_encoded_len(v: &Value, _ty: DataType) -> usize {
    // Length prefix + payload, as produced by `write_str` and
    // `write_bytes_escaped`.
    fn escaped_len(payload: usize) -> usize {
        if payload >= STR_LEN_ESCAPE as usize {
            6 + payload
        } else {
            2 + payload
        }
    }

    match v {
        Value::SmallInt(_) => 2,
        Value::Int(_) | Value::Date(_) => 4,
        Value::BigInt(_) | Value::Float(_) | Value::Timestamp(_) => 8,
        Value::Bool(_) => 1,
        Value::Text(s) | Value::Json(s) => escaped_len(s.len()),
        Value::Vector(vec) => 4 + 4 * vec.len(),
        // dim (u32) + min (f32) + max (f32) + one byte per component.
        Value::Sq8Vector(q) => 4 + 4 + 4 + q.bytes.len(),
        Value::HalfVector(h) => 4 + h.bytes.len(),
        // 16-byte scaled integer + scale byte.
        Value::Numeric { .. } => 16 + 1,
        Value::Bytes(b) => escaped_len(b.len()),
        Value::TextArray(items) => {
            // u16 count, then a flag byte per element and an escaped string
            // for the non-NULL ones.
            2 + items
                .iter()
                .map(|item| 1 + item.as_ref().map_or(0, |s| escaped_len(s.len())))
                .sum::<usize>()
        }
        Value::IntArray(items) => {
            2 + items
                .iter()
                .map(|x| if x.is_some() { 5 } else { 1 })
                .sum::<usize>()
        }
        Value::BigIntArray(items) => {
            2 + items
                .iter()
                .map(|x| if x.is_some() { 9 } else { 1 })
                .sum::<usize>()
        }
        Value::TsVector(lexs) => {
            // u16 count; per lexeme: escaped word, u16 position count,
            // 2 bytes per position, weight byte.
            2 + lexs
                .iter()
                .map(|l| escaped_len(l.word.len()) + 2 + 2 * l.positions.len() + 1)
                .sum::<usize>()
        }
        Value::TsQuery(ast) => tsquery_encoded_len(ast),
        Value::Uuid(_) => 16,
        Value::Time(_) => 8,
        Value::Year(_) => 2,
        Value::TimeTz { .. } => 12,
        Value::Money(_) => 8,
        Value::Range { lower, upper, .. } => {
            // Flags byte, then each present bound as a tagged value.
            1 + lower
                .as_ref()
                .map_or(0, |bound| write_value_encoded_len(bound))
                + upper
                    .as_ref()
                    .map_or(0, |bound| write_value_encoded_len(bound))
        }
        Value::Hstore(pairs) => {
            // u32 count; per pair: u32 key length, key, presence byte and,
            // when present, u32 value length + value.
            4 + pairs
                .iter()
                .map(|(k, val)| 4 + k.len() + 1 + val.as_ref().map_or(0, |s| 4 + s.len()))
                .sum::<usize>()
        }
        Value::IntArray2D(rows) => {
            8 + rows
                .iter()
                .flatten()
                .map(|cell| if cell.is_some() { 1 + 4 } else { 1 })
                .sum::<usize>()
        }
        Value::BigIntArray2D(rows) => {
            8 + rows
                .iter()
                .flatten()
                .map(|cell| if cell.is_some() { 1 + 8 } else { 1 })
                .sum::<usize>()
        }
        Value::TextArray2D(rows) => {
            8 + rows
                .iter()
                .flatten()
                .map(|cell| 1 + cell.as_ref().map_or(0, |s| 4 + s.len()))
                .sum::<usize>()
        }
        Value::Null => 0,
        Value::Interval { .. } => {
            unreachable!("Value::Interval has no on-disk encoding")
        }
    }
}
pub fn encode_row_body_dense(row: &Row, schema: &TableSchema) -> Vec<u8> {
debug_assert_eq!(
row.values.len(),
schema.columns.len(),
"dense encode: row arity must match schema"
);
let bitmap_bytes = schema.columns.len().div_ceil(8);
let mut out = Vec::with_capacity(bitmap_bytes + schema.columns.len() * 8);
let bitmap_offset = out.len();
out.resize(bitmap_offset + bitmap_bytes, 0);
for (i, v) in row.values.iter().enumerate() {
if matches!(v, Value::Null) {
out[bitmap_offset + i / 8] |= 1 << (i % 8);
}
}
for (col_idx, v) in row.values.iter().enumerate() {
if matches!(v, Value::Null) {
continue;
}
write_value_body(&mut out, v, schema.columns[col_idx].ty);
}
out
}
/// Decodes one dense row body from the front of `bytes`.
///
/// Reads the NULL bitmap, then one value body per schema column whose bitmap
/// bit is clear. Returns the row together with the number of bytes consumed.
/// `codec_version` is handed to the cursor, where it gates the escaped
/// length prefixes.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` when the schema needs a bitmap larger than
/// 32 bytes, when the input is truncated, or when a value body fails to
/// decode.
pub fn decode_row_body_dense(
    bytes: &[u8],
    schema: &TableSchema,
    codec_version: u8,
) -> Result<(Row, usize), StorageError> {
    const MAX_BITMAP_BYTES: usize = 32;

    let bitmap_bytes = schema.columns.len().div_ceil(8);
    if bitmap_bytes > MAX_BITMAP_BYTES {
        return Err(StorageError::Corrupt(format!(
            "row NULL bitmap {bitmap_bytes} B exceeds 32 B cap"
        )));
    }

    let mut cur = Cursor::new(bytes).with_codec_version(codec_version);
    // `take` hands back a slice of the input buffer, so the bitmap stays
    // readable while the cursor keeps advancing.
    let null_bitmap = cur.take(bitmap_bytes)?;

    let mut values = Vec::with_capacity(schema.columns.len());
    for (idx, col) in schema.columns.iter().enumerate() {
        let null_bit = null_bitmap[idx >> 3] & (1u8 << (idx & 7));
        values.push(if null_bit != 0 {
            Value::Null
        } else {
            cur.read_value_body(col.ty)?
        });
    }
    Ok((Row { values }, cur.pos))
}
fn write_value_body(out: &mut Vec<u8>, v: &Value, ty: DataType) {
match (v, ty) {
(Value::SmallInt(n), DataType::SmallInt) => out.extend_from_slice(&n.to_le_bytes()),
(Value::Int(n), DataType::Int) => out.extend_from_slice(&n.to_le_bytes()),
(Value::BigInt(n), DataType::BigInt) => out.extend_from_slice(&n.to_le_bytes()),
(Value::Float(x), DataType::Float) => out.extend_from_slice(&x.to_le_bytes()),
(Value::Bool(b), DataType::Bool) => out.push(u8::from(*b)),
(Value::Text(s), DataType::Text | DataType::Varchar(_) | DataType::Char(_)) => {
write_str(out, s);
}
(
Value::Vector(v),
DataType::Vector {
encoding: VecEncoding::F32,
..
},
) => {
let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
out.extend_from_slice(&dim.to_le_bytes());
for x in v {
out.extend_from_slice(&x.to_le_bytes());
}
}
(
Value::Sq8Vector(q),
DataType::Vector {
encoding: VecEncoding::Sq8,
..
},
) => {
let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
out.extend_from_slice(&dim.to_le_bytes());
out.extend_from_slice(&q.min.to_le_bytes());
out.extend_from_slice(&q.max.to_le_bytes());
out.extend_from_slice(&q.bytes);
}
(
Value::HalfVector(h),
DataType::Vector {
encoding: VecEncoding::F16,
..
},
) => {
let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
out.extend_from_slice(&dim.to_le_bytes());
out.extend_from_slice(&h.bytes);
}
(Value::Numeric { scaled, .. }, DataType::Numeric { scale, .. }) => {
out.extend_from_slice(&scaled.to_le_bytes());
out.push(scale);
}
(Value::Date(d), DataType::Date) => out.extend_from_slice(&d.to_le_bytes()),
(Value::Timestamp(t), DataType::Timestamp | DataType::Timestamptz) => {
out.extend_from_slice(&t.to_le_bytes())
}
(Value::Json(s), DataType::Json | DataType::Jsonb) => write_str(out, s),
(Value::Bytes(b), DataType::Bytes) => write_bytes_escaped(out, b),
(Value::TextArray(items), DataType::TextArray) => {
let count = u16::try_from(items.len()).expect("TEXT[] ≤ 65k elements");
out.extend_from_slice(&count.to_le_bytes());
for item in items {
match item {
None => out.push(1),
Some(s) => {
out.push(0);
write_bytes_escaped(out, s.as_bytes());
}
}
}
}
(Value::IntArray(items), DataType::IntArray) => {
let count = u16::try_from(items.len()).expect("INT[] ≤ 65k elements");
out.extend_from_slice(&count.to_le_bytes());
for item in items {
match item {
None => out.push(1),
Some(n) => {
out.push(0);
out.extend_from_slice(&n.to_le_bytes());
}
}
}
}
(Value::BigIntArray(items), DataType::BigIntArray) => {
let count = u16::try_from(items.len()).expect("BIGINT[] ≤ 65k elements");
out.extend_from_slice(&count.to_le_bytes());
for item in items {
match item {
None => out.push(1),
Some(n) => {
out.push(0);
out.extend_from_slice(&n.to_le_bytes());
}
}
}
}
(Value::TsVector(lexs), DataType::TsVector) => write_tsvector_body(out, lexs),
(Value::TsQuery(ast), DataType::TsQuery) => write_tsquery_body(out, ast),
(Value::Uuid(b), DataType::Uuid) => out.extend_from_slice(&b[..]),
(Value::Time(us), DataType::Time) => out.extend_from_slice(&us.to_le_bytes()),
(Value::Year(y), DataType::Year) => out.extend_from_slice(&y.to_le_bytes()),
(Value::TimeTz { us, offset_secs }, DataType::TimeTz) => {
out.extend_from_slice(&us.to_le_bytes());
out.extend_from_slice(&offset_secs.to_le_bytes());
}
(Value::Money(c), DataType::Money) => out.extend_from_slice(&c.to_le_bytes()),
(
Value::Range {
lower,
upper,
lower_inc,
upper_inc,
empty,
..
},
DataType::Range(_),
) => {
let mut flags: u8 = 0;
if *empty {
flags |= 0b0000_0001;
}
if lower.is_some() {
flags |= 0b0000_0010;
}
if upper.is_some() {
flags |= 0b0000_0100;
}
if *lower_inc {
flags |= 0b0000_1000;
}
if *upper_inc {
flags |= 0b0001_0000;
}
out.push(flags);
if let Some(l) = lower {
write_value(out, l);
}
if let Some(u) = upper {
write_value(out, u);
}
}
(Value::Hstore(pairs), DataType::Hstore) => write_hstore_body(out, pairs),
(Value::IntArray2D(rows), DataType::IntArray2D) => write_int_2d_body(out, rows),
(Value::BigIntArray2D(rows), DataType::BigIntArray2D) => write_bigint_2d_body(out, rows),
(Value::TextArray2D(rows), DataType::TextArray2D) => write_text_2d_body(out, rows),
(other, ty) => unreachable!(
"schema-driven encode received mismatched value/type pair: \
value tag={:?}, column type={:?}",
other.data_type(),
ty
),
}
}
fn write_value_encoded_len(v: &Value) -> usize {
match v {
Value::Null => 1,
Value::SmallInt(_) => 1 + 2,
Value::Int(_) | Value::Date(_) => 1 + 4,
Value::BigInt(_)
| Value::Float(_)
| Value::Timestamp(_)
| Value::Time(_)
| Value::Money(_) => 1 + 8,
Value::Bool(_) => 1 + 1,
Value::Year(_) => 1 + 2,
Value::Text(s) | Value::Json(s) => 1 + 4 + s.len(),
Value::Bytes(b) => 1 + 4 + b.len(),
Value::Numeric { .. } => 1 + 16 + 1,
Value::Uuid(_) => 1 + 16,
Value::TimeTz { .. } => 1 + 12,
Value::Hstore(pairs) => {
let mut n = 1 + 4;
for (k, v) in pairs {
n += 4 + k.len() + 1;
if let Some(val) = v {
n += 4 + val.len();
}
}
n
}
Value::IntArray2D(rows) => {
let cols = rows.first().map(|r| r.len()).unwrap_or(0);
1 + 8 + rows.len() * cols * (1 + 4)
}
Value::BigIntArray2D(rows) => {
let cols = rows.first().map(|r| r.len()).unwrap_or(0);
1 + 8 + rows.len() * cols * (1 + 8)
}
Value::TextArray2D(rows) => {
let cols = rows.first().map(|r| r.len()).unwrap_or(0);
let mut n = 1 + 8 + rows.len() * cols;
for row in rows {
for s in row.iter().flatten() {
n += 4 + s.len();
}
}
n
}
other => {
let ty = other.data_type().unwrap_or(DataType::Int);
1 + value_body_encoded_len(other, ty)
}
}
}
/// Appends the self-describing encoding of `v` to `out`: a one-byte value
/// tag followed by the payload for that tag.
///
/// `Text` and `Json` share tag 4. `Range` writes its `RangeKind` tag and a
/// flags byte before the (recursively tagged) bounds.
///
/// # Panics
///
/// Panics on `Value::Interval`, which has no on-disk encoding, and when a
/// length does not fit its on-disk prefix (vector dimension in `u32`, 1D
/// array count in `u16`).
fn write_value(out: &mut Vec<u8>, v: &Value) {
    // Tag byte followed by a fixed-width payload.
    fn tagged(out: &mut Vec<u8>, tag: u8, payload: &[u8]) {
        out.push(tag);
        out.extend_from_slice(payload);
    }
    // Element-count prefix shared by the three vector encodings.
    fn put_dim(out: &mut Vec<u8>, n: usize) {
        let dim = u32::try_from(n).expect("vector dim fits in u32");
        out.extend_from_slice(&dim.to_le_bytes());
    }

    match v {
        Value::Null => out.push(0),
        Value::SmallInt(n) => tagged(out, 7, &n.to_le_bytes()),
        Value::Int(n) => tagged(out, 1, &n.to_le_bytes()),
        Value::BigInt(n) => tagged(out, 2, &n.to_le_bytes()),
        Value::Float(x) => tagged(out, 3, &x.to_le_bytes()),
        Value::Text(s) | Value::Json(s) => {
            out.push(4);
            write_str(out, s);
        }
        Value::Bool(b) => tagged(out, 5, &[u8::from(*b)]),
        Value::Vector(elems) => {
            out.push(6);
            put_dim(out, elems.len());
            for x in elems {
                out.extend_from_slice(&x.to_le_bytes());
            }
        }
        Value::Sq8Vector(q) => {
            out.push(11);
            put_dim(out, q.bytes.len());
            out.extend_from_slice(&q.min.to_le_bytes());
            out.extend_from_slice(&q.max.to_le_bytes());
            out.extend_from_slice(&q.bytes);
        }
        Value::HalfVector(h) => {
            out.push(12);
            put_dim(out, h.dim());
            out.extend_from_slice(&h.bytes);
        }
        Value::Numeric { scaled, scale } => {
            tagged(out, 8, &scaled.to_le_bytes());
            out.push(*scale);
        }
        Value::Date(d) => tagged(out, 9, &d.to_le_bytes()),
        Value::Timestamp(t) => tagged(out, 10, &t.to_le_bytes()),
        Value::Interval { .. } => {
            unreachable!(
                "Value::Interval has no on-disk encoding; engine must reject it before write"
            )
        }
        Value::Bytes(b) => {
            out.push(14);
            write_bytes_escaped(out, b);
        }
        Value::TextArray(items) => {
            out.push(15);
            let count = u16::try_from(items.len()).expect("TEXT[] ≤ 65k elements");
            out.extend_from_slice(&count.to_le_bytes());
            // Flag byte per element: 1 = NULL, 0 = value follows.
            for item in items {
                if let Some(s) = item {
                    out.push(0);
                    write_bytes_escaped(out, s.as_bytes());
                } else {
                    out.push(1);
                }
            }
        }
        Value::IntArray(items) => {
            out.push(16);
            let count = u16::try_from(items.len()).expect("INT[] ≤ 65k elements");
            out.extend_from_slice(&count.to_le_bytes());
            for item in items {
                if let Some(n) = item {
                    out.push(0);
                    out.extend_from_slice(&n.to_le_bytes());
                } else {
                    out.push(1);
                }
            }
        }
        Value::BigIntArray(items) => {
            out.push(17);
            let count = u16::try_from(items.len()).expect("BIGINT[] ≤ 65k elements");
            out.extend_from_slice(&count.to_le_bytes());
            for item in items {
                if let Some(n) = item {
                    out.push(0);
                    out.extend_from_slice(&n.to_le_bytes());
                } else {
                    out.push(1);
                }
            }
        }
        Value::TsVector(lexs) => {
            out.push(18);
            write_tsvector_body(out, lexs);
        }
        Value::TsQuery(ast) => {
            out.push(19);
            write_tsquery_body(out, ast);
        }
        Value::Uuid(b) => tagged(out, 20, &b[..]),
        Value::Time(us) => tagged(out, 21, &us.to_le_bytes()),
        Value::Year(y) => tagged(out, 22, &y.to_le_bytes()),
        Value::TimeTz { us, offset_secs } => {
            tagged(out, 23, &us.to_le_bytes());
            out.extend_from_slice(&offset_secs.to_le_bytes());
        }
        Value::Money(c) => tagged(out, 24, &c.to_le_bytes()),
        Value::Range {
            kind,
            lower,
            upper,
            lower_inc,
            upper_inc,
            empty,
        } => {
            out.push(25);
            out.push(kind.tag());
            // Flag bits, LSB first: empty, has-lower, has-upper,
            // lower-inclusive, upper-inclusive.
            let flags = [
                (*empty, 0b0000_0001u8),
                (lower.is_some(), 0b0000_0010),
                (upper.is_some(), 0b0000_0100),
                (*lower_inc, 0b0000_1000),
                (*upper_inc, 0b0001_0000),
            ]
            .iter()
            .filter(|(set, _)| *set)
            .fold(0u8, |acc, (_, bit)| acc | bit);
            out.push(flags);
            // Present bounds follow as tagged values, lower first.
            for bound in lower.iter().chain(upper.iter()) {
                write_value(out, bound);
            }
        }
        Value::Hstore(pairs) => {
            out.push(26);
            write_hstore_body(out, pairs);
        }
        Value::IntArray2D(rows) => {
            out.push(27);
            write_int_2d_body(out, rows);
        }
        Value::BigIntArray2D(rows) => {
            out.push(28);
            write_bigint_2d_body(out, rows);
        }
        Value::TextArray2D(rows) => {
            out.push(29);
            write_text_2d_body(out, rows);
        }
    }
}
/// Appends a 2D `INT` array: row count and column count as little-endian
/// `u32`s, then every cell in row-major order.
///
/// The column count is taken from the first row (0 when there are no rows).
/// Each cell is a flag byte — `1` for NULL, `0` for a value — with the
/// little-endian `i32` following a `0` flag.
///
/// # Panics
///
/// Panics if the row or column count does not fit in `u32`.
fn write_int_2d_body(out: &mut Vec<u8>, rows: &[Vec<Option<i32>>]) {
    let row_count = u32::try_from(rows.len()).expect("≤ 4G rows");
    let col_count = u32::try_from(rows.first().map_or(0, Vec::len)).expect("≤ 4G cols");
    out.extend_from_slice(&row_count.to_le_bytes());
    out.extend_from_slice(&col_count.to_le_bytes());
    for cell in rows.iter().flatten() {
        if let Some(n) = cell {
            out.push(0);
            out.extend_from_slice(&n.to_le_bytes());
        } else {
            out.push(1);
        }
    }
}
/// Appends a 2D `BIGINT` array: row count and column count as little-endian
/// `u32`s, then every cell in row-major order.
///
/// The column count is taken from the first row (0 when there are no rows).
/// Each cell is a flag byte — `1` for NULL, `0` for a value — with the
/// little-endian `i64` following a `0` flag.
///
/// # Panics
///
/// Panics if the row or column count does not fit in `u32`.
fn write_bigint_2d_body(out: &mut Vec<u8>, rows: &[Vec<Option<i64>>]) {
    let row_count = u32::try_from(rows.len()).expect("≤ 4G rows");
    let col_count = u32::try_from(rows.first().map_or(0, Vec::len)).expect("≤ 4G cols");
    out.extend_from_slice(&row_count.to_le_bytes());
    out.extend_from_slice(&col_count.to_le_bytes());
    for cell in rows.iter().flatten() {
        if let Some(n) = cell {
            out.push(0);
            out.extend_from_slice(&n.to_le_bytes());
        } else {
            out.push(1);
        }
    }
}
/// Appends a 2D `TEXT` array: row count and column count as little-endian
/// `u32`s, then every cell in row-major order.
///
/// The column count is taken from the first row (0 when there are no rows).
/// Each cell is a flag byte — `1` for NULL, `0` for a value — and a value is
/// stored as a little-endian `u32` byte length followed by the UTF-8 bytes.
///
/// # Panics
///
/// Panics if the row count, column count or a cell length does not fit in
/// `u32`.
fn write_text_2d_body(out: &mut Vec<u8>, rows: &[Vec<Option<String>>]) {
    let row_count = u32::try_from(rows.len()).expect("≤ 4G rows");
    let col_count = u32::try_from(rows.first().map_or(0, Vec::len)).expect("≤ 4G cols");
    out.extend_from_slice(&row_count.to_le_bytes());
    out.extend_from_slice(&col_count.to_le_bytes());
    for cell in rows.iter().flatten() {
        if let Some(text) = cell {
            out.push(0);
            let byte_len = u32::try_from(text.len()).expect("≤ 4 GiB cell");
            out.extend_from_slice(&byte_len.to_le_bytes());
            out.extend_from_slice(text.as_bytes());
        } else {
            out.push(1);
        }
    }
}
/// Appends an hstore body: a little-endian `u32` pair count, then for each
/// pair the key (`u32` byte length + UTF-8 bytes) and a presence byte.
///
/// A presence byte of `0` marks a NULL value; `1` is followed by the value
/// as a `u32` byte length + UTF-8 bytes.
///
/// # Panics
///
/// Panics if the pair count, a key length or a value length does not fit in
/// `u32`.
fn write_hstore_body(out: &mut Vec<u8>, pairs: &[(String, Option<String>)]) {
    let pair_count = u32::try_from(pairs.len()).expect("hstore ≤ u32::MAX pairs");
    out.extend_from_slice(&pair_count.to_le_bytes());
    for (key, value) in pairs {
        let key_len = u32::try_from(key.len()).expect("hstore key ≤ 4 GiB");
        out.extend_from_slice(&key_len.to_le_bytes());
        out.extend_from_slice(key.as_bytes());
        if let Some(text) = value {
            out.push(1);
            let value_len = u32::try_from(text.len()).expect("hstore val ≤ 4 GiB");
            out.extend_from_slice(&value_len.to_le_bytes());
            out.extend_from_slice(text.as_bytes());
        } else {
            out.push(0);
        }
    }
}
/// Appends a tsvector body: a little-endian `u16` lexeme count, then for each
/// lexeme its word (escaped length prefix + bytes), a `u16` position count,
/// the positions in little-endian order, and the weight byte.
///
/// # Panics
///
/// Panics if the lexeme count or a lexeme's position count does not fit in
/// `u16`.
fn write_tsvector_body(out: &mut Vec<u8>, lexs: &[TsLexeme]) {
    let lexeme_count = u16::try_from(lexs.len()).expect("tsvector ≤ 65k lexemes");
    out.extend_from_slice(&lexeme_count.to_le_bytes());
    for lexeme in lexs {
        write_bytes_escaped(out, lexeme.word.as_bytes());
        let position_count =
            u16::try_from(lexeme.positions.len()).expect("tsvector pos count ≤ 65k");
        out.extend_from_slice(&position_count.to_le_bytes());
        lexeme
            .positions
            .iter()
            .for_each(|pos| out.extend_from_slice(&pos.to_le_bytes()));
        out.push(lexeme.weight);
    }
}
/// Appends a tsquery AST in pre-order: each node is a tag byte followed by
/// its payload and then its children.
///
/// Node tags: `0` term (escaped word + weight mask), `1` AND, `2` OR,
/// `3` NOT, `4` phrase (little-endian distance, then left and right).
fn write_tsquery_body(out: &mut Vec<u8>, ast: &TsQueryAst) {
    match ast {
        TsQueryAst::Term { word, weight_mask } => {
            out.push(0);
            write_bytes_escaped(out, word.as_bytes());
            out.push(*weight_mask);
        }
        // AND and OR differ only in their tag byte.
        TsQueryAst::And(lhs, rhs) | TsQueryAst::Or(lhs, rhs) => {
            let tag = if matches!(ast, TsQueryAst::And(..)) { 1 } else { 2 };
            out.push(tag);
            write_tsquery_body(out, lhs);
            write_tsquery_body(out, rhs);
        }
        TsQueryAst::Not(operand) => {
            out.push(3);
            write_tsquery_body(out, operand);
        }
        TsQueryAst::Phrase {
            left,
            right,
            distance,
        } => {
            out.push(4);
            out.extend_from_slice(&distance.to_le_bytes());
            for side in [left, right] {
                write_tsquery_body(out, side);
            }
        }
    }
}
fn tsquery_encoded_len(ast: &TsQueryAst) -> usize {
match ast {
TsQueryAst::Term { word, .. } => 1 + 2 + word.len() + 1,
TsQueryAst::And(a, b) | TsQueryAst::Or(a, b) => {
1 + tsquery_encoded_len(a) + tsquery_encoded_len(b)
}
TsQueryAst::Not(x) => 1 + tsquery_encoded_len(x),
TsQueryAst::Phrase { left, right, .. } => {
1 + 2 + tsquery_encoded_len(left) + tsquery_encoded_len(right)
}
}
}
/// Appends `n` to `out` as two little-endian bytes.
fn write_u16(out: &mut Vec<u8>, n: u16) {
    let [low, high] = n.to_le_bytes();
    out.push(low);
    out.push(high);
}
/// Appends `n` to `out` as four little-endian bytes.
fn write_u32(out: &mut Vec<u8>, n: u32) {
    out.extend(n.to_le_bytes());
}
/// Sentinel value of the 2-byte length prefix. Writers emit it, followed by
/// the real length as a little-endian `u32`, whenever a payload is at least
/// this many bytes long; shorter payloads store their length directly in the
/// 2-byte prefix.
const STR_LEN_ESCAPE: u16 = u16::MAX;
/// Appends `b` with an escaped length prefix.
///
/// Payloads shorter than `STR_LEN_ESCAPE` bytes get a plain little-endian
/// `u16` length. Longer payloads get the `STR_LEN_ESCAPE` marker followed by
/// the length as a little-endian `u32`.
///
/// # Panics
///
/// Panics if `b` is longer than `u32::MAX` bytes.
fn write_bytes_escaped(out: &mut Vec<u8>, b: &[u8]) {
    match u16::try_from(b.len()) {
        // The marker value itself is reserved, so only strictly smaller
        // lengths use the short form.
        Ok(short) if short < STR_LEN_ESCAPE => write_u16(out, short),
        _ => {
            let len = u32::try_from(b.len()).expect("cell fits in u32 (4 GiB cap)");
            write_u16(out, STR_LEN_ESCAPE);
            write_u32(out, len);
        }
    }
    out.extend_from_slice(b);
}
/// Appends `s` as UTF-8 bytes with an escaped length prefix.
///
/// Strings shorter than `STR_LEN_ESCAPE` bytes get a plain little-endian
/// `u16` length. Longer strings get the `STR_LEN_ESCAPE` marker followed by
/// the length as a little-endian `u32`.
///
/// # Panics
///
/// Panics if `s` is longer than `u32::MAX` bytes.
fn write_str(out: &mut Vec<u8>, s: &str) {
    let payload = s.as_bytes();
    let needs_escape = payload.len() >= usize::from(STR_LEN_ESCAPE);
    if needs_escape {
        let len = u32::try_from(payload.len()).expect("text fits in u32 (4 GiB cap)");
        write_u16(out, STR_LEN_ESCAPE);
        write_u32(out, len);
    } else {
        // Guarded above: the length is strictly below u16::MAX.
        write_u16(out, payload.len() as u16);
    }
    out.extend_from_slice(payload);
}
/// Appends `s` as UTF-8 bytes prefixed with its byte length as a
/// little-endian `u32` (no escape marker).
///
/// # Panics
///
/// Panics if `s` is longer than `u32::MAX` bytes.
fn write_str_long(out: &mut Vec<u8>, s: &str) {
    let payload = s.as_bytes();
    let byte_len = u32::try_from(payload.len()).expect("function body fits in u32");
    write_u32(out, byte_len);
    out.extend_from_slice(payload);
}
/// Appends an index key: a one-byte key tag followed by the payload.
///
/// `Int` is a little-endian integer, `Text` an escaped-length string,
/// `Bool` a single byte and `Uuid` its raw bytes.
fn write_index_key(out: &mut Vec<u8>, key: &IndexKey) {
    match key {
        IndexKey::Int(n) => {
            out.push(INDEX_KEY_TAG_INT);
            out.extend(n.to_le_bytes());
        }
        IndexKey::Text(s) => {
            out.push(INDEX_KEY_TAG_TEXT);
            write_str(out, s);
        }
        IndexKey::Bool(b) => out.extend_from_slice(&[INDEX_KEY_TAG_BOOL, u8::from(*b)]),
        IndexKey::Uuid(b) => {
            out.push(INDEX_KEY_TAG_UUID);
            out.extend_from_slice(&b[..]);
        }
    }
}
/// Forward-only reader over a borrowed byte buffer, used by the decoders.
struct Cursor<'a> {
// Input bytes being decoded.
buf: &'a [u8],
// Offset of the next unread byte in `buf`; advanced by `take`.
pos: usize,
// Codec version of the data being read (0 unless set through
// `with_codec_version`); gates the escaped length prefixes.
codec_version: u8,
}
impl<'a> Cursor<'a> {
const fn new(buf: &'a [u8]) -> Self {
Self {
buf,
pos: 0,
codec_version: 0,
}
}
const fn with_codec_version(mut self, v: u8) -> Self {
self.codec_version = v;
self
}
/// Consumes the next `n` bytes and returns them as a slice of the input
/// buffer. The cursor advances only on success.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` when `pos + n` overflows `usize` or when
/// fewer than `n` bytes remain.
fn take(&mut self, n: usize) -> Result<&'a [u8], StorageError> {
    let input: &'a [u8] = self.buf;
    let start = self.pos;
    let Some(stop) = start.checked_add(n) else {
        return Err(StorageError::Corrupt(format!(
            "length overflow taking {n} bytes"
        )));
    };
    match input.get(start..stop) {
        Some(window) => {
            self.pos = stop;
            Ok(window)
        }
        None => Err(StorageError::Corrupt(format!(
            "unexpected EOF at offset {} (wanted {n} more bytes)",
            start
        ))),
    }
}
/// Reads a single byte.
fn read_u8(&mut self) -> Result<u8, StorageError> {
    self.take(1).map(|raw| raw[0])
}
/// Reads a little-endian `u16`.
fn read_u16(&mut self) -> Result<u16, StorageError> {
    let raw = self.take(2)?;
    let mut le = [0u8; 2];
    le.copy_from_slice(raw);
    Ok(u16::from_le_bytes(le))
}
/// Reads a little-endian `u32`.
fn read_u32(&mut self) -> Result<u32, StorageError> {
    let raw = self.take(4)?;
    let mut le = [0u8; 4];
    le.copy_from_slice(raw);
    Ok(u32::from_le_bytes(le))
}
/// Reads a little-endian `i32`.
fn read_i32(&mut self) -> Result<i32, StorageError> {
    self.take(4)
        .map(|raw| i32::from_le_bytes([raw[0], raw[1], raw[2], raw[3]]))
}
/// Reads a little-endian `u64`.
fn read_u64(&mut self) -> Result<u64, StorageError> {
    let raw = self.take(8)?;
    let mut le = [0u8; 8];
    le.copy_from_slice(raw);
    Ok(u64::from_le_bytes(le))
}
/// Reads a little-endian `i64`.
fn read_i64(&mut self) -> Result<i64, StorageError> {
    self.take(8).map(|raw| {
        // `take(8)` returned exactly eight bytes, so the conversion holds.
        let le: [u8; 8] = raw.try_into().expect("checked");
        i64::from_le_bytes(le)
    })
}
/// Reads a little-endian `f64`.
fn read_f64(&mut self) -> Result<f64, StorageError> {
    self.take(8).map(|raw| {
        // `take(8)` returned exactly eight bytes, so the conversion holds.
        let le: [u8; 8] = raw.try_into().expect("checked");
        f64::from_le_bytes(le)
    })
}
/// Reads a little-endian `f32`.
fn read_f32(&mut self) -> Result<f32, StorageError> {
    let raw = self.take(4)?;
    let mut le = [0u8; 4];
    le.copy_from_slice(raw);
    Ok(f32::from_le_bytes(le))
}
/// Reads a cell length prefix.
///
/// The prefix is a little-endian `u16`. From codec version 47 onward the
/// value `STR_LEN_ESCAPE` is a marker and the real length follows as a
/// little-endian `u32`; older data treats every prefix value as a literal
/// length.
fn read_len_escaped_v47(&mut self) -> Result<usize, StorageError> {
    let prefix = self.read_u16()?;
    let is_escape = prefix == STR_LEN_ESCAPE && self.codec_version >= 47;
    if !is_escape {
        return Ok(usize::from(prefix));
    }
    self.read_u32().map(|wide| wide as usize)
}
/// Reads a string whose length prefix follows the codec-version-47 escape
/// rule (see `read_len_escaped_v47`).
///
/// # Errors
///
/// Returns `StorageError::Corrupt` on truncated input or invalid UTF-8.
fn read_str_escaped_v47(&mut self) -> Result<String, StorageError> {
    let byte_len = self.read_len_escaped_v47()?;
    let raw = self.take(byte_len)?;
    match core::str::from_utf8(raw) {
        Ok(text) => Ok(String::from(text)),
        Err(_) => Err(StorageError::Corrupt(
            "invalid UTF-8 in cell payload".into(),
        )),
    }
}
/// Reads a length-prefixed UTF-8 string.
///
/// The prefix is a little-endian `u16`. From codec version 46 onward the
/// value `STR_LEN_ESCAPE` is a marker and the real length follows as a
/// little-endian `u32`.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` on truncated input or invalid UTF-8.
fn read_str(&mut self) -> Result<String, StorageError> {
    let byte_len = match self.read_u16()? {
        STR_LEN_ESCAPE if self.codec_version >= 46 => self.read_u32()? as usize,
        short => usize::from(short),
    };
    let raw = self.take(byte_len)?;
    match core::str::from_utf8(raw) {
        Ok(text) => Ok(String::from(text)),
        Err(_) => Err(StorageError::Corrupt(
            "invalid UTF-8 in identifier or text".into(),
        )),
    }
}
/// Reads a UTF-8 string prefixed with its byte length as a little-endian
/// `u32` (the counterpart of `write_str_long`).
///
/// # Errors
///
/// Returns `StorageError::Corrupt` on truncated input or invalid UTF-8.
fn read_str_long(&mut self) -> Result<String, StorageError> {
    let byte_len = self.read_u32()? as usize;
    let raw = self.take(byte_len)?;
    match core::str::from_utf8(raw) {
        Ok(text) => Ok(String::from(text)),
        Err(_) => Err(StorageError::Corrupt(
            "invalid UTF-8 in long-string payload".into(),
        )),
    }
}
/// Reads one index key: a tag byte followed by the payload for that tag
/// (the counterpart of `write_index_key`).
///
/// # Errors
///
/// Returns `StorageError::Corrupt` for an unknown key tag or truncated or
/// invalid payload.
fn read_index_key(&mut self) -> Result<IndexKey, StorageError> {
    let key = match self.read_u8()? {
        INDEX_KEY_TAG_INT => IndexKey::Int(self.read_i64()?),
        INDEX_KEY_TAG_TEXT => IndexKey::Text(self.read_str()?),
        // Any non-zero byte decodes as `true`.
        INDEX_KEY_TAG_BOOL => IndexKey::Bool(self.read_u8()? != 0),
        INDEX_KEY_TAG_UUID => {
            let mut uuid = [0u8; 16];
            uuid.copy_from_slice(self.take(16)?);
            IndexKey::Uuid(uuid)
        }
        other => {
            return Err(StorageError::Corrupt(format!(
                "unknown index key tag: {other}"
            )));
        }
    };
    Ok(key)
}
/// Decodes one non-NULL cell body whose layout is dictated by the column
/// type `ty` (the counterpart of `write_value_body`).
///
/// No per-value tag is read; the layout comes from `ty` alone. Range bounds
/// are the exception: they are read through `read_value` and carry their
/// own tags. `Timestamptz` decodes to `Value::Timestamp`, and both `Json`
/// and `Jsonb` decode to `Value::Json`.
///
/// Length prefixes come from the (possibly corrupt) input, so they are never
/// trusted for allocation: the F32 vector arm caps its preallocation by the
/// bytes actually remaining, and the F16 arm computes its byte length with
/// overflow checking.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` on truncated input, an unknown array NULL
/// flag, a length that overflows, an `Interval` column, or any error raised
/// by the nested readers.
fn read_value_body(&mut self, ty: DataType) -> Result<Value, StorageError> {
    match ty {
        DataType::SmallInt => {
            let s = self.take(2)?;
            Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
        }
        DataType::Int => Ok(Value::Int(self.read_i32()?)),
        DataType::BigInt => Ok(Value::BigInt(self.read_i64()?)),
        DataType::Float => Ok(Value::Float(self.read_f64()?)),
        DataType::Bool => Ok(Value::Bool(self.read_u8()? != 0)),
        DataType::Text | DataType::Varchar(_) | DataType::Char(_) => {
            Ok(Value::Text(self.read_str()?))
        }
        DataType::Vector {
            encoding: VecEncoding::F32,
            ..
        } => {
            let dim = self.read_u32()? as usize;
            // Each component needs 4 bytes, so never reserve more slots than
            // the remaining input could fill; a corrupt `dim` then fails on
            // the first missing component instead of forcing a huge
            // allocation.
            let remaining = self.buf.len().saturating_sub(self.pos);
            let mut v = Vec::with_capacity(dim.min(remaining / 4));
            for _ in 0..dim {
                let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
                v.push(f32::from_le_bytes(bytes));
            }
            Ok(Value::Vector(v))
        }
        DataType::Vector {
            encoding: VecEncoding::Sq8,
            ..
        } => {
            let dim = self.read_u32()? as usize;
            let min = self.read_f32()?;
            let max = self.read_f32()?;
            let bytes = self.take(dim)?.to_vec();
            Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
        }
        DataType::Vector {
            encoding: VecEncoding::F16,
            ..
        } => {
            let dim = self.read_u32()? as usize;
            // Two bytes per component; guard the multiplication so a corrupt
            // `dim` cannot wrap on targets with a 32-bit `usize`.
            let byte_len = dim.checked_mul(2).ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "half-vector dim {dim} overflows its byte length"
                ))
            })?;
            let bytes = self.take(byte_len)?.to_vec();
            Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
        }
        DataType::Numeric { .. } => {
            let s = self.take(16)?;
            let arr: [u8; 16] = s.try_into().expect("checked");
            let scaled = i128::from_le_bytes(arr);
            let scale = self.read_u8()?;
            Ok(Value::Numeric { scaled, scale })
        }
        DataType::Date => Ok(Value::Date(self.read_i32()?)),
        DataType::Timestamp => Ok(Value::Timestamp(self.read_i64()?)),
        DataType::Timestamptz => Ok(Value::Timestamp(self.read_i64()?)),
        DataType::Jsonb => Ok(Value::Json(self.read_str()?)),
        DataType::Interval => {
            Err(StorageError::Corrupt(
                "INTERVAL column found on disk — runtime-only type, v3.0.2 rejects it".into(),
            ))
        }
        DataType::Json => Ok(Value::Json(self.read_str()?)),
        DataType::Bytes => {
            let len = self.read_len_escaped_v47()?;
            let bytes = self.take(len)?.to_vec();
            Ok(Value::Bytes(bytes))
        }
        DataType::TextArray => {
            // Counts are u16, so preallocating `count` slots is bounded.
            let count = self.read_u16()? as usize;
            let mut items: Vec<Option<String>> = Vec::with_capacity(count);
            for _ in 0..count {
                // Flag byte per element: 0 = value follows, 1 = NULL.
                match self.read_u8()? {
                    0 => items.push(Some(self.read_str_escaped_v47()?)),
                    1 => items.push(None),
                    other => {
                        return Err(StorageError::Corrupt(format!(
                            "TEXT[] null flag: unknown byte {other}"
                        )));
                    }
                }
            }
            Ok(Value::TextArray(items))
        }
        DataType::IntArray => {
            let count = self.read_u16()? as usize;
            let mut items: Vec<Option<i32>> = Vec::with_capacity(count);
            for _ in 0..count {
                match self.read_u8()? {
                    0 => items.push(Some(self.read_i32()?)),
                    1 => items.push(None),
                    other => {
                        return Err(StorageError::Corrupt(format!(
                            "INT[] null flag: unknown byte {other}"
                        )));
                    }
                }
            }
            Ok(Value::IntArray(items))
        }
        DataType::BigIntArray => {
            let count = self.read_u16()? as usize;
            let mut items: Vec<Option<i64>> = Vec::with_capacity(count);
            for _ in 0..count {
                match self.read_u8()? {
                    0 => items.push(Some(self.read_i64()?)),
                    1 => items.push(None),
                    other => {
                        return Err(StorageError::Corrupt(format!(
                            "BIGINT[] null flag: unknown byte {other}"
                        )));
                    }
                }
            }
            Ok(Value::BigIntArray(items))
        }
        DataType::TsVector => Ok(Value::TsVector(self.read_tsvector_body()?)),
        DataType::TsQuery => Ok(Value::TsQuery(self.read_tsquery_body()?)),
        DataType::Uuid => {
            let s = self.take(16)?;
            let mut b = [0u8; 16];
            b.copy_from_slice(s);
            Ok(Value::Uuid(b))
        }
        DataType::Time => Ok(Value::Time(self.read_i64()?)),
        DataType::Year => Ok(Value::Year(self.read_u16()?)),
        DataType::TimeTz => {
            let us = self.read_i64()?;
            let offset_secs = self.read_i32()?;
            Ok(Value::TimeTz { us, offset_secs })
        }
        DataType::Money => Ok(Value::Money(self.read_i64()?)),
        DataType::Hstore => Ok(Value::Hstore(self.read_hstore_body()?)),
        DataType::IntArray2D => Ok(Value::IntArray2D(self.read_int_2d_body()?)),
        DataType::BigIntArray2D => Ok(Value::BigIntArray2D(self.read_bigint_2d_body()?)),
        DataType::TextArray2D => Ok(Value::TextArray2D(self.read_text_2d_body()?)),
        DataType::Range(kind) => {
            // Flag bits, LSB first: empty, has-lower, has-upper,
            // lower-inclusive, upper-inclusive.
            let flags = self.read_u8()?;
            let empty = flags & 0b0000_0001 != 0;
            let has_lower = flags & 0b0000_0010 != 0;
            let has_upper = flags & 0b0000_0100 != 0;
            let lower_inc = flags & 0b0000_1000 != 0;
            let upper_inc = flags & 0b0001_0000 != 0;
            // Present bounds are stored as tagged values, lower first.
            let lower = if has_lower {
                Some(alloc::boxed::Box::new(self.read_value()?))
            } else {
                None
            };
            let upper = if has_upper {
                Some(alloc::boxed::Box::new(self.read_value()?))
            } else {
                None
            };
            Ok(Value::Range {
                kind,
                lower,
                upper,
                lower_inc,
                upper_inc,
                empty,
            })
        }
    }
}
/// Reads a 2D `INT` array: row and column counts as little-endian `u32`s,
/// then `nrows * ncols` cells in row-major order.
///
/// Each cell starts with a flag byte: `1` means NULL, any other value is
/// followed by a little-endian `i32`.
///
/// Both counts come from the input, so preallocation is capped by the bytes
/// still available (every cell needs at least its flag byte). A corrupt
/// header then fails with a truncation error instead of requesting a huge
/// allocation up front.
///
/// NOTE(review): a header with `ncols == 0` still lets `nrows` alone drive
/// the row loop; confirm whether an upper bound on rows is wanted.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` when the input ends early.
fn read_int_2d_body(&mut self) -> Result<Vec<Vec<Option<i32>>>, StorageError> {
    let nrows = self.read_u32()? as usize;
    let ncols = self.read_u32()? as usize;
    let remaining = self.buf.len().saturating_sub(self.pos);
    let mut rows = Vec::with_capacity(nrows.min(remaining));
    for _ in 0..nrows {
        let left = self.buf.len().saturating_sub(self.pos);
        let mut row = Vec::with_capacity(ncols.min(left));
        for _ in 0..ncols {
            let null = self.read_u8()?;
            row.push(if null == 1 {
                None
            } else {
                Some(self.read_i32()?)
            });
        }
        rows.push(row);
    }
    Ok(rows)
}
/// Reads a 2D `BIGINT` array: row and column counts as little-endian `u32`s,
/// then `nrows * ncols` cells in row-major order.
///
/// Each cell starts with a flag byte: `1` means NULL, any other value is
/// followed by a little-endian `i64`.
///
/// Both counts come from the input, so preallocation is capped by the bytes
/// still available (every cell needs at least its flag byte). A corrupt
/// header then fails with a truncation error instead of requesting a huge
/// allocation up front.
///
/// NOTE(review): a header with `ncols == 0` still lets `nrows` alone drive
/// the row loop; confirm whether an upper bound on rows is wanted.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` when the input ends early.
fn read_bigint_2d_body(&mut self) -> Result<Vec<Vec<Option<i64>>>, StorageError> {
    let nrows = self.read_u32()? as usize;
    let ncols = self.read_u32()? as usize;
    let remaining = self.buf.len().saturating_sub(self.pos);
    let mut rows = Vec::with_capacity(nrows.min(remaining));
    for _ in 0..nrows {
        let left = self.buf.len().saturating_sub(self.pos);
        let mut row = Vec::with_capacity(ncols.min(left));
        for _ in 0..ncols {
            let null = self.read_u8()?;
            row.push(if null == 1 {
                None
            } else {
                Some(self.read_i64()?)
            });
        }
        rows.push(row);
    }
    Ok(rows)
}
/// Reads a 2D `TEXT` array: row and column counts as little-endian `u32`s,
/// then `nrows * ncols` cells in row-major order.
///
/// Each cell starts with a flag byte: `1` means NULL, any other value is
/// followed by a little-endian `u32` byte length and that many UTF-8 bytes.
///
/// Both counts come from the input, so preallocation is capped by the bytes
/// still available (every cell needs at least its flag byte). A corrupt
/// header then fails with a truncation error instead of requesting a huge
/// allocation up front.
///
/// NOTE(review): a header with `ncols == 0` still lets `nrows` alone drive
/// the row loop; confirm whether an upper bound on rows is wanted.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` when the input ends early or a cell is
/// not valid UTF-8.
fn read_text_2d_body(&mut self) -> Result<Vec<Vec<Option<String>>>, StorageError> {
    let nrows = self.read_u32()? as usize;
    let ncols = self.read_u32()? as usize;
    let remaining = self.buf.len().saturating_sub(self.pos);
    let mut rows = Vec::with_capacity(nrows.min(remaining));
    for _ in 0..nrows {
        let left = self.buf.len().saturating_sub(self.pos);
        let mut row = Vec::with_capacity(ncols.min(left));
        for _ in 0..ncols {
            let null = self.read_u8()?;
            if null == 1 {
                row.push(None);
            } else {
                let l = self.read_u32()? as usize;
                let bytes = self.take(l)?.to_vec();
                let s = String::from_utf8(bytes).map_err(|_| {
                    StorageError::Corrupt("2D TEXT cell is not valid UTF-8".into())
                })?;
                row.push(Some(s));
            }
        }
        rows.push(row);
    }
    Ok(rows)
}
/// Reads an hstore body: a little-endian `u32` pair count, then for each
/// pair the key (`u32` byte length + UTF-8 bytes) and a presence byte.
///
/// A presence byte of `0` is a NULL value; any other byte is followed by the
/// value as a `u32` byte length + UTF-8 bytes.
///
/// The pair count comes from the input, so preallocation is capped by what
/// the remaining bytes could hold (a pair occupies at least 5 bytes: key
/// length + presence byte). A corrupt count then fails with a truncation
/// error instead of requesting a huge allocation up front.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` when the input ends early or a key or
/// value is not valid UTF-8.
fn read_hstore_body(&mut self) -> Result<Vec<(String, Option<String>)>, StorageError> {
    // Smallest possible encoded pair: u32 key length + presence byte.
    const MIN_PAIR_BYTES: usize = 5;

    let count = self.read_u32()? as usize;
    let remaining = self.buf.len().saturating_sub(self.pos);
    let mut out = Vec::with_capacity(count.min(remaining / MIN_PAIR_BYTES));
    for _ in 0..count {
        let klen = self.read_u32()? as usize;
        let k_bytes = self.take(klen)?.to_vec();
        let k = String::from_utf8(k_bytes)
            .map_err(|_| StorageError::Corrupt("hstore key is not valid UTF-8".into()))?;
        let has_val = self.read_u8()? != 0;
        let v = if has_val {
            let vlen = self.read_u32()? as usize;
            let v_bytes = self.take(vlen)?.to_vec();
            Some(String::from_utf8(v_bytes).map_err(|_| {
                StorageError::Corrupt("hstore value is not valid UTF-8".into())
            })?)
        } else {
            None
        };
        out.push((k, v));
    }
    Ok(out)
}
/// Reads a tsvector body: a little-endian `u16` lexeme count, then for each
/// lexeme its word (escaped length prefix), a `u16` position count, the
/// positions, and the weight byte.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` on truncated input or a word that is not
/// valid UTF-8.
fn read_tsvector_body(&mut self) -> Result<Vec<TsLexeme>, StorageError> {
    let lexeme_count = usize::from(self.read_u16()?);
    let mut lexemes = Vec::with_capacity(lexeme_count);
    for _ in 0..lexeme_count {
        let word = self.read_str_escaped_v47()?;
        let position_count = usize::from(self.read_u16()?);
        let mut positions = Vec::with_capacity(position_count);
        let mut pending = position_count;
        while pending > 0 {
            positions.push(self.read_u16()?);
            pending -= 1;
        }
        let weight = self.read_u8()?;
        lexemes.push(TsLexeme {
            word,
            positions,
            weight,
        });
    }
    Ok(lexemes)
}
/// Reads a tsquery AST stored in pre-order (the counterpart of
/// `write_tsquery_body`).
///
/// Node tags: `0` term (escaped word + weight mask), `1` AND, `2` OR,
/// `3` NOT, `4` phrase (little-endian distance, then left and right).
///
/// NOTE(review): recursion depth is bounded only by the input; confirm
/// whether a nesting limit is wanted for untrusted data.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` for an unknown node tag, truncated input,
/// or a term that is not valid UTF-8.
fn read_tsquery_body(&mut self) -> Result<TsQueryAst, StorageError> {
    let node = match self.read_u8()? {
        0 => {
            let word = self.read_str_escaped_v47()?;
            let weight_mask = self.read_u8()?;
            TsQueryAst::Term { word, weight_mask }
        }
        1 => {
            let lhs = Box::new(self.read_tsquery_body()?);
            let rhs = Box::new(self.read_tsquery_body()?);
            TsQueryAst::And(lhs, rhs)
        }
        2 => {
            let lhs = Box::new(self.read_tsquery_body()?);
            let rhs = Box::new(self.read_tsquery_body()?);
            TsQueryAst::Or(lhs, rhs)
        }
        3 => TsQueryAst::Not(Box::new(self.read_tsquery_body()?)),
        4 => {
            // Distance precedes the two operands on disk.
            let distance = self.read_u16()?;
            let left = Box::new(self.read_tsquery_body()?);
            let right = Box::new(self.read_tsquery_body()?);
            TsQueryAst::Phrase {
                left,
                right,
                distance,
            }
        }
        other => {
            return Err(StorageError::Corrupt(format!(
                "tsquery: unknown node tag {other}"
            )));
        }
    };
    Ok(node)
}
/// Decodes one tagged [`Value`] from the cursor.
///
/// The first byte is a type tag that selects the payload layout; the match
/// arms below are listed in tag order. Tag 13 has no arm in this match and is
/// therefore reported through the unknown-tag error.
///
/// Length fields come straight from the input, so variable-size payloads are
/// claimed from the buffer with `take` *before* any allocation is sized from
/// them; a corrupt length then fails with an error instead of requesting a
/// huge allocation.
///
/// # Errors
/// Returns `StorageError::Corrupt` for an unknown value tag, an unknown
/// array NULL flag, an unknown `RangeKind` tag, or a vector dimension whose
/// byte length overflows `usize`; propagates any error from the underlying
/// reads.
///
/// NOTE(review): range bounds are decoded with a recursive `read_value` call
/// and the nesting depth is not limited here.
fn read_value(&mut self) -> Result<Value, StorageError> {
    let tag = self.read_u8()?;
    match tag {
        0 => Ok(Value::Null),
        1 => Ok(Value::Int(self.read_i32()?)),
        2 => Ok(Value::BigInt(self.read_i64()?)),
        3 => Ok(Value::Float(self.read_f64()?)),
        4 => Ok(Value::Text(self.read_str()?)),
        5 => Ok(Value::Bool(self.read_u8()? != 0)),
        6 => {
            // f32 vector: u32 element count, then 4 little-endian bytes each.
            // Claim the whole payload first so the allocation below is bounded
            // by bytes that are actually present.
            let dim = self.read_u32()? as usize;
            let byte_len = dim.checked_mul(4).ok_or_else(|| {
                StorageError::Corrupt(format!("vector dim {dim} overflows payload length"))
            })?;
            let raw = self.take(byte_len)?;
            let v: Vec<f32> = raw
                .chunks_exact(4)
                .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
                .collect();
            Ok(Value::Vector(v))
        }
        7 => {
            let s = self.take(2)?;
            Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
        }
        8 => {
            // 16-byte little-endian scaled integer followed by a scale byte.
            let mut arr = [0u8; 16];
            arr.copy_from_slice(self.take(16)?);
            let scaled = i128::from_le_bytes(arr);
            let scale = self.read_u8()?;
            Ok(Value::Numeric { scaled, scale })
        }
        9 => Ok(Value::Date(self.read_i32()?)),
        10 => Ok(Value::Timestamp(self.read_i64()?)),
        11 => {
            // SQ8 vector: u32 dim, f32 min, f32 max, then one byte per dim.
            let dim = self.read_u32()? as usize;
            let min = self.read_f32()?;
            let max = self.read_f32()?;
            let bytes = self.take(dim)?.to_vec();
            Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
        }
        12 => {
            // Half vector: u32 dim, then two bytes per dim. The multiplication
            // is checked so it cannot wrap where `usize` is 32 bits wide.
            let dim = self.read_u32()? as usize;
            let byte_len = dim.checked_mul(2).ok_or_else(|| {
                StorageError::Corrupt(format!("half vector dim {dim} overflows payload length"))
            })?;
            let bytes = self.take(byte_len)?.to_vec();
            Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
        }
        14 => {
            let len = self.read_len_escaped_v47()?;
            let bytes = self.take(len)?.to_vec();
            Ok(Value::Bytes(bytes))
        }
        15 => {
            // Arrays: u16 count, then per element a flag byte (0 = value
            // follows, 1 = NULL).
            let count = self.read_u16()? as usize;
            let mut items: Vec<Option<String>> = Vec::with_capacity(count);
            for _ in 0..count {
                match self.read_u8()? {
                    0 => items.push(Some(self.read_str_escaped_v47()?)),
                    1 => items.push(None),
                    other => {
                        return Err(StorageError::Corrupt(format!(
                            "TEXT[] null flag in value tag: unknown byte {other}"
                        )));
                    }
                }
            }
            Ok(Value::TextArray(items))
        }
        16 => {
            let count = self.read_u16()? as usize;
            let mut items: Vec<Option<i32>> = Vec::with_capacity(count);
            for _ in 0..count {
                match self.read_u8()? {
                    0 => items.push(Some(self.read_i32()?)),
                    1 => items.push(None),
                    other => {
                        return Err(StorageError::Corrupt(format!(
                            "INT[] null flag in value tag: unknown byte {other}"
                        )));
                    }
                }
            }
            Ok(Value::IntArray(items))
        }
        17 => {
            let count = self.read_u16()? as usize;
            let mut items: Vec<Option<i64>> = Vec::with_capacity(count);
            for _ in 0..count {
                match self.read_u8()? {
                    0 => items.push(Some(self.read_i64()?)),
                    1 => items.push(None),
                    other => {
                        return Err(StorageError::Corrupt(format!(
                            "BIGINT[] null flag in value tag: unknown byte {other}"
                        )));
                    }
                }
            }
            Ok(Value::BigIntArray(items))
        }
        18 => Ok(Value::TsVector(self.read_tsvector_body()?)),
        19 => Ok(Value::TsQuery(self.read_tsquery_body()?)),
        20 => {
            let mut b = [0u8; 16];
            b.copy_from_slice(self.take(16)?);
            Ok(Value::Uuid(b))
        }
        21 => Ok(Value::Time(self.read_i64()?)),
        22 => Ok(Value::Year(self.read_u16()?)),
        23 => {
            let us = self.read_i64()?;
            let offset_secs = self.read_i32()?;
            Ok(Value::TimeTz { us, offset_secs })
        }
        24 => Ok(Value::Money(self.read_i64()?)),
        25 => {
            // Range: kind tag, flag byte, then the optional lower and upper
            // bounds (each a nested tagged value) in that order.
            let kt = self.read_u8()?;
            let kind = RangeKind::from_tag(kt)
                .ok_or_else(|| StorageError::Corrupt(format!("unknown RangeKind tag: {kt}")))?;
            let flags = self.read_u8()?;
            let empty = flags & 0b0000_0001 != 0;
            let has_lower = flags & 0b0000_0010 != 0;
            let has_upper = flags & 0b0000_0100 != 0;
            let lower_inc = flags & 0b0000_1000 != 0;
            let upper_inc = flags & 0b0001_0000 != 0;
            let lower = if has_lower {
                Some(Box::new(self.read_value()?))
            } else {
                None
            };
            let upper = if has_upper {
                Some(Box::new(self.read_value()?))
            } else {
                None
            };
            Ok(Value::Range {
                kind,
                lower,
                upper,
                lower_inc,
                upper_inc,
                empty,
            })
        }
        26 => Ok(Value::Hstore(self.read_hstore_body()?)),
        27 => Ok(Value::IntArray2D(self.read_int_2d_body()?)),
        28 => Ok(Value::BigIntArray2D(self.read_bigint_2d_body()?)),
        29 => Ok(Value::TextArray2D(self.read_text_2d_body()?)),
        other => Err(StorageError::Corrupt(format!("unknown value tag: {other}"))),
    }
}
/// Decodes a serialized NSW/HNSW graph.
///
/// `m` is supplied by the caller and stored in the result unchanged. The
/// wire layout read here is: `u16` `m_max_0`, `u32` entry node (`u32::MAX`
/// encodes "no entry"), `u8` entry level, `u32` node count followed by one
/// level byte per node, `u8` layer count, and for each layer a `u32` row
/// count followed by rows of `u16` neighbour count + `u32` neighbour ids.
///
/// # Errors
/// Propagates any error from the underlying reads.
fn read_nsw_graph(&mut self, m: usize) -> Result<NswGraph, StorageError> {
    let m_max_0 = usize::from(self.read_u16()?);
    let entry = match self.read_u32()? {
        u32::MAX => None,
        idx => Some(idx as usize),
    };
    let entry_level = self.read_u8()?;

    let node_count = self.read_u32()? as usize;
    let mut levels: PersistentVec<u8> = PersistentVec::new();
    for _ in 0..node_count {
        let level = self.read_u8()?;
        levels.push_mut(level);
    }

    let layer_count = usize::from(self.read_u8()?);
    let mut layers: Vec<PersistentVec<Vec<u32>>> = Vec::with_capacity(layer_count);
    for _ in 0..layer_count {
        let row_count = self.read_u32()? as usize;
        let mut adjacency: PersistentVec<Vec<u32>> = PersistentVec::new();
        for _ in 0..row_count {
            let degree = usize::from(self.read_u16()?);
            let mut neighbours: Vec<u32> = Vec::with_capacity(degree);
            for _ in 0..degree {
                neighbours.push(self.read_u32()?);
            }
            adjacency.push_mut(neighbours);
        }
        layers.push(adjacency);
    }

    Ok(NswGraph {
        m,
        m_max_0,
        entry,
        entry_level,
        levels,
        layers,
    })
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloc::string::ToString;
use alloc::vec;
/// A snapshot round trip keeps the length of a 200 000-byte BYTEA cell and a
/// 100 000-byte TEXT[] element, and keeps a NULL array element NULL.
#[test]
fn snapshot_round_trips_large_bytea_and_text_array_element() {
    let schema = TableSchema::new(
        "q",
        vec![
            ColumnSchema::new("id", DataType::BigInt, false),
            ColumnSchema::new("data", DataType::Bytes, true),
            ColumnSchema::new("uris", DataType::TextArray, true),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let big_blob = alloc::vec![0xAB_u8; 200_000];
    let big_elem = "u".repeat(100_000);
    let uris = alloc::vec![Some(big_elem.clone()), None, Some("s".into())];
    let table = cat.get_mut("q").unwrap();
    table
        .insert(Row::new(alloc::vec![
            Value::BigInt(1),
            Value::Bytes(big_blob.clone()),
            Value::TextArray(uris),
        ]))
        .unwrap();

    let snapshot = cat.serialize();
    let restored = Catalog::deserialize(&snapshot).unwrap();
    let row = restored.get("q").unwrap().rows.get(0).unwrap().clone();

    match &row.values[1] {
        Value::Bytes(b) => assert_eq!(b.len(), big_blob.len()),
        other => panic!("expected Bytes, got {other:?}"),
    }
    match &row.values[2] {
        Value::TextArray(items) => {
            assert_eq!(items[0].as_ref().unwrap().len(), big_elem.len());
            assert!(items[1].is_none());
        }
        other => panic!("expected TextArray, got {other:?}"),
    }
}
/// With codec version 46, a plain `u16` length prefix of 65 535 is decoded as
/// the literal length and the payload behind it is fully readable.
#[test]
fn plain_u16_bytea_len_ffff_decodes_under_v46_rules() {
    let mut buf = Vec::new();
    write_u16(&mut buf, 65_535);
    let payload = alloc::vec![7_u8; 65_535];
    buf.extend_from_slice(&payload);

    let mut cur = Cursor::new(&buf).with_codec_version(46);
    let decoded_len = cur.read_len_escaped_v47().unwrap();
    assert_eq!(decoded_len, 65_535);
    let body = cur.take(decoded_len).unwrap();
    assert_eq!(body.len(), 65_535);
}
/// `write_str` / `read_str` round-trip strings on both sides of the escape
/// threshold: the header is 2 bytes below `STR_LEN_ESCAPE` and 6 bytes from
/// it upwards.
#[test]
fn escaped_string_codec_round_trips_large_text() {
    let lengths = [0usize, 1, 65_534, 65_535, 65_536, 1_048_576];
    for len in lengths {
        let text: String = "x".repeat(len);
        let mut encoded = Vec::new();
        write_str(&mut encoded, &text);

        let expected_header = if len >= STR_LEN_ESCAPE as usize { 6 } else { 2 };
        assert_eq!(
            encoded.len(),
            expected_header + len,
            "header width for {len}"
        );

        let mut cur = Cursor::new(&encoded).with_codec_version(FILE_VERSION);
        let decoded = cur.read_str().unwrap();
        assert_eq!(decoded.len(), len, "round-trip {len}");
    }
}
/// A string written with a plain `u16` length of 65 535 decodes back intact
/// through a cursor that has no codec version set explicitly.
#[test]
fn plain_u16_len_ffff_decodes_under_old_rules() {
    let text = "y".repeat(65_535);
    let mut buf = Vec::new();
    write_u16(&mut buf, 65_535);
    buf.extend_from_slice(text.as_bytes());

    // Deliberately no `with_codec_version` call here.
    let mut old = Cursor::new(&buf);
    let decoded = old.read_str().unwrap();
    assert_eq!(decoded, text);
}
/// A 1 MiB TEXT cell keeps its length across catalog serialize/deserialize.
#[test]
fn snapshot_round_trips_megabyte_text_row() {
    let schema = TableSchema::new(
        "mail",
        vec![
            ColumnSchema::new("id", DataType::BigInt, false),
            ColumnSchema::new("body", DataType::Text, false),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let body = "m".repeat(1_048_576);
    let mail = cat.get_mut("mail").unwrap();
    mail.insert(Row::new(vec![Value::BigInt(1), Value::Text(body.clone())]))
        .unwrap();

    let snapshot = cat.serialize();
    let restored = Catalog::deserialize(&snapshot).unwrap();
    let table = restored.get("mail").unwrap();
    let first_row = table.rows.get(0).unwrap();
    match &first_row.values[1] {
        Value::Text(s) => assert_eq!(s.len(), body.len()),
        other => panic!("expected Text, got {other:?}"),
    }
}
/// Rows carrying a 200 000-byte TEXT cell survive segment encode → lookup →
/// dense row decode.
///
/// Despite the `v3` in the test name, the magic assertion below expects the
/// V4 segment header.
#[test]
fn segment_v3_round_trips_large_text_rows() {
    let schema = TableSchema::new(
        "mail",
        vec![
            ColumnSchema::new("id", DataType::BigInt, false),
            ColumnSchema::new("body", DataType::Text, false),
        ],
    );
    let big = "b".repeat(200_000);

    // Three rows keyed 0, 1, 2, each encoded with the dense row codec.
    let mut rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(3);
    for pk in 0u64..3 {
        let row = Row::new(vec![
            Value::BigInt(pk.cast_signed()),
            Value::Text(big.clone()),
        ]);
        rows.push((pk, encode_row_body_dense(&row, &schema)));
    }

    let (bytes, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    assert_eq!(&bytes[..8], b"SPGSEG\x04\x00", "new segments are V4");

    let seg = OwnedSegment::from_bytes(bytes).unwrap();
    assert!(seg.codec_version() >= 47);

    let payload = seg.lookup(1).expect("pk 1 present");
    let (decoded, _) = decode_row_body_dense(&payload, &schema, seg.codec_version()).unwrap();
    match &decoded.values[1] {
        Value::Text(s) => assert_eq!(s.len(), big.len()),
        other => panic!("expected Text, got {other:?}"),
    }
}
/// A 100 000-byte text index key is written and read back unchanged.
#[test]
fn index_key_round_trips_large_text() {
    let original = IndexKey::Text("k".repeat(100_000));
    let mut encoded = Vec::new();
    write_index_key(&mut encoded, &original);

    let mut cur = Cursor::new(&encoded).with_codec_version(FILE_VERSION);
    let decoded = cur.read_index_key().unwrap();
    assert_eq!(decoded, original);
}
/// The NEON squared-L2 kernel agrees with the scalar one, within a relative
/// tolerance, on pseudo-random vectors of several dimensions.
#[cfg(target_arch = "aarch64")]
#[test]
fn neon_l2_matches_scalar() {
    for d in [4usize, 8, 12, 16, 64, 128, 256, 384, 512, 768, 1024, 1536] {
        // LCG seeded from the dimension; each draw maps 24 bits to [-1, 1).
        let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
        #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
        let mut draw = || {
            state = state
                .wrapping_mul(6_364_136_223_846_793_005)
                .wrapping_add(1);
            (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0
        };

        let mut a = Vec::with_capacity(d);
        let mut b = Vec::with_capacity(d);
        for _ in 0..d {
            // Draws alternate: first the `a` component, then the `b` one.
            let x = draw();
            let y = draw();
            a.push(x);
            b.push(y);
        }

        let scalar = l2_distance_sq_scalar(&a, &b);
        // SAFETY: this test only builds for aarch64; presumably the kernel's
        // contract is NEON availability plus equal-length slices — confirm
        // against its `# Safety` notes.
        let neon = unsafe { l2_distance_sq_neon(&a, &b) };

        let diff = (scalar - neon).abs();
        let tol = (scalar.abs().max(1e-6)) * 1e-4;
        assert!(
            diff <= tol,
            "dim={d}: scalar={scalar} neon={neon} diff={}",
            (scalar - neon).abs()
        );
    }
}
/// The NEON inner-product kernel agrees with the scalar one, within a
/// relative-plus-per-dimension tolerance, on pseudo-random vectors.
#[cfg(target_arch = "aarch64")]
#[test]
fn neon_inner_product_matches_scalar() {
    for d in [4usize, 8, 12, 16, 64, 128, 256, 512, 1024] {
        // LCG seeded from the dimension; each draw maps 24 bits to [-1, 1).
        let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
        #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
        let mut draw = || {
            state = state
                .wrapping_mul(6_364_136_223_846_793_005)
                .wrapping_add(1);
            (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0
        };

        let mut a = Vec::with_capacity(d);
        let mut b = Vec::with_capacity(d);
        for _ in 0..d {
            let x = draw();
            let y = draw();
            a.push(x);
            b.push(y);
        }

        let scalar = inner_product_scalar(&a, &b);
        // SAFETY: this test only builds for aarch64; presumably the kernel's
        // contract is NEON availability plus equal-length slices — confirm
        // against its `# Safety` notes.
        let neon = unsafe { inner_product_neon(&a, &b) };

        #[allow(clippy::cast_precision_loss)]
        let tol = (scalar.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
        let diff = (scalar - neon).abs();
        assert!(
            diff <= tol,
            "IP dim={d}: scalar={scalar} neon={neon} diff={}",
            (scalar - neon).abs()
        );
    }
}
/// The NEON cosine helper (dot product plus both squared norms) agrees with
/// the scalar one on pseudo-random vectors of several dimensions.
///
/// Each of the three outputs is compared against a tolerance derived from its
/// own scalar magnitude, so the `nb` check does not depend on how large `na`
/// happens to be.
#[cfg(target_arch = "aarch64")]
#[allow(clippy::similar_names)]
#[test]
fn neon_cosine_dot_norms_matches_scalar() {
    let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
    for &d in &dims {
        // LCG seeded from the dimension; each draw maps 24 bits to [-1, 1).
        let mut state: u64 = (d as u64).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        let mut a = Vec::with_capacity(d);
        let mut b = Vec::with_capacity(d);
        for _ in 0..d {
            state = state
                .wrapping_mul(6_364_136_223_846_793_005)
                .wrapping_add(1);
            #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
            let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
            state = state
                .wrapping_mul(6_364_136_223_846_793_005)
                .wrapping_add(1);
            #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
            let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
            a.push(x);
            b.push(y);
        }
        let (dot_s, na_s, nb_s) = cosine_dot_norms_scalar(&a, &b);
        // SAFETY: this test only builds for aarch64; presumably the kernel's
        // contract is NEON availability plus equal-length slices — confirm
        // against its `# Safety` notes.
        let (dot_n, na_n, nb_n) = unsafe { cosine_dot_norms_neon(&a, &b) };
        #[allow(clippy::cast_precision_loss)]
        let tol_d = (dot_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
        #[allow(clippy::cast_precision_loss)]
        let tol_na = (na_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
        // Previously the `nb` comparison reused the tolerance computed from `na`.
        #[allow(clippy::cast_precision_loss)]
        let tol_nb = (nb_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
        assert!(
            (dot_s - dot_n).abs() <= tol_d,
            "cosine dot dim={d}: scalar={dot_s} neon={dot_n}"
        );
        assert!(
            (na_s - na_n).abs() <= tol_na,
            "cosine na dim={d}: scalar={na_s} neon={na_n}"
        );
        assert!(
            (nb_s - nb_n).abs() <= tol_nb,
            "cosine nb dim={d}: scalar={nb_s} neon={nb_n}"
        );
    }
}
/// Builds the `users` schema shared by the tests below: non-null `id` INT,
/// non-null `name` TEXT, nullable `score` FLOAT.
fn make_users_schema() -> TableSchema {
    let columns = vec![
        ColumnSchema::new("id", DataType::Int, false),
        ColumnSchema::new("name", DataType::Text, false),
        ColumnSchema::new("score", DataType::Float, true),
    ];
    TableSchema::new("users", columns)
}
/// `Value::data_type` reports the matching `DataType` for scalar variants and
/// `None` for NULL; `is_null` is true only for `Value::Null`.
#[test]
fn value_type_tag_matches_variant() {
    let cases = [
        (Value::Int(1), Some(DataType::Int)),
        (Value::BigInt(1), Some(DataType::BigInt)),
        (Value::Float(1.0), Some(DataType::Float)),
        (Value::Text("x".into()), Some(DataType::Text)),
        (Value::Bool(true), Some(DataType::Bool)),
        (Value::Null, None),
    ];
    for (value, expected) in cases {
        assert_eq!(value.data_type(), expected);
    }
    assert!(Value::Null.is_null());
    assert!(!Value::Int(0).is_null());
}
/// A quantized five-element vector reports `Vector { dim: 5, Sq8 }`.
#[test]
fn sq8_value_reports_sq8_data_type() {
    let quantized = crate::quantize::quantize(&[0.0, 0.25, 0.5, 0.75, 1.0]);
    let value = Value::Sq8Vector(quantized);
    let expected = DataType::Vector {
        dim: 5,
        encoding: VecEncoding::Sq8,
    };
    assert_eq!(value.data_type(), Some(expected));
}
/// `DataType`'s `Display` output is the upper-case SQL keyword.
#[test]
fn datatype_display_matches_pg_keyword() {
    let cases = [
        (DataType::Int, "INT"),
        (DataType::BigInt, "BIGINT"),
        (DataType::Float, "FLOAT"),
        (DataType::Text, "TEXT"),
        (DataType::Bool, "BOOL"),
    ];
    for (ty, keyword) in cases {
        assert_eq!(ty.to_string(), keyword);
    }
}
/// `Row::len` counts cells (NULL included) and `is_empty` is true only for a
/// row with no cells.
#[test]
fn row_len_and_emptiness() {
    let two_cells = Row::new(vec![Value::Int(1), Value::Null]);
    assert_eq!(two_cells.len(), 2);
    assert!(!two_cells.is_empty());

    let no_cells = Row::new(Vec::new());
    assert!(no_cells.is_empty());
}
/// `column_position` returns the zero-based index of a known column and
/// `None` for an unknown name.
#[test]
fn table_schema_column_position() {
    let schema = make_users_schema();
    for (name, expected) in [("id", Some(0)), ("score", Some(2)), ("missing", None)] {
        assert_eq!(schema.column_position(name), expected);
    }
}
/// A created table is counted and can be looked up by name; an unknown name
/// yields `None`.
#[test]
fn catalog_create_table_then_lookup() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();

    assert_eq!(catalog.table_count(), 1);
    assert!(catalog.get("users").is_some());
    assert!(catalog.get("nope").is_none());
}
/// Creating the same table twice fails with `DuplicateTable` naming it.
#[test]
fn catalog_duplicate_table_is_rejected() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();

    let second_attempt = catalog.create_table(make_users_schema());
    let err = second_attempt.unwrap_err();
    assert!(matches!(err, StorageError::DuplicateTable { ref name } if name == "users"));
}
/// Inserting a well-typed row stores it and bumps the row count.
#[test]
fn table_insert_happy_path_appends_row() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    let alice = Row::new(vec![
        Value::Int(1),
        Value::Text("alice".into()),
        Value::Float(99.5),
    ]);
    users.insert(alice).unwrap();

    assert_eq!(users.row_count(), 1);
    assert_eq!(users.rows()[0].values[1], Value::Text("alice".into()));
}
/// A one-cell row against the three-column schema is rejected with
/// `ArityMismatch` and nothing is stored.
#[test]
fn table_insert_arity_mismatch() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    let short_row = Row::new(vec![Value::Int(1)]);
    let err = users.insert(short_row).unwrap_err();
    assert!(matches!(
        err,
        StorageError::ArityMismatch {
            expected: 3,
            actual: 1
        }
    ));
    assert_eq!(users.row_count(), 0);
}
/// An INT in the TEXT `name` column is rejected with a `TypeMismatch` that
/// names the column, both types and the position; nothing is stored.
#[test]
fn table_insert_type_mismatch_reports_column() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    // Second cell is an INT where the schema expects TEXT.
    let bad_row = Row::new(vec![Value::Int(1), Value::Int(42), Value::Float(0.0)]);
    let err = users.insert(bad_row).unwrap_err();
    match err {
        StorageError::TypeMismatch {
            ref column,
            expected,
            actual,
            position,
        } => {
            assert_eq!(column, "name");
            assert_eq!(expected, DataType::Text);
            assert_eq!(actual, DataType::Int);
            assert_eq!(position, 1);
        }
        other => panic!("unexpected: {other:?}"),
    }
    assert_eq!(users.row_count(), 0);
}
/// NULL in the non-nullable `name` column fails with `NullInNotNull`.
#[test]
fn table_insert_null_into_not_null_rejected() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    // Second cell is NULL but `name` is declared non-nullable.
    let bad_row = Row::new(vec![Value::Int(1), Value::Null, Value::Float(1.0)]);
    let err = users.insert(bad_row).unwrap_err();
    assert!(matches!(err, StorageError::NullInNotNull { ref column } if column == "name"));
}
/// NULL in the nullable `score` column is accepted.
#[test]
fn table_insert_null_into_nullable_ok() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    let bob = Row::new(vec![
        Value::Int(1),
        Value::Text("bob".into()),
        Value::Null,
    ]);
    users.insert(bob).unwrap();
    assert_eq!(users.row_count(), 1);
}
/// Inserting through `get_mut("a")` leaves table `b` untouched.
#[test]
fn catalog_get_mut_independent_per_table() {
    let mut catalog = Catalog::new();
    // Two tables with the same single-INT-column shape, created in order a, b.
    for name in ["a", "b"] {
        let schema = TableSchema::new(
            name,
            vec![ColumnSchema::new("v", DataType::Int, false)],
        );
        catalog.create_table(schema).unwrap();
    }

    let table_a = catalog.get_mut("a").unwrap();
    table_a.insert(Row::new(vec![Value::Int(1)])).unwrap();

    assert_eq!(catalog.get("a").unwrap().row_count(), 1);
    assert_eq!(catalog.get("b").unwrap().row_count(), 0);
}
/// Serializes `cat`, deserializes the bytes, and asserts that the table count
/// and every table's schema and rows match pairwise, in iteration order.
fn assert_round_trip(cat: &Catalog) {
    let snapshot = cat.serialize();
    let restored = Catalog::deserialize(&snapshot).expect("deserialize");
    assert_eq!(restored.table_count(), cat.table_count());

    let pairs = cat.tables.iter().zip(restored.tables.iter());
    for (before, after) in pairs {
        assert_eq!(before.schema, after.schema);
        assert_eq!(before.rows, after.rows);
    }
}
/// A catalog with no tables round-trips.
#[test]
fn serialize_empty_catalog_round_trips() {
    let empty = Catalog::new();
    assert_round_trip(&empty);
}
/// A catalog holding one table with no rows round-trips.
#[test]
fn serialize_single_empty_table_round_trips() {
    let mut catalog = Catalog::new();
    let created = catalog.create_table(make_users_schema());
    created.unwrap();
    assert_round_trip(&catalog);
}
/// Cloning an NSW graph built over 1500 rows shares the underlying storage of
/// `levels` and of every layer rather than copying elements.
#[test]
fn nsw_clone_is_o1() {
    let schema = TableSchema::new(
        "docs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 3,
                    encoding: VecEncoding::F32
                },
                true
            ),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let docs = cat.get_mut("docs").unwrap();
    for i in 0..1500_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.01;
        let embedding = alloc::vec![base, base + 0.05, base + 0.1];
        docs.insert(Row::new(alloc::vec![
            Value::Int(i),
            Value::Vector(embedding),
        ]))
        .unwrap();
    }
    docs.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    // Every other index kind is listed explicitly so a new variant forces
    // this match to be revisited.
    let graph = match &cat.get("docs").unwrap().indices()[0].kind {
        IndexKind::Nsw(g) => g,
        IndexKind::BTree(_)
        | IndexKind::Brin { .. }
        | IndexKind::Gin(_)
        | IndexKind::GinTrgm(_)
        | IndexKind::GinFulltext(_) => {
            panic!("expected NSW")
        }
    };
    assert_eq!(graph.levels.len(), 1500, "one level slot per inserted row");
    assert!(
        graph.layers.len() >= 2,
        "1500 nodes should populate at least two HNSW layers, got {}",
        graph.layers.len()
    );

    let cloned = graph.clone();
    assert!(
        graph.levels.shares_storage_with(&cloned.levels),
        "levels PV not shared after clone — clone copied elements (O(N))"
    );
    assert_eq!(graph.layers.len(), cloned.layers.len());
    let layer_pairs = graph.layers.iter().zip(cloned.layers.iter());
    for (l, (orig, cl)) in layer_pairs.enumerate() {
        assert!(
            orig.shares_storage_with(cl),
            "layer {l} PV not shared after clone — clone copied elements (O(N))"
        );
    }
}
/// For an SQ8 vector column with an NSW index, a catalog round trip keeps the
/// column type, a sampled cell, and the top-5 L2 query result unchanged.
#[test]
fn sq8_catalog_serialise_roundtrip_preserves_cells_and_index() {
    let schema = TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 8,
                    encoding: VecEncoding::Sq8,
                },
                false,
            ),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let vecs = cat.get_mut("vecs").unwrap();
    for i in 0..32_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.03;
        let mut components: Vec<f32> = Vec::with_capacity(8);
        for j in 0..8_i32 {
            #[allow(clippy::cast_precision_loss)]
            let off = (j as f32) * 0.01;
            components.push(base + off);
        }
        vecs.insert(Row::new(alloc::vec![
            Value::Int(i),
            Value::Sq8Vector(quantize::quantize(&components)),
        ]))
        .unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];

    // Capture the pre-serialization state to compare against.
    let live = cat.get("vecs").unwrap();
    let before_cell = live.rows()[5].values[1].clone();
    let before_ty = live.schema().columns[1].ty;
    let before_hits = nsw_query(live, "v_idx", &query, 5, NswMetric::L2);

    let snapshot = cat.serialize();
    let restored = Catalog::deserialize(&snapshot).expect("deserialize ok");
    let rt = restored.get("vecs").unwrap();

    assert_eq!(rt.schema().columns[1].ty, before_ty);
    assert_eq!(rt.rows()[5].values[1], before_cell);
    let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
    assert_eq!(before_hits, after_hits);
}
/// For an F16 vector column with an NSW index, a catalog round trip keeps the
/// column type, a sampled cell, and the top-5 L2 query result unchanged.
#[test]
fn half_catalog_serialise_roundtrip_preserves_cells_and_index() {
    use crate::halfvec;

    let schema = TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 8,
                    encoding: VecEncoding::F16,
                },
                false,
            ),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let vecs = cat.get_mut("vecs").unwrap();
    for i in 0..32_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.03;
        let mut components: Vec<f32> = Vec::with_capacity(8);
        for j in 0..8_i32 {
            #[allow(clippy::cast_precision_loss)]
            let off = (j as f32) * 0.01;
            components.push(base + off);
        }
        vecs.insert(Row::new(alloc::vec![
            Value::Int(i),
            Value::HalfVector(halfvec::HalfVector::from_f32_slice(&components)),
        ]))
        .unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];

    // Capture the pre-serialization state to compare against.
    let live = cat.get("vecs").unwrap();
    let before_cell = live.rows()[5].values[1].clone();
    let before_ty = live.schema().columns[1].ty;
    let before_hits = nsw_query(live, "v_idx", &query, 5, NswMetric::L2);

    let snapshot = cat.serialize();
    let restored = Catalog::deserialize(&snapshot).expect("deserialize ok");
    let rt = restored.get("vecs").unwrap();

    assert_eq!(rt.schema().columns[1].ty, before_ty);
    assert_eq!(rt.rows()[5].values[1], before_cell);
    let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
    assert_eq!(before_hits, after_hits);
}
/// Recall@10 of the NSW index over an F16 column, measured against exact f32
/// brute-force neighbours on a seeded random corpus, must be at least 0.95.
#[test]
#[allow(clippy::similar_names)]
fn hnsw_half_recall_at_10_matches_f32_groundtruth() {
    use crate::halfvec;

    /// Advances the seeded generator and maps the high 32 bits to [-1, 1].
    fn next(state: &mut u64) -> f32 {
        *state = state
            .wrapping_add(0x9E37_79B9_7F4A_7C15)
            .wrapping_mul(0xBF58_476D_1CE4_E5B9);
        #[allow(clippy::cast_precision_loss)]
        let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
        2.0 * u - 1.0
    }

    let dim: u32 = 32;
    let n: usize = 512;
    let dim_us = dim as usize;
    let mut seed: u64 = 0xF16_F16_F16_F16_u64;

    // The corpus is drawn first, then the queries, from the same stream.
    let mut corpus: Vec<Vec<f32>> = Vec::with_capacity(n);
    for _ in 0..n {
        let v: Vec<f32> = (0..dim_us).map(|_| next(&mut seed)).collect();
        corpus.push(v);
    }
    let mut queries: Vec<Vec<f32>> = Vec::with_capacity(32);
    for _ in 0..32 {
        let q: Vec<f32> = (0..dim_us).map(|_| next(&mut seed)).collect();
        queries.push(q);
    }

    // Exact top-10 per query by brute-force squared L2 on the f32 corpus.
    let mut exact_top10: Vec<Vec<usize>> = Vec::with_capacity(queries.len());
    for q in &queries {
        let mut scored: Vec<(f32, usize)> = corpus
            .iter()
            .enumerate()
            .map(|(i, v)| (l2_distance_sq(v, q), i))
            .collect();
        scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
        exact_top10.push(scored.into_iter().take(10).map(|(_, i)| i).collect());
    }

    let schema = TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim,
                    encoding: VecEncoding::F16,
                },
                false,
            ),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();
    let vecs = cat.get_mut("vecs").unwrap();
    for (i, v) in corpus.iter().enumerate() {
        vecs.insert(Row::new(alloc::vec![
            Value::Int(i32::try_from(i).unwrap()),
            Value::HalfVector(halfvec::HalfVector::from_f32_slice(v)),
        ]))
        .unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    let table = cat.get("vecs").unwrap();
    let mut total_overlap = 0_usize;
    for (q, exact) in queries.iter().zip(exact_top10.iter()) {
        let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
        total_overlap += hits.iter().filter(|&h| exact.contains(h)).count();
    }
    #[allow(clippy::cast_precision_loss)]
    let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
    assert!(
        recall >= 0.95,
        "HALF HNSW recall@10 = {recall:.3}, below floor 0.95 — \
check halfvec dispatch in `cell_to_query_metric_distance`"
    );
}
/// Recall@10 of the NSW index over an SQ8 column, measured against exact f32
/// brute-force neighbours on a seeded random corpus, must be at least 0.95.
#[test]
fn hnsw_sq8_recall_at_10_above_0_95_vs_f32_groundtruth() {
    use crate::quantize;

    /// Advances the seeded generator and maps the high 32 bits to [-1, 1].
    fn next(state: &mut u64) -> f32 {
        *state = state
            .wrapping_add(0x9E37_79B9_7F4A_7C15)
            .wrapping_mul(0xBF58_476D_1CE4_E5B9);
        #[allow(clippy::cast_precision_loss)]
        let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
        2.0 * u - 1.0
    }

    let dim: u32 = 32;
    let n: usize = 512;
    let dim_us = dim as usize;
    let mut seed: u64 = 0xCAFE_BABE_DEAD_BEEFu64;

    // The corpus is drawn first, then the queries, from the same stream.
    let mut corpus: Vec<Vec<f32>> = Vec::with_capacity(n);
    for _ in 0..n {
        let v: Vec<f32> = (0..dim_us).map(|_| next(&mut seed)).collect();
        corpus.push(v);
    }
    let mut queries: Vec<Vec<f32>> = Vec::with_capacity(32);
    for _ in 0..32 {
        let q: Vec<f32> = (0..dim_us).map(|_| next(&mut seed)).collect();
        queries.push(q);
    }

    // Exact top-10 per query by brute-force squared L2 on the f32 corpus.
    let mut exact_top10: Vec<Vec<usize>> = Vec::with_capacity(queries.len());
    for q in &queries {
        let mut scored: Vec<(f32, usize)> = corpus
            .iter()
            .enumerate()
            .map(|(i, v)| (l2_distance_sq(v, q), i))
            .collect();
        scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
        exact_top10.push(scored.into_iter().take(10).map(|(_, i)| i).collect());
    }

    let schema = TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim,
                    encoding: VecEncoding::Sq8,
                },
                false,
            ),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();
    let vecs = cat.get_mut("vecs").unwrap();
    for (i, v) in corpus.iter().enumerate() {
        vecs.insert(Row::new(alloc::vec![
            Value::Int(i32::try_from(i).unwrap()),
            Value::Sq8Vector(quantize::quantize(v)),
        ]))
        .unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    let table = cat.get("vecs").unwrap();
    let mut total_overlap = 0_usize;
    for (q, exact) in queries.iter().zip(exact_top10.iter()) {
        let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
        total_overlap += hits.iter().filter(|&h| exact.contains(h)).count();
    }
    #[allow(clippy::cast_precision_loss)]
    let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
    assert!(
        recall >= 0.95,
        "SQ8 HNSW recall@10 = {recall:.3}, below floor 0.95 — \
check `sq8_rerank` is wired in `nsw_search` for SQ8 columns"
    );
}
/// Every field of the NSW graph (`m`, `m_max_0`, entry, entry level, levels,
/// layers) is identical after a catalog serialize/deserialize round trip.
#[test]
fn nsw_index_topology_persists_through_round_trip() {
    let schema = TableSchema::new(
        "docs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 3,
                    encoding: VecEncoding::F32
                },
                true
            ),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let docs = cat.get_mut("docs").unwrap();
    for i in 0..6_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.1;
        let embedding = alloc::vec![base, base + 0.05, base + 0.1];
        docs.insert(Row::new(alloc::vec![
            Value::Int(i),
            Value::Vector(embedding),
        ]))
        .unwrap();
    }
    docs.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    // Non-NSW kinds are spelled out so a new variant forces a revisit.
    let original = match &cat.get("docs").unwrap().indices()[0].kind {
        IndexKind::Nsw(g) => g.clone(),
        IndexKind::BTree(_)
        | IndexKind::Brin { .. }
        | IndexKind::Gin(_)
        | IndexKind::GinTrgm(_)
        | IndexKind::GinFulltext(_) => {
            panic!("expected NSW")
        }
    };

    let snapshot = cat.serialize();
    let restored = Catalog::deserialize(&snapshot).expect("deserialize");
    let restored_graph = match &restored.get("docs").unwrap().indices()[0].kind {
        IndexKind::Nsw(g) => g.clone(),
        IndexKind::BTree(_)
        | IndexKind::Brin { .. }
        | IndexKind::Gin(_)
        | IndexKind::GinTrgm(_)
        | IndexKind::GinFulltext(_) => {
            panic!("expected NSW")
        }
    };

    assert_eq!(restored_graph.m, original.m);
    assert_eq!(restored_graph.m_max_0, original.m_max_0);
    assert_eq!(restored_graph.entry, original.entry);
    assert_eq!(restored_graph.entry_level, original.entry_level);
    assert_eq!(restored_graph.levels, original.levels);
    assert_eq!(restored_graph.layers, original.layers);
}
/// Calling `nsw_assign_level` twice with the same index gives the same level.
#[test]
fn hnsw_level_assignment_is_deterministic() {
    for node in 0..32usize {
        let first = nsw_assign_level(node);
        let second = nsw_assign_level(node);
        assert_eq!(first, second);
    }
}
/// More than 150 of the first 200 node indices are assigned level 0.
#[test]
fn hnsw_layer_0_dominates_population() {
    let mut on_zero = 0usize;
    for node in 0..200usize {
        if nsw_assign_level(node) == 0 {
            on_zero += 1;
        }
    }
    assert!(on_zero > 150, "level-0 nodes too few: {on_zero}");
}
/// On a ten-point 3-D dataset, the NSW top-1 result under L2 equals the
/// brute-force nearest row for each of three probe queries.
#[test]
fn hnsw_search_matches_brute_force_for_l2_top1() {
    let schema = TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 3,
                    encoding: VecEncoding::F32
                },
                true
            ),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let dataset: alloc::vec::Vec<(i32, [f32; 3])> = alloc::vec![
        (1, [0.0, 0.0, 0.0]),
        (2, [1.0, 0.0, 0.0]),
        (3, [0.0, 1.0, 0.0]),
        (4, [0.0, 0.0, 1.0]),
        (5, [1.0, 1.0, 0.0]),
        (6, [1.0, 0.0, 1.0]),
        (7, [0.0, 1.0, 1.0]),
        (8, [1.0, 1.0, 1.0]),
        (9, [0.5, 0.5, 0.5]),
        (10, [0.2, 0.8, 0.5]),
    ];
    let vecs = cat.get_mut("vecs").unwrap();
    for &(id, point) in &dataset {
        vecs.insert(Row::new(alloc::vec![
            Value::Int(id),
            Value::Vector(alloc::vec![point[0], point[1], point[2]]),
        ]))
        .unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    let idx_pos = cat
        .get("vecs")
        .unwrap()
        .indices()
        .iter()
        .position(|i| i.name == "v_idx")
        .unwrap();

    for query in [[0.4, 0.4, 0.4], [0.9, 0.1, 0.0], [0.0, 0.9, 0.9]] {
        let table = cat.get("vecs").unwrap();
        let hnsw_top = nsw_search(table, idx_pos, &query, 1, 16, NswMetric::L2);

        // Brute force: score every row; non-vector cells sort last.
        let mut brute: alloc::vec::Vec<(f32, usize)> = (0..table.rows.len())
            .map(|i| {
                let Value::Vector(v) = &table.rows[i].values[1] else {
                    return (f32::INFINITY, i);
                };
                (l2_distance_sq(v, &query), i)
            })
            .collect();
        brute.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));

        assert!(!hnsw_top.is_empty(), "HNSW returned no results");
        assert_eq!(
            hnsw_top[0].1, brute[0].1,
            "HNSW top-1 != brute-force top-1 for {query:?}"
        );
    }
}
/// A table holding two rows (one with a NULL score) round-trips.
#[test]
fn serialize_table_with_rows_round_trips() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    let alice = Row::new(vec![
        Value::Int(1),
        Value::Text("alice".into()),
        Value::Float(95.5),
    ]);
    users.insert(alice).unwrap();

    let bob = Row::new(vec![
        Value::Int(2),
        Value::Text("bob".into()),
        Value::Null,
    ]);
    users.insert(bob).unwrap();

    assert_round_trip(&catalog);
}
/// A catalog with an empty `users` table and a one-row `flags` table
/// round-trips.
#[test]
fn serialize_multiple_tables_round_trips() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();

    let flags_schema = TableSchema::new(
        "flags",
        vec![
            ColumnSchema::new("id", DataType::BigInt, false),
            ColumnSchema::new("active", DataType::Bool, false),
        ],
    );
    catalog.create_table(flags_schema).unwrap();

    let flags = catalog.get_mut("flags").unwrap();
    flags
        .insert(Row::new(vec![Value::BigInt(7), Value::Bool(true)]))
        .unwrap();

    assert_round_trip(&catalog);
}
/// A buffer starting with `BADMAGIC` is rejected as corrupt.
#[test]
fn deserialize_rejects_bad_magic() {
    // Wrong magic, then the current version byte and a zero u32.
    let mut forged = b"BADMAGIC".to_vec();
    forged.push(FILE_VERSION);
    forged.extend_from_slice(&0u32.to_le_bytes());

    let outcome = Catalog::deserialize(&forged);
    let err = outcome.unwrap_err();
    assert!(matches!(err, StorageError::Corrupt(_)));
}
/// Version byte 99 after a valid magic is rejected with a `Corrupt` error
/// whose message mentions "version".
#[test]
fn deserialize_rejects_unsupported_version() {
    let mut forged = FILE_MAGIC.to_vec();
    forged.push(99);
    forged.extend_from_slice(&0u32.to_le_bytes());

    let outcome = Catalog::deserialize(&forged);
    let err = outcome.unwrap_err();
    assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("version")));
}
/// Dropping the final byte of a valid snapshot makes it corrupt.
#[test]
fn deserialize_rejects_truncated_file() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();

    let snapshot = catalog.serialize();
    let cut = snapshot.len() - 1;
    let outcome = Catalog::deserialize(&snapshot[..cut]);
    assert!(matches!(outcome, Err(StorageError::Corrupt(_))));
}
/// One extra byte after a valid snapshot yields a `Corrupt` error whose
/// message mentions "trailing".
#[test]
fn deserialize_rejects_trailing_garbage() {
    let empty = Catalog::new();
    let mut snapshot = empty.serialize();
    snapshot.push(0xFF);

    let outcome = Catalog::deserialize(&snapshot);
    assert!(matches!(
        outcome,
        Err(StorageError::Corrupt(ref s)) if s.contains("trailing")
    ));
}
/// Builds a catalog whose `users` table holds three rows: alice/90.0,
/// bob/NULL and a second alice/70.0 (so the name `alice` appears twice).
fn populated_users() -> Catalog {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();

    let seed_rows = [
        (1, "alice", Some(90.0)),
        (2, "bob", None),
        (3, "alice", Some(70.0)),
    ];
    let users = catalog.get_mut("users").unwrap();
    for (id, name, score) in seed_rows {
        let score_cell = score.map_or(Value::Null, Value::Float);
        users
            .insert(Row::new(vec![
                Value::Int(id),
                Value::Text(name.into()),
                score_cell,
            ]))
            .unwrap();
    }
    catalog
}
/// An index added after rows exist already maps existing keys to their row
/// locators and returns nothing for an absent key.
#[test]
fn add_index_builds_from_existing_rows() {
    let mut catalog = populated_users();
    let users_mut = catalog.get_mut("users").unwrap();
    users_mut.add_index("by_id".into(), "id").unwrap();

    let users = catalog.get("users").unwrap();
    let idx = users.index_on(0).expect("index_on(0)");
    assert_eq!(idx.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
    assert_eq!(idx.lookup_eq(&IndexKey::Int(99)), &[] as &[RowLocator]);
}
/// Reusing an index name (even on another column) fails with
/// `DuplicateIndex`.
#[test]
fn add_index_dup_name_rejected() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("ix".into(), "id").unwrap();

    let second_attempt = users.add_index("ix".into(), "name");
    let err = second_attempt.unwrap_err();
    assert!(matches!(err, StorageError::DuplicateIndex { ref name } if name == "ix"));
}
/// Indexing a column that does not exist fails with `ColumnNotFound`.
#[test]
fn add_index_unknown_column_rejected() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();

    let outcome = users.add_index("ix".into(), "ghost");
    let err = outcome.unwrap_err();
    assert!(matches!(err, StorageError::ColumnNotFound { ref column } if column == "ghost"));
}
/// Rows inserted after index creation show up in the index, and rows that
/// existed beforehand stay findable.
#[test]
fn insert_after_create_index_updates_it() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let dave = Row::new(vec![
        Value::Int(4),
        Value::Text("dave".into()),
        Value::Null,
    ]);
    users.insert(dave).unwrap();

    let idx = users.index_on(1).unwrap();
    // The new row is the fourth one inserted.
    assert_eq!(
        idx.lookup_eq(&IndexKey::Text("dave".into())),
        &[RowLocator::Hot(3)]
    );
    // Both pre-existing `alice` rows are still indexed.
    assert_eq!(
        idx.lookup_eq(&IndexKey::Text("alice".into())),
        &[RowLocator::Hot(0), RowLocator::Hot(2)]
    );
}
/// An index on the nullable FLOAT `score` column returns nothing for the
/// integer key 90, even though a row holds the score 90.0.
#[test]
fn null_or_float_values_are_not_indexed() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("by_score".into(), "score").unwrap();

    let idx = users.index_on(2).unwrap();
    let hits = idx.lookup_eq(&IndexKey::Int(90));
    assert_eq!(hits, &[] as &[RowLocator]);
}
/// `Value::Vector::data_type` reports the element count as `dim` and the
/// `F32` encoding.
#[test]
fn vector_value_data_type_carries_dim() {
    let expected = Some(DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    });
    let value = Value::Vector(vec![1.0, 2.0, 3.0]);
    assert_eq!(value.data_type(), expected);
}
/// Inserting a 3-element vector into a `dim: 3` vector column succeeds.
#[test]
fn vector_column_insert_matching_dim_ok() {
    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let schema = TableSchema::new("emb", vec![ColumnSchema::new("v", vector_ty, false)]);

    let mut catalog = Catalog::new();
    catalog.create_table(schema).unwrap();
    let emb = catalog.get_mut("emb").unwrap();
    let row = Row::new(vec![Value::Vector(vec![1.0, 2.0, 3.0])]);
    emb.insert(row).unwrap();
}
/// Inserting a 2-element vector into a `dim: 3` vector column must fail
/// with `StorageError::TypeMismatch`.
#[test]
fn vector_column_insert_dim_mismatch_rejected() {
    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let schema = TableSchema::new("emb", vec![ColumnSchema::new("v", vector_ty, false)]);

    let mut catalog = Catalog::new();
    catalog.create_table(schema).unwrap();
    let emb = catalog.get_mut("emb").unwrap();
    let too_short = Row::new(vec![Value::Vector(vec![1.0, 2.0])]);
    let err = emb.insert(too_short).unwrap_err();
    assert!(matches!(err, StorageError::TypeMismatch { .. }));
}
/// A vector column's declared type and a stored vector cell both survive
/// `Catalog::serialize` followed by `Catalog::deserialize`.
#[test]
fn vector_value_survives_catalog_round_trip() {
    let vector_ty = DataType::Vector {
        dim: 4,
        encoding: VecEncoding::F32,
    };
    let payload = vec![0.5, -1.25, 3.0, 7.0];

    let mut catalog = Catalog::new();
    let columns = vec![
        ColumnSchema::new("id", DataType::Int, false),
        ColumnSchema::new("v", vector_ty, false),
    ];
    catalog.create_table(TableSchema::new("emb", columns)).unwrap();
    let row = Row::new(vec![Value::Int(1), Value::Vector(payload.clone())]);
    catalog.get_mut("emb").unwrap().insert(row).unwrap();

    let bytes = catalog.serialize();
    let restored = Catalog::deserialize(&bytes).expect("round-trip");
    let table = restored.get("emb").unwrap();
    assert_eq!(table.schema().columns[1].ty, vector_ty);
    assert_eq!(table.rows()[0].values[1], Value::Vector(payload));
}
/// An index keeps its name and its entries across a catalog
/// serialize/deserialize round trip.
#[test]
fn index_survives_serialize_deserialize_round_trip() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let bytes = catalog.serialize();
    let restored = Catalog::deserialize(&bytes).unwrap();
    let restored_users = restored.get("users").unwrap();
    let by_name = restored_users
        .index_on(1)
        .expect("index_on(1) after restore");
    assert_eq!(by_name.name, "by_name");
    let alice_hits = by_name.lookup_eq(&IndexKey::Text("alice".into()));
    assert_eq!(alice_hits, &[RowLocator::Hot(0), RowLocator::Hot(2)]);
}
/// Schema for a two-column `users` table: a non-nullable `BigInt` `id`
/// and a non-nullable `Text` `name`.
fn bigint_pk_users_schema() -> TableSchema {
    let id = ColumnSchema::new("id", DataType::BigInt, false);
    let name = ColumnSchema::new("name", DataType::Text, false);
    TableSchema::new("users", vec![id, name])
}
/// Builds a two-cell row `(BigInt(id), Text(name))`, matching the column
/// order of `bigint_pk_users_schema`.
fn make_user_row(id: i64, name: &str) -> Row {
    let cells = vec![Value::BigInt(id), Value::Text(name.into())];
    Row::new(cells)
}
/// Updating only the non-indexed `name` cell of a row leaves the `id`
/// index entry pointing at the same hot position.
#[test]
fn update_row_non_indexed_column_keeps_index_intact() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    let seed = [(1i64, "alice"), (2, "bob"), (3, "carol")];
    for (id, name) in seed {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    // Same id (2), new name: the indexed key does not change.
    let replacement = vec![Value::BigInt(2), Value::Text("bobby".into())];
    users.update_row(1, replacement).unwrap();

    let by_id = users.index_on(0).unwrap();
    assert_eq!(
        by_id.lookup_eq(&IndexKey::Int(2)),
        &[RowLocator::Hot(1)],
        "old key still resolves the in-place position"
    );
    let stored_name = &users.rows()[1].values[1];
    assert_eq!(*stored_name, Value::Text("bobby".into()));
}
/// Changing the indexed `id` of a row removes the entry under the old key,
/// adds one under the new key, and leaves the other rows' entries alone.
#[test]
fn update_row_indexed_column_moves_entry() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    let seed = [(1i64, "alice"), (2, "bob"), (3, "carol")];
    for (id, name) in seed {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    // Row at position 1 changes id from 2 to 20; the name is unchanged.
    let replacement = vec![Value::BigInt(20), Value::Text("bob".into())];
    users.update_row(1, replacement).unwrap();

    let by_id = users.index_on(0).unwrap();
    assert!(
        by_id.lookup_eq(&IndexKey::Int(2)).is_empty(),
        "old key entry removed"
    );
    assert_eq!(
        by_id.lookup_eq(&IndexKey::Int(20)),
        &[RowLocator::Hot(1)],
        "new key entry resolves the position"
    );
    // Untouched neighbours keep their original positions.
    let first = by_id.lookup_eq(&IndexKey::Int(1));
    assert_eq!(first, &[RowLocator::Hot(0)]);
    let third = by_id.lookup_eq(&IndexKey::Int(3));
    assert_eq!(third, &[RowLocator::Hot(2)]);
}
/// When two rows share the indexed key 7, re-keying the row at position 1
/// to 8 must move only that row's entry; position 0 stays under key 7.
#[test]
fn update_row_duplicate_key_moves_only_target_position() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    let seed = [(7i64, "a"), (7, "b"), (9, "c")];
    for (id, name) in seed {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let replacement = vec![Value::BigInt(8), Value::Text("b".into())];
    users.update_row(1, replacement).unwrap();

    let by_id = users.index_on(0).unwrap();
    let expectations = [(7i64, 0), (8, 1), (9, 2)];
    for (key, position) in expectations {
        assert_eq!(
            by_id.lookup_eq(&IndexKey::Int(key)),
            &[RowLocator::Hot(position)]
        );
    }
}
/// An indexed nullable column going value -> NULL drops the old key from
/// the index, and NULL -> value adds the new key.
#[test]
fn update_row_null_transition_on_indexed_nullable_column() {
    let columns = vec![
        ColumnSchema::new("id", DataType::BigInt, false),
        ColumnSchema::new("tag", DataType::BigInt, true),
    ];
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new("n", columns)).unwrap();
    let table = cat.get_mut("n").unwrap();
    let initial = Row::new(vec![Value::BigInt(1), Value::BigInt(5)]);
    table.insert(initial).unwrap();
    table.add_index("by_tag".into(), "tag").unwrap();

    // tag: 5 -> NULL, so key 5 must disappear.
    let cleared = vec![Value::BigInt(1), Value::Null];
    table.update_row(0, cleared).unwrap();
    let idx = table.index_on(1).unwrap();
    assert!(idx.lookup_eq(&IndexKey::Int(5)).is_empty());

    // tag: NULL -> 6, so key 6 must resolve to the row again.
    let refilled = vec![Value::BigInt(1), Value::BigInt(6)];
    table.update_row(0, refilled).unwrap();
    let idx = table.index_on(1).unwrap();
    assert_eq!(idx.lookup_eq(&IndexKey::Int(6)), &[RowLocator::Hot(0)]);
}
/// `Catalog::lookup_by_pk` returns the matching row for a key held only in
/// the hot tier; no cold segment is involved.
#[test]
fn lookup_by_pk_finds_row_via_hot_index() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    let seed = [(1i64, "alice"), (2, "bob"), (3, "carol")];
    for (id, name) in seed {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let key = IndexKey::Int(2);
    let found = catalog.lookup_by_pk("users", "by_id", &key).unwrap();
    assert_eq!(found, make_user_row(2, "bob"));
    assert_eq!(catalog.cold_segment_count(), 0);
}
/// `lookup_by_pk` yields `None` for an absent key, an unknown table, and
/// an unknown index name.
#[test]
fn lookup_by_pk_returns_none_when_key_missing() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    // Key that no row carries.
    assert!(
        cat.lookup_by_pk("users", "by_id", &IndexKey::Int(999))
            .is_none()
    );
    // Table that was never created.
    assert!(
        cat.lookup_by_pk("other_table", "by_id", &IndexKey::Int(1))
            .is_none()
    );
    // Index that was never added.
    assert!(
        cat.lookup_by_pk("users", "no_such_index", &IndexKey::Int(1))
            .is_none()
    );
}
/// Rows that exist only in a loaded cold segment are reachable through
/// `lookup_by_pk` once their `RowLocator::Cold` entries are registered on
/// the index.
#[test]
fn lookup_by_pk_resolves_cold_locator_via_loaded_segment() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let cold_rows: Vec<(i64, &str)> =
        vec![(100, "ivy"), (200, "joe"), (300, "kim"), (400, "lin")];

    // Encode every cold row and pack them into one segment.
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(cold_rows.len());
    for &(id, name) in &cold_rows {
        let body = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), body));
    }
    let (seg_bytes, _meta) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();
    assert_eq!(seg_id, 0);
    assert_eq!(cat.cold_segment_count(), 1);

    // Point each key at the freshly loaded segment.
    let mut pairs: Vec<(IndexKey, RowLocator)> = Vec::with_capacity(cold_rows.len());
    for &(id, _) in &cold_rows {
        let locator = RowLocator::Cold {
            segment_id: seg_id,
            page_offset: 0,
        };
        pairs.push((IndexKey::Int(id), locator));
    }
    let registered = cat
        .get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();
    assert_eq!(registered, 4);

    for &(id, name) in &cold_rows {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("cold key {id} not found"));
        assert_eq!(got, make_user_row(id, name));
    }
    assert!(
        cat.lookup_by_pk("users", "by_id", &IndexKey::Int(999))
            .is_none()
    );
}
/// With two hot rows and two cold-segment rows behind the same index,
/// `lookup_by_pk` resolves keys from both tiers and still returns `None`
/// for a key present in neither.
#[test]
fn lookup_by_pk_mixes_hot_and_cold_tiers() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    let hot_rows = [(1i64, "alice"), (2, "bob")];
    for (id, name) in hot_rows {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe")];
    let seg_rows: Vec<(u64, Vec<u8>)> = cold_rows
        .iter()
        .map(|&(id, name)| {
            let encoded = encode_row_body_dense(&make_user_row(id, name), &schema);
            (id.cast_unsigned(), encoded)
        })
        .collect();
    let (seg_bytes, _) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();

    let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
        .iter()
        .map(|&(id, _)| {
            let locator = RowLocator::Cold {
                segment_id: seg_id,
                page_offset: 0,
            };
            (IndexKey::Int(id), locator)
        })
        .collect();
    cat.get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();

    // Hot keys first, then cold keys — same order as the original checks.
    let expected = [(1i64, "alice"), (2, "bob"), (100, "ivy"), (200, "joe")];
    for (id, name) in expected {
        assert_eq!(
            cat.lookup_by_pk("users", "by_id", &IndexKey::Int(id))
                .unwrap(),
            make_user_row(id, name)
        );
    }
    assert!(
        cat.lookup_by_pk("users", "by_id", &IndexKey::Int(50))
            .is_none()
    );
}
/// `register_cold_locators` refuses an NSW index: the call must fail with
/// a `Corrupt` error whose message contains "not BTree".
#[test]
fn register_cold_locators_rejects_nsw_index() {
    let vector_ty = DataType::Vector {
        dim: 4,
        encoding: VecEncoding::F32,
    };
    let columns = vec![
        ColumnSchema::new("id", DataType::Int, false),
        ColumnSchema::new("v", vector_ty, false),
    ];
    let mut catalog = Catalog::new();
    catalog.create_table(TableSchema::new("vecs", columns)).unwrap();

    let vecs = catalog.get_mut("vecs").unwrap();
    let row = Row::new(vec![
        Value::Int(1),
        Value::Vector(vec![1.0, 0.0, 0.0, 0.0]),
    ]);
    vecs.insert(row).unwrap();
    vecs.add_nsw_index("by_v".into(), "v", NSW_DEFAULT_M).unwrap();

    let cold = RowLocator::Cold {
        segment_id: 0,
        page_offset: 0,
    };
    let pairs = vec![(IndexKey::Int(1), cold)];
    let err = vecs.register_cold_locators("by_v", pairs).unwrap_err();
    assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("not BTree")));
}
/// Ten zero bytes are not a valid segment: loading them fails with a
/// `Corrupt` error mentioning "segment" and registers nothing.
#[test]
fn load_segment_bytes_rejects_garbage() {
    let mut catalog = Catalog::new();
    let garbage = vec![0u8; 10];
    let err = catalog.load_segment_bytes(garbage).unwrap_err();
    assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("segment")));
    assert_eq!(catalog.cold_segment_count(), 0);
}
/// Loading three segments in a row hands out ids 0, 1 and 2 and leaves
/// three cold segments registered.
#[test]
fn load_segment_bytes_returns_sequential_ids() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let schema = catalog.get("users").unwrap().schema.clone();

    for batch in 0u32..3 {
        // Each batch holds four rows keyed batch*100 .. batch*100 + 3.
        let first_id = u64::from(batch) * 100;
        let mut rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(4);
        for id in first_id..first_id + 4 {
            let row = make_user_row(id.cast_signed(), "x");
            rows.push((id, encode_row_body_dense(&row, &schema)));
        }
        let (bytes, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
        let assigned = catalog.load_segment_bytes(bytes).unwrap();
        assert_eq!(assigned, batch);
    }
    assert_eq!(catalog.cold_segment_count(), 3);
}
/// A catalog written by `encode_as_v8` (version byte 8) is accepted by
/// `Catalog::deserialize`, and its index entries come back as `Hot`
/// locators only.
#[test]
fn v8_catalog_decodes_as_all_hot_under_v9_reader() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let v8_bytes = encode_as_v8(&catalog);
    // The version byte sits immediately after the magic prefix.
    assert_eq!(v8_bytes[FILE_MAGIC.len()], 8, "version byte must be 8");

    let restored = Catalog::deserialize(&v8_bytes).expect("v9 reader accepts v8 stream");
    let by_name = restored
        .get("users")
        .unwrap()
        .index_on(1)
        .expect("index_on(1) after restore");
    let alice_hits = by_name.lookup_eq(&IndexKey::Text("alice".into()));
    assert_eq!(alice_hits, &[RowLocator::Hot(0), RowLocator::Hot(2)]);
    for entry in by_name.lookup_eq(&IndexKey::Text("alice".into())) {
        assert!(entry.is_hot(), "v8 → v9 read must yield Hot only");
    }
}
/// Test-only writer that serialises `cat` with the version byte set to 8.
///
/// Layout emitted, in order: `FILE_MAGIC`, the version byte `8`, a `u32`
/// table count, then for every table its name, columns, rows and indices
/// as detailed inline below.
///
/// # Panics
///
/// Panics if any count does not fit its fixed-width field, or if a table
/// carries a BRIN, GIN, trigram-GIN or fulltext-GIN index — this writer
/// only handles BTree and NSW indices.
fn encode_as_v8(cat: &Catalog) -> Vec<u8> {
let mut out = Vec::with_capacity(64);
// File header: magic prefix followed by the hard-coded version byte.
out.extend_from_slice(FILE_MAGIC);
out.push(8u8);
write_u32(&mut out, u32::try_from(cat.tables.len()).unwrap());
for t in &cat.tables {
// Table header: name, then a u16 column count.
write_str(&mut out, &t.schema.name);
write_u16(&mut out, u16::try_from(t.schema.columns.len()).unwrap());
for c in &t.schema.columns {
// Per column: name, data type, nullable flag byte.
write_str(&mut out, &c.name);
write_data_type(&mut out, c.ty);
out.push(u8::from(c.nullable));
// Default value: tag 0 = none, tag 1 = value follows.
match &c.default {
None => out.push(0),
Some(v) => {
out.push(1);
write_value(&mut out, v);
}
}
out.push(u8::from(c.auto_increment));
}
// Rows: u32 count followed by each row's dense body encoding.
write_u32(&mut out, u32::try_from(t.rows.len()).unwrap());
for row in &t.rows {
out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
}
// Indices: u16 count, then name, u16 column position and a kind tag each.
write_u16(&mut out, u16::try_from(t.indices.len()).unwrap());
for idx in &t.indices {
write_str(&mut out, &idx.name);
write_u16(&mut out, u16::try_from(idx.column_position).unwrap());
match &idx.kind {
// Kind tag 0: BTree, with no payload written after the tag.
IndexKind::BTree(_) => out.push(0),
// Kind tag 1: NSW, followed by `m` as u16 and the graph itself.
IndexKind::Nsw(g) => {
out.push(1);
write_u16(&mut out, u16::try_from(g.m).unwrap());
write_nsw_graph(&mut out, g);
}
// Every remaining index kind is rejected by panicking.
IndexKind::Brin { .. } => panic!(
"v8 catalog writer cannot serialise BRIN — \
tests with BRIN indices must use the current writer"
),
IndexKind::Gin(_) => panic!(
"v8 catalog writer cannot serialise GIN — \
tests with GIN indices must use the current writer"
),
IndexKind::GinTrgm(_) => panic!(
"v8 catalog writer cannot serialise trigram-GIN — \
tests with trgm indices must use the current writer"
),
IndexKind::GinFulltext(_) => panic!(
"v8 catalog writer cannot serialise fulltext-GIN — \
tests with FULLTEXT KEY must use the current writer"
),
}
}
}
out
}
/// Serialising a catalog whose index mixes hot and cold locators and
/// reading it back keeps both kinds; after the segment bytes are reloaded
/// into the restored catalog, `lookup_by_pk` resolves keys from both tiers.
#[test]
fn v9_catalog_round_trip_preserves_cold_locators() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    let hot_rows = [(1i64, "alice"), (2, "bob")];
    for (id, name) in hot_rows {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe"), (300, "kim")];
    let seg_rows: Vec<(u64, Vec<u8>)> = cold_rows
        .iter()
        .map(|&(id, name)| {
            let encoded = encode_row_body_dense(&make_user_row(id, name), &schema);
            (id.cast_unsigned(), encoded)
        })
        .collect();
    let (seg_bytes, _) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    // Keep the original bytes: the restored catalog needs them again below.
    let seg_id = catalog.load_segment_bytes(seg_bytes.clone()).unwrap();

    let cold_locator = RowLocator::Cold {
        segment_id: seg_id,
        page_offset: 0,
    };
    let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
        .iter()
        .map(|&(id, _)| {
            (
                IndexKey::Int(id),
                RowLocator::Cold {
                    segment_id: seg_id,
                    page_offset: 0,
                },
            )
        })
        .collect();
    catalog
        .get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();

    let bytes = catalog.serialize();
    assert_eq!(bytes[FILE_MAGIC.len()], FILE_VERSION);
    let mut restored = Catalog::deserialize(&bytes).expect("v9 round-trip parses");
    let restored_seg_id = restored.load_segment_bytes(seg_bytes).unwrap();
    assert_eq!(restored_seg_id, seg_id);

    // Index contents: hot entries unchanged, cold entries still cold.
    let by_id = restored.get("users").unwrap().index_on(0).unwrap();
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(1)), &[RowLocator::Hot(0)]);
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
    for &(id, _) in &cold_rows {
        assert_eq!(
            by_id.lookup_eq(&IndexKey::Int(id)),
            &[RowLocator::Cold {
                segment_id: seg_id,
                page_offset: 0,
            }]
        );
    }
    // The locator built for registration has the same shape as the ones read back.
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(100)), &[cold_locator]);

    // End-to-end lookups across both tiers.
    let bob = restored
        .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
        .unwrap();
    assert_eq!(bob, make_user_row(2, "bob"));
    for &(id, name) in &cold_rows {
        let got = restored
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap();
        assert_eq!(got, make_user_row(id, name));
    }
}
/// For a ten-column schema and three sample rows (typical values, a row of
/// zero/empty values with a NULL, and a row of negative values),
/// `row_body_encoded_len` must equal the length of the bytes produced by
/// `encode_row_body_dense`.
#[test]
fn row_body_encoded_len_matches_actual_encode_for_all_types() {
    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let numeric_ty = DataType::Numeric {
        precision: 18,
        scale: 2,
    };
    let columns = vec![
        ColumnSchema::new("a", DataType::SmallInt, true),
        ColumnSchema::new("b", DataType::Int, false),
        ColumnSchema::new("c", DataType::BigInt, false),
        ColumnSchema::new("d", DataType::Float, false),
        ColumnSchema::new("e", DataType::Bool, false),
        ColumnSchema::new("f", DataType::Text, false),
        ColumnSchema::new("g", vector_ty, false),
        ColumnSchema::new("h", numeric_ty, false),
        ColumnSchema::new("i", DataType::Date, false),
        ColumnSchema::new("j", DataType::Timestamp, false),
    ];
    let schema = TableSchema::new("wide", columns);

    // Typical, non-trivial values in every column.
    let typical = Row::new(vec![
        Value::SmallInt(7),
        Value::Int(42),
        Value::BigInt(1_000_000),
        Value::Float(1.5),
        Value::Bool(true),
        Value::Text("hello".into()),
        Value::Vector(vec![1.0, 2.0, 3.0]),
        Value::Numeric {
            scaled: 12345,
            scale: 2,
        },
        Value::Date(20_000),
        Value::Timestamp(1_700_000_000_000_000),
    ]);
    // NULL in the nullable column, zero/empty everywhere else.
    let zeroed = Row::new(vec![
        Value::Null,
        Value::Int(0),
        Value::BigInt(0),
        Value::Float(0.0),
        Value::Bool(false),
        Value::Text(String::new()),
        Value::Vector(vec![]),
        Value::Numeric {
            scaled: 0,
            scale: 2,
        },
        Value::Date(0),
        Value::Timestamp(0),
    ]);
    // Negative numbers and a longer text payload.
    let negative = Row::new(vec![
        Value::SmallInt(-1),
        Value::Int(-1),
        Value::BigInt(-1),
        Value::Float(-0.5),
        Value::Bool(true),
        Value::Text("a much longer payload here".into()),
        Value::Vector(vec![0.1, 0.2, 0.3]),
        Value::Numeric {
            scaled: -999_999_999,
            scale: 2,
        },
        Value::Date(-1),
        Value::Timestamp(-1),
    ]);

    let cases = [typical, zeroed, negative];
    for row in &cases {
        let actual = encode_row_body_dense(row, &schema).len();
        let fast = row_body_encoded_len(row, &schema);
        assert_eq!(actual, fast, "row {row:?}");
    }
}
/// `hot_bytes` starts at zero and, after inserts, equals the summed
/// dense-encoded length of the inserted rows; the catalog-wide
/// `hot_tier_bytes` reports the same total.
#[test]
fn hot_bytes_grows_on_insert_and_matches_encoded_sum() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    assert_eq!(users.hot_bytes(), 0);

    let mut expected: u64 = 0;
    let seed = [(1i64, "alice"), (2, "bob"), (3, "carol")];
    for (id, name) in seed {
        let row = make_user_row(id, name);
        // Measure before inserting, since `insert` takes the row by value.
        let encoded_len = encode_row_body_dense(&row, &users.schema).len() as u64;
        expected += encoded_len;
        users.insert(row).unwrap();
    }
    assert_eq!(users.hot_bytes(), expected);
    assert_eq!(catalog.hot_tier_bytes(), expected);
}
/// Deleting one row lowers `hot_bytes` by exactly that row's dense-encoded
/// length.
#[test]
fn hot_bytes_shrinks_on_delete() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    let seed = [(1i64, "alice"), (2, "bob"), (3, "carol")];
    for (id, name) in seed {
        users.insert(make_user_row(id, name)).unwrap();
    }

    let before = users.hot_bytes();
    // Position 1 holds (2, "bob"); compute what its removal should free.
    let victim = make_user_row(2, "bob");
    let victim_len = encode_row_body_dense(&victim, &users.schema).len() as u64;

    let removed = users.delete_rows(&[1]);
    assert_eq!(removed, 1);
    assert_eq!(users.hot_bytes(), before - victim_len);
}
/// Replacing a row with one carrying a longer text cell adjusts
/// `hot_bytes` by the difference between the two encoded lengths.
#[test]
fn hot_bytes_diffs_on_update_for_variable_width_columns() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    let after_insert = users.hot_bytes();

    let original = make_user_row(1, "alice");
    let replacement = make_user_row(1, "alice-the-longer-name");
    let old_len = encode_row_body_dense(&original, &users.schema).len() as u64;
    let new_len = encode_row_body_dense(&replacement, &users.schema).len() as u64;

    users.update_row(0, replacement.values).unwrap();
    let after_update = users.hot_bytes();
    assert_eq!(after_update, after_insert - old_len + new_len);
    assert!(
        users.hot_bytes() > after_insert,
        "longer text grew the counter"
    );
}
/// The hot-tier byte counters of a restored catalog match the values the
/// original catalog reported before serialisation.
#[test]
fn hot_bytes_round_trips_through_serialize_deserialize() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for i in 0..10 {
        let name = alloc::format!("name-{i}");
        users.insert(make_user_row(i, &name)).unwrap();
    }

    let pre = catalog.hot_tier_bytes();
    let bytes = catalog.serialize();
    let restored = Catalog::deserialize(&bytes).unwrap();
    assert_eq!(restored.hot_tier_bytes(), pre);
    // Only one table exists, so its own counter equals the catalog total.
    let restored_users = restored.get("users").unwrap();
    assert_eq!(restored_users.hot_bytes(), pre);
}
/// Freezing 6 of 10 rows produces one cold segment, leaves 4 hot rows,
/// reduces `hot_bytes` by the reported `bytes_freed`, and keeps every
/// primary key resolvable through `lookup_by_pk`.
#[test]
fn freeze_oldest_to_cold_moves_rows_and_keeps_lookups_working() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..10i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let total_bytes_before = users.hot_bytes();

    let report = cat
        .freeze_oldest_to_cold("users", "by_id", 6)
        .expect("freeze succeeds");
    assert_eq!(report.frozen_rows, 6);
    assert_eq!(report.segment_id, 0);
    assert!(report.bytes_freed > 0);
    assert!(!report.segment_bytes.is_empty());

    let users = cat.get("users").unwrap();
    assert_eq!(users.row_count(), 4, "4 hot rows remain (10 - 6 frozen)");
    assert_eq!(cat.cold_segment_count(), 1);
    assert_eq!(
        users.hot_bytes(),
        total_bytes_before - report.bytes_freed,
        "hot_bytes accounting matches FreezeReport"
    );

    // Every key, frozen or not, must still resolve to its original row.
    for id in 0..10i64 {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("PK {id} disappeared after freeze"));
        let expected = make_user_row(id, &alloc::format!("u-{id}"));
        assert_eq!(got, expected);
    }
}
/// Two consecutive freezes of 4 rows each (out of 12) yield two cold
/// segments and 4 hot rows, with all 12 keys still resolvable.
#[test]
fn freeze_twice_preserves_prior_cold_locators() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..12i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    catalog
        .freeze_oldest_to_cold("users", "by_id", 4)
        .expect("first freeze ok");
    catalog
        .freeze_oldest_to_cold("users", "by_id", 4)
        .expect("second freeze ok");

    let hot_rows = catalog.get("users").unwrap().row_count();
    assert_eq!(hot_rows, 4);
    assert_eq!(catalog.cold_segment_count(), 2);
    for id in 0..12i64 {
        let got = catalog
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("PK {id} not resolvable after two freezes"));
        let expected = make_user_row(id, &alloc::format!("u-{id}"));
        assert_eq!(got, expected);
    }
}
/// `freeze_oldest_to_cold` returns `Corrupt` for a zero row count, an
/// unknown table, an unknown index and a count larger than the table
/// (999 against 3 rows); none of these attempts changes the catalog.
#[test]
fn freeze_oldest_to_cold_rejects_invalid_input() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..3i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    // Zero rows requested.
    assert!(matches!(
        cat.freeze_oldest_to_cold("users", "by_id", 0),
        Err(StorageError::Corrupt(_))
    ));
    // Table does not exist.
    assert!(matches!(
        cat.freeze_oldest_to_cold("missing", "by_id", 1),
        Err(StorageError::Corrupt(_))
    ));
    // Index does not exist.
    assert!(matches!(
        cat.freeze_oldest_to_cold("users", "no_such_index", 1),
        Err(StorageError::Corrupt(_))
    ));
    // More rows requested than the table holds.
    assert!(matches!(
        cat.freeze_oldest_to_cold("users", "by_id", 999),
        Err(StorageError::Corrupt(_))
    ));

    // Failed attempts leave rows and segments untouched.
    let hot_rows = cat.get("users").unwrap().row_count();
    assert_eq!(hot_rows, 3);
    assert_eq!(cat.cold_segment_count(), 0);
}
/// Freezing through an index on a `Text` column is refused with a
/// `Corrupt` error whose message contains "non-integer", and nothing is
/// moved to the cold tier.
#[test]
fn freeze_oldest_to_cold_rejects_non_integer_pk() {
    let columns = vec![
        ColumnSchema::new("name", DataType::Text, false),
        ColumnSchema::new("payload", DataType::BigInt, false),
    ];
    let mut catalog = Catalog::new();
    catalog
        .create_table(TableSchema::new("by_name", columns))
        .unwrap();
    let table = catalog.get_mut("by_name").unwrap();
    let row = Row::new(vec![Value::Text("a".into()), Value::BigInt(1)]);
    table.insert(row).unwrap();
    table.add_index("by_n".into(), "name").unwrap();

    let outcome = catalog.freeze_oldest_to_cold("by_name", "by_n", 1);
    match outcome.expect_err("non-integer PK rejected") {
        StorageError::Corrupt(s) => assert!(
            s.contains("non-integer"),
            "error message names the constraint: {s}"
        ),
        other => panic!("expected Corrupt, got {other:?}"),
    }

    let hot_rows = catalog.get("by_name").unwrap().row_count();
    assert_eq!(hot_rows, 1);
    assert_eq!(catalog.cold_segment_count(), 0);
}
/// After freezing 3 of 6 rows via `by_id`, a row that stayed hot is still
/// found through the secondary `by_name` index, as a `Hot` locator at
/// position 1.
#[test]
fn freeze_keeps_remaining_hot_rows_addressable_via_secondary_index() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    let by_name = catalog.get("users").unwrap().index_on(1).unwrap();
    let got = by_name.lookup_eq(&IndexKey::Text("u-4".into()));
    assert_eq!(got.len(), 1);
    assert!(got[0].is_hot(), "kept-hot rows still surface as Hot");
    // The `is_hot` assertion above already rules out the Cold arm.
    let position = match got[0] {
        RowLocator::Hot(i) => i,
        RowLocator::Cold { .. } => unreachable!(),
    };
    assert_eq!(position, 1);
}
/// Promoting a frozen key appends its row to the hot tier, grows
/// `hot_bytes` by the row's encoded length, replaces the cold locator with
/// a single hot one, and leaves other frozen keys resolvable.
#[test]
fn promote_cold_row_pulls_frozen_row_back_to_hot_tier() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    catalog.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
    let hot_bytes_before = catalog.get("users").unwrap().hot_bytes();

    let promoted_key = IndexKey::Int(2);
    let new_idx = catalog
        .promote_cold_row("users", "by_id", &promoted_key)
        .expect("promote ok")
        .expect("PK 2 was cold");
    assert_eq!(
        new_idx, 2,
        "promoted row appended after the 2 surviving hot rows"
    );

    let users = catalog.get("users").unwrap();
    assert_eq!(users.row_count(), 3, "hot tier grew from 2 to 3");
    let row = make_user_row(2, "u-2");
    let row_len = encode_row_body_dense(&row, &users.schema).len() as u64;
    assert_eq!(users.hot_bytes(), hot_bytes_before + row_len);

    let entries = users.index_on(0).unwrap().lookup_eq(&promoted_key);
    assert_eq!(entries.len(), 1, "exactly one locator per key");
    assert!(entries[0].is_hot(), "promote retired the Cold locator");

    // The promoted key resolves to its row; an untouched frozen key does too.
    let promoted = catalog.lookup_by_pk("users", "by_id", &promoted_key).unwrap();
    assert_eq!(promoted, row);
    let still_cold = catalog
        .lookup_by_pk("users", "by_id", &IndexKey::Int(0))
        .unwrap();
    assert_eq!(still_cold, make_user_row(0, "u-0"));
}
/// `promote_cold_row` returns `Ok(None)` for a key that is hot and for a
/// key that does not exist, and changes nothing in either case.
#[test]
fn promote_cold_row_returns_none_when_key_is_not_cold() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.insert(make_user_row(7, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    // Key 7 exists but only in the hot tier.
    assert!(
        cat.promote_cold_row("users", "by_id", &IndexKey::Int(7))
            .unwrap()
            .is_none()
    );
    // Key 99 was never inserted.
    assert!(
        cat.promote_cold_row("users", "by_id", &IndexKey::Int(99))
            .unwrap()
            .is_none()
    );

    let hot_rows = cat.get("users").unwrap().row_count();
    assert_eq!(hot_rows, 1);
    assert_eq!(cat.cold_segment_count(), 0);
}
/// Shadowing a frozen key retires exactly one cold locator and makes the
/// key unresolvable, while the other frozen keys keep resolving.
#[test]
fn shadow_cold_row_removes_cold_locators_and_drops_lookup() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..5i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    let shadowed_key = IndexKey::Int(1);
    assert!(
        catalog
            .lookup_by_pk("users", "by_id", &shadowed_key)
            .is_some(),
        "frozen PK resolves before shadow"
    );

    let removed = catalog
        .shadow_cold_row("users", "by_id", &shadowed_key)
        .unwrap();
    assert_eq!(removed, 1, "exactly one cold locator retired");
    assert!(
        catalog
            .lookup_by_pk("users", "by_id", &shadowed_key)
            .is_none(),
        "shadowed key no longer resolves"
    );

    // Frozen neighbours on either side are unaffected.
    let before = catalog
        .lookup_by_pk("users", "by_id", &IndexKey::Int(0))
        .unwrap();
    assert_eq!(before, make_user_row(0, "u-0"));
    let after = catalog
        .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
        .unwrap();
    assert_eq!(after, make_user_row(2, "u-2"));
}
/// `shadow_cold_row` reports zero retired locators for a hot-only key and
/// for an absent key, and the hot row count is unchanged.
#[test]
fn shadow_cold_row_returns_zero_when_key_is_not_cold() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    let hot_only = catalog
        .shadow_cold_row("users", "by_id", &IndexKey::Int(1))
        .unwrap();
    assert_eq!(hot_only, 0, "hot-only key drops no cold locators");

    let absent = catalog
        .shadow_cold_row("users", "by_id", &IndexKey::Int(999))
        .unwrap();
    assert_eq!(absent, 0, "absent key drops no cold locators");

    let hot_rows = catalog.get("users").unwrap().row_count();
    assert_eq!(hot_rows, 1);
}
/// Both `promote_cold_row` and `shadow_cold_row` return `Corrupt` when the
/// table or the index name is unknown.
#[test]
fn promote_and_shadow_reject_invalid_inputs() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    // Unknown table.
    assert!(matches!(
        cat.promote_cold_row("missing", "by_id", &IndexKey::Int(1)),
        Err(StorageError::Corrupt(_))
    ));
    assert!(matches!(
        cat.shadow_cold_row("missing", "by_id", &IndexKey::Int(1)),
        Err(StorageError::Corrupt(_))
    ));
    // Unknown index on an existing table.
    assert!(matches!(
        cat.promote_cold_row("users", "no_such_index", &IndexKey::Int(1)),
        Err(StorageError::Corrupt(_))
    ));
    assert!(matches!(
        cat.shadow_cold_row("users", "no_such_index", &IndexKey::Int(1)),
        Err(StorageError::Corrupt(_))
    ));
}
/// Committing one prepared slice covering `0..6` must produce the same
/// report and the same lookup results as `freeze_oldest_to_cold` with a
/// count of 6 on an identically seeded catalog.
#[test]
fn commit_freeze_slices_single_slice_matches_freeze_oldest() {
    let mut a = Catalog::new();
    let mut b = Catalog::new();
    // Seed both catalogs identically.
    for catalog in [&mut a, &mut b] {
        catalog.create_table(bigint_pk_users_schema()).unwrap();
        let users = catalog.get_mut("users").unwrap();
        for id in 0..10i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }

    let single = a.freeze_oldest_to_cold("users", "by_id", 6).unwrap();

    let slice = b
        .prepare_freeze_slice("users", "by_id", 0..6)
        .expect("prepare");
    let slices = alloc::vec![slice];
    let parallel = b
        .commit_freeze_slices("users", "by_id", slices)
        .expect("commit");

    assert_eq!(single.segment_id, parallel.segment_id);
    assert_eq!(single.frozen_rows, parallel.frozen_rows);
    assert_eq!(single.bytes_freed, parallel.bytes_freed);
    assert_eq!(single.segment_bytes, parallel.segment_bytes);
    for id in 0..10i64 {
        let key = IndexKey::Int(id);
        assert_eq!(
            a.lookup_by_pk("users", "by_id", &key),
            b.lookup_by_pk("users", "by_id", &key),
            "PK {id} differs after single vs slice freeze"
        );
    }
}
/// Committing two adjacent slices (`0..4` and `4..8`) must yield the same
/// segment bytes, frozen-row count and lookup results as committing one
/// slice covering `0..8`, on identically seeded catalogs whose rows were
/// inserted in shuffled id order.
#[test]
fn commit_freeze_slices_two_slices_match_single_slice() {
    let mut a = Catalog::new();
    let mut b = Catalog::new();
    for cat in [&mut a, &mut b] {
        cat.create_table(bigint_pk_users_schema()).unwrap();
        let t = cat.get_mut("users").unwrap();
        // Ids are typed `i64` up front, like the sibling tests, so no cast
        // is needed when building the row.
        for id in [3i64, 7, 1, 9, 5, 0, 8, 4, 2, 6] {
            t.insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        }
        t.add_index("by_id".into(), "id").unwrap();
    }

    // Catalog `a`: one slice spanning the whole range.
    let single = a
        .prepare_freeze_slice("users", "by_id", 0..8)
        .expect("prepare");
    let one = a
        .commit_freeze_slices("users", "by_id", alloc::vec![single])
        .expect("commit one");

    // Catalog `b`: the same range split into two back-to-back slices.
    let s1 = b
        .prepare_freeze_slice("users", "by_id", 0..4)
        .expect("prepare s1");
    let s2 = b
        .prepare_freeze_slice("users", "by_id", 4..8)
        .expect("prepare s2");
    let two = b
        .commit_freeze_slices("users", "by_id", alloc::vec![s1, s2])
        .expect("commit two");

    assert_eq!(one.segment_bytes, two.segment_bytes);
    assert_eq!(one.frozen_rows, two.frozen_rows);
    for id in 0..10i64 {
        assert_eq!(
            a.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
            b.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
            "PK {id} differs after one-slice vs two-slice freeze"
        );
    }
}
/// Slices `0..2` and `3..5` leave position 2 uncovered; committing them
/// together must fail with `Corrupt` and leave the catalog unchanged.
#[test]
fn commit_freeze_slices_rejects_gap() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let s1 = cat.prepare_freeze_slice("users", "by_id", 0..2).unwrap();
    let s2 = cat.prepare_freeze_slice("users", "by_id", 3..5).unwrap();
    assert!(matches!(
        cat.commit_freeze_slices("users", "by_id", alloc::vec![s1, s2]),
        Err(StorageError::Corrupt(_))
    ));

    // The rejected commit must not have moved anything.
    assert_eq!(cat.cold_segment_count(), 0);
    let hot_rows = cat.get("users").unwrap().row_count();
    assert_eq!(hot_rows, 6);
}
/// Committing an empty slice list succeeds, reports zero frozen rows and
/// leaves both tiers untouched.
#[test]
fn commit_freeze_slices_empty_is_noop() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..3i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let no_slices = Vec::new();
    let report = catalog
        .commit_freeze_slices("users", "by_id", no_slices)
        .unwrap();
    assert_eq!(report.frozen_rows, 0);
    assert_eq!(catalog.cold_segment_count(), 0);
    let hot_rows = catalog.get("users").unwrap().row_count();
    assert_eq!(hot_rows, 3);
}
/// Two 3-row cold segments, both smaller than the compaction target, are
/// merged into one new segment: the live segment count drops to 1, the
/// slot count rises to 3, and all 8 keys remain resolvable.
#[test]
fn compact_merges_small_segments_storage_unit() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..8i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    assert_eq!(cat.cold_segment_count(), 2);
    assert_eq!(cat.cold_segment_slot_count(), 2);

    // Pick a target one byte above the largest segment so both qualify.
    let segment_ids = cat.cold_segment_ids_global();
    let max_seg_bytes = segment_ids
        .iter()
        .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let target = max_seg_bytes + 1;

    let report = cat
        .compact_cold_segments("users", "by_id", target)
        .expect("compact succeeds");
    assert_eq!(report.sources.len(), 2);
    let merged_id = report.merged_segment_id.expect("merge happened");
    assert_eq!(report.merged_rows, 6);
    assert_eq!(report.deleted_rows_pruned, 0);
    assert!(!report.merged_segment_bytes.is_empty());

    // One live segment remains, occupying a third slot.
    assert_eq!(cat.cold_segment_count(), 1);
    assert_eq!(cat.cold_segment_slot_count(), 3);
    assert_eq!(cat.cold_segment_ids_global(), alloc::vec![merged_id]);

    for id in 0..8i64 {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("PK {id} lost after compaction"));
        let expected = make_user_row(id, &alloc::format!("u-{id}"));
        assert_eq!(got, expected);
    }
}
/// Compaction skips rows whose cold locators were shadowed: with two of
/// six frozen rows shadowed, the merge keeps 4 rows, reports 2 pruned, and
/// the shadowed keys stay unresolvable while the live ones still resolve.
#[test]
fn compact_drops_shadowed_cold_rows() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    // Shadow one key from each of the two freezes.
    let first_shadowed = catalog
        .shadow_cold_row("users", "by_id", &IndexKey::Int(1))
        .unwrap();
    assert_eq!(first_shadowed, 1);
    let second_shadowed = catalog
        .shadow_cold_row("users", "by_id", &IndexKey::Int(4))
        .unwrap();
    assert_eq!(second_shadowed, 1);

    let segment_ids = catalog.cold_segment_ids_global();
    let max_seg_bytes = segment_ids
        .iter()
        .map(|id| catalog.cold_segment(*id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let report = catalog
        .compact_cold_segments("users", "by_id", max_seg_bytes + 1)
        .expect("compact succeeds");
    assert_eq!(report.sources.len(), 2);
    assert_eq!(report.merged_rows, 4, "6 frozen − 2 shadowed = 4 live");
    assert_eq!(report.deleted_rows_pruned, 2);

    for shadowed in [1i64, 4i64] {
        let key = IndexKey::Int(shadowed);
        assert!(
            catalog.lookup_by_pk("users", "by_id", &key).is_none(),
            "shadowed PK {shadowed} must remain invisible after compact"
        );
    }
    for live in [0i64, 2, 3, 5] {
        let key = IndexKey::Int(live);
        catalog
            .lookup_by_pk("users", "by_id", &key)
            .unwrap_or_else(|| panic!("live PK {live} lost after compact"));
    }
}
/// `compact_cold_segments` must return a report without a merged segment when
/// it is called (a) before any freeze, (b) with one cold segment and a very
/// large target, and (c) with one cold segment and a target of 1. In the
/// latter two cases the single cold segment must remain in place.
#[test]
fn compact_is_noop_below_two_candidates() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = cat.get_mut("users").unwrap();
        for id in 0..6i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }

    // (a) Nothing has been frozen yet.
    {
        let report = cat
            .compact_cold_segments("users", "by_id", 1 << 30)
            .expect("noop ok");
        assert!(report.merged_segment_id.is_none());
        assert!(report.sources.is_empty());
    }

    cat.freeze_oldest_to_cold("users", "by_id", 4).unwrap();

    // (b) generous target, then (c) a target of 1: one segment, no merge.
    for target in [1 << 30, 1] {
        let report = cat
            .compact_cold_segments("users", "by_id", target)
            .expect("noop ok");
        assert!(report.merged_segment_id.is_none());
        assert_eq!(cat.cold_segment_count(), 1);
    }
}
/// After compaction, serializing the catalog, deserializing it, and reloading
/// the merged segment bytes at the merged segment id must give a catalog in
/// which all six primary keys resolve to their original rows and exactly one
/// cold segment is present.
#[test]
fn compact_swap_survives_catalog_roundtrip_via_load_at() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = cat.get_mut("users").unwrap();
        for id in 0..6i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }
    for _ in 0..2 {
        cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    }

    // Compaction target: one byte above the largest current segment.
    let largest_segment = {
        let segment_ids = cat.cold_segment_ids_global();
        let largest = segment_ids
            .iter()
            .map(|seg| cat.cold_segment(*seg).unwrap().bytes().len() as u64)
            .max()
            .unwrap();
        largest
    };
    let report = cat
        .compact_cold_segments("users", "by_id", largest_segment + 1)
        .expect("compact ok");
    let merged_id = report.merged_segment_id.unwrap();

    // Round-trip the catalog, then hand the merged segment bytes back in.
    let snapshot = cat.serialize();
    let merged_bytes = report.merged_segment_bytes.clone();
    let mut restored = Catalog::deserialize(&snapshot).expect("deserialize ok");
    let reload = restored.load_segment_bytes_at(merged_id, merged_bytes);
    reload.expect("reload merged ok");

    for id in 0..6i64 {
        let key = IndexKey::Int(id);
        match restored.lookup_by_pk("users", "by_id", &key) {
            Some(got) => assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}"))),
            None => panic!("PK {id} lost across roundtrip"),
        }
    }
    assert_eq!(restored.cold_segment_count(), 1);
}
/// Loading segment bytes at slot 5 when only one segment exists must succeed
/// and raise the slot count to 6 (with 2 live segments). Loading again at
/// slot 5, or at slot 0, must fail with `StorageError::Corrupt`.
#[test]
fn load_segment_bytes_at_pads_and_rejects_collision() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = cat.get_mut("users").unwrap();
        for id in 0..4i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }

    // Freeze once and keep a copy of the produced segment bytes.
    let report = cat.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
    let bytes_seg0 = report.segment_bytes.clone();

    // Loading far past the current end must pad the slot list up to slot 5.
    let padded_load = cat.load_segment_bytes_at(5, bytes_seg0.clone());
    padded_load.expect("pad + load ok");
    assert_eq!(cat.cold_segment_slot_count(), 6);
    assert_eq!(cat.cold_segment_count(), 2);

    // Slot 5 is now taken.
    assert!(matches!(
        cat.load_segment_bytes_at(5, bytes_seg0.clone()),
        Err(StorageError::Corrupt(_))
    ));
    // Slot 0 is rejected as well (presumably held by the frozen segment).
    assert!(matches!(
        cat.load_segment_bytes_at(0, bytes_seg0),
        Err(StorageError::Corrupt(_))
    ));
}
/// After freezing two rows and promoting PK 0 via `promote_cold_row`, the
/// index on column 0 must hold exactly one entry for that key and the entry
/// must be hot. PKs 2 and 3 must still resolve to their original rows.
///
/// NOTE(review): the test name mentions a re-freeze, but no second freeze is
/// performed here. Confirm whether a step is missing.
#[test]
fn promote_then_refreeze_does_not_leave_orphan_locators() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = cat.get_mut("users").unwrap();
        for id in 0..4i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }
    cat.freeze_oldest_to_cold("users", "by_id", 2).unwrap();

    let promoted = cat
        .promote_cold_row("users", "by_id", &IndexKey::Int(0))
        .unwrap();
    assert!(promoted.is_some());

    // Inspect the raw index entries for PK 0 after the promotion.
    let users = cat.get("users").unwrap();
    let pk_index = users.index_on(0).unwrap();
    let entries_after_promote = pk_index.lookup_eq(&IndexKey::Int(0)).to_vec();
    assert_eq!(entries_after_promote.len(), 1);
    assert!(entries_after_promote[0].is_hot());

    for id in [2i64, 3] {
        let key = IndexKey::Int(id);
        let actual = cat.lookup_by_pk("users", "by_id", &key).unwrap();
        let expected = make_user_row(id, &alloc::format!("u-{id}"));
        assert_eq!(actual, expected);
    }
}
}