#![no_std]
#![cfg_attr(target_arch = "aarch64", allow(unsafe_code))]
extern crate alloc;
pub mod bloom;
mod codec;
pub mod fts_simple;
pub mod halfvec;
mod nsw;
pub mod persistent;
pub mod persistent_btree;
pub mod quantize;
pub mod row_locator;
pub mod segment;
mod table;
pub mod trgm;
pub use self::bloom::{BloomError, BloomFilter};
pub(crate) use self::codec::*;
pub use self::codec::{decode_row_body_dense, encode_row_body_dense, row_body_encoded_len};
pub(crate) use self::nsw::nsw_insert_at;
pub use self::nsw::{NswMetric, cosine_dot_norms_f32, inner_product_f32, nsw_index_on, nsw_query};
pub use self::row_locator::{RowLocator, RowLocatorError};
pub use self::segment::{
BRIN_SIDECAR_MAGIC, BrinSummary, OwnedSegment, SEGMENT_COMPRESS_ALGO_LZSS,
SEGMENT_COMPRESS_ALGO_NONE, SEGMENT_MAGIC, SEGMENT_MAGIC_V2, SEGMENT_PAGE_BYTES, SegmentError,
SegmentMeta, SegmentReader, derive_brin_summaries, encode_segment, wrap_v2_envelope,
wrap_v2_envelope_with_brin,
};
use alloc::boxed::Box;
use alloc::collections::{BTreeMap, BTreeSet};
use alloc::format;
use alloc::string::{String, ToString};
use alloc::sync::Arc;
use alloc::vec::Vec;
use core::fmt;
use self::persistent::PersistentVec;
use self::persistent_btree::PersistentBTreeMap;
/// Element encoding of a `VECTOR` column.
///
/// `F32` is the `Default`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum VecEncoding {
    /// Plain `f32` elements.
    #[default]
    F32,
    /// SQ8 encoding (values held as `quantize::Sq8Vector`).
    Sq8,
    /// Half-precision encoding (values held as `halfvec::HalfVector`).
    F16,
}

impl fmt::Display for VecEncoding {
    /// Writes the encoding keyword: `F32`, `SQ8`, or `HALF`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match self {
            Self::F32 => "F32",
            Self::Sq8 => "SQ8",
            Self::F16 => "HALF",
        };
        f.write_str(keyword)
    }
}
/// Column data types known to the storage layer.
///
/// The SQL spelling of each variant comes from its `Display` impl.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataType {
    SmallInt,
    Int,
    BigInt,
    Float,
    Text,
    /// `VARCHAR(n)`.
    Varchar(u32),
    /// `CHAR(n)`.
    Char(u32),
    Bool,
    /// Fixed-dimension vector with the given element encoding.
    Vector {
        dim: u32,
        encoding: VecEncoding,
    },
    /// `NUMERIC(precision, scale)`.
    Numeric {
        precision: u8,
        scale: u8,
    },
    Date,
    Timestamp,
    Timestamptz,
    Interval,
    Json,
    Jsonb,
    /// Rendered as `BYTEA`.
    Bytes,
    TextArray,
    IntArray,
    BigIntArray,
    TsVector,
    TsQuery,
    Uuid,
    Time,
    Year,
    TimeTz,
    Money,
    /// One of the built-in range types.
    Range(RangeKind),
    Hstore,
    /// Two-dimensional `INT[][]`.
    IntArray2D,
    /// Two-dimensional `BIGINT[][]`.
    BigIntArray2D,
    /// Two-dimensional `TEXT[][]`.
    TextArray2D,
}
/// Built-in range types (`INT4RANGE`, `DATERANGE`, ...).
///
/// Declaration order defines the derived `Ord`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum RangeKind {
    Int4,
    Int8,
    Num,
    Ts,
    TsTz,
    Date,
}

impl RangeKind {
    /// Every kind, stored at the index equal to its tag.
    const TAG_ORDER: [Self; 6] = [
        Self::Int4,
        Self::Int8,
        Self::Num,
        Self::Ts,
        Self::TsTz,
        Self::Date,
    ];

    /// SQL keyword of each kind, indexed by tag.
    const KEYWORD_BY_TAG: [&'static str; 6] = [
        "INT4RANGE",
        "INT8RANGE",
        "NUMRANGE",
        "TSRANGE",
        "TSTZRANGE",
        "DATERANGE",
    ];

    /// One-byte tag for this kind (0 through 5).
    pub const fn tag(self) -> u8 {
        match self {
            Self::Int4 => 0,
            Self::Int8 => 1,
            Self::Num => 2,
            Self::Ts => 3,
            Self::TsTz => 4,
            Self::Date => 5,
        }
    }

    /// Inverse of [`RangeKind::tag`]; `None` for any byte above 5.
    pub const fn from_tag(t: u8) -> Option<Self> {
        let slot = t as usize;
        if slot < Self::TAG_ORDER.len() {
            Some(Self::TAG_ORDER[slot])
        } else {
            None
        }
    }

    /// SQL type keyword for this kind, e.g. `TSTZRANGE`.
    pub const fn keyword(self) -> &'static str {
        Self::KEYWORD_BY_TAG[self.tag() as usize]
    }
}
impl fmt::Display for DataType {
    /// Writes the SQL spelling of the type, e.g. `VARCHAR(32)`, `NUMERIC(10, 2)`,
    /// `VECTOR(3) USING SQ8`, or `INT[][]`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Parameterised types are written directly; all others map to a fixed keyword.
        let keyword = match self {
            Self::Varchar(n) => return write!(f, "VARCHAR({n})"),
            Self::Char(n) => return write!(f, "CHAR({n})"),
            Self::Vector { dim, encoding } => {
                return match encoding {
                    VecEncoding::F32 => write!(f, "VECTOR({dim})"),
                    VecEncoding::Sq8 => write!(f, "VECTOR({dim}) USING SQ8"),
                    VecEncoding::F16 => write!(f, "VECTOR({dim}) USING HALF"),
                };
            }
            // A zero scale is omitted from the rendered form.
            Self::Numeric { precision, scale } if *scale == 0 => {
                return write!(f, "NUMERIC({precision})");
            }
            Self::Numeric { precision, scale } => {
                return write!(f, "NUMERIC({precision}, {scale})");
            }
            Self::Range(kind) => kind.keyword(),
            Self::SmallInt => "SMALLINT",
            Self::Int => "INT",
            Self::BigInt => "BIGINT",
            Self::Float => "FLOAT",
            Self::Text => "TEXT",
            Self::Bool => "BOOL",
            Self::Date => "DATE",
            Self::Timestamp => "TIMESTAMP",
            Self::Timestamptz => "TIMESTAMPTZ",
            Self::Interval => "INTERVAL",
            Self::Json => "JSON",
            Self::Jsonb => "JSONB",
            Self::Bytes => "BYTEA",
            Self::TextArray => "TEXT[]",
            Self::IntArray => "INT[]",
            Self::BigIntArray => "BIGINT[]",
            Self::TsVector => "TSVECTOR",
            Self::TsQuery => "TSQUERY",
            Self::Uuid => "UUID",
            Self::Time => "TIME",
            Self::Year => "YEAR",
            Self::TimeTz => "TIMETZ",
            Self::Money => "MONEY",
            Self::Hstore => "HSTORE",
            Self::IntArray2D => "INT[][]",
            Self::BigIntArray2D => "BIGINT[][]",
            Self::TextArray2D => "TEXT[][]",
        };
        f.write_str(keyword)
    }
}
/// One entry of a `TSVECTOR` value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TsLexeme {
    /// The lexeme text.
    pub word: String,
    /// Positions recorded for the lexeme.
    pub positions: Vec<u16>,
    /// Weight byte; NOTE(review): encoding (e.g. PostgreSQL A–D classes) not visible here — confirm.
    pub weight: u8,
}
/// Parsed form of a `TSQUERY` value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TsQueryAst {
    /// A single word with a weight mask.
    Term {
        word: String,
        weight_mask: u8,
    },
    /// Both operands (`&`).
    And(Box<TsQueryAst>, Box<TsQueryAst>),
    /// Either operand (`|`).
    Or(Box<TsQueryAst>, Box<TsQueryAst>),
    /// Negation (`!`).
    Not(Box<TsQueryAst>),
    /// `left` followed by `right` at the given `distance`.
    Phrase {
        left: Box<TsQueryAst>,
        right: Box<TsQueryAst>,
        distance: u16,
    },
}
/// A single SQL value.
///
/// `Null` is the only variant without a corresponding [`DataType`].
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum Value {
    SmallInt(i16),
    Int(i32),
    BigInt(i64),
    Float(f64),
    Text(String),
    Bool(bool),
    /// `VECTOR` with plain `f32` elements.
    Vector(Vec<f32>),
    /// `VECTOR ... USING SQ8`.
    Sq8Vector(crate::quantize::Sq8Vector),
    /// `VECTOR ... USING HALF`.
    HalfVector(crate::halfvec::HalfVector),
    /// Fixed-point number; presumably value = `scaled` / 10^`scale` — confirm in codec.
    Numeric {
        scaled: i128,
        scale: u8,
    },
    Date(i32),
    Timestamp(i64),
    Interval {
        months: i32,
        micros: i64,
    },
    Json(String),
    Bytes(Vec<u8>),
    TextArray(Vec<Option<String>>),
    IntArray(Vec<Option<i32>>),
    BigIntArray(Vec<Option<i64>>),
    TsVector(Vec<TsLexeme>),
    TsQuery(TsQueryAst),
    Uuid([u8; 16]),
    Time(i64),
    Year(u16),
    /// Time with an offset in seconds.
    TimeTz {
        us: i64,
        offset_secs: i32,
    },
    Money(i64),
    /// Key/value pairs; a `None` value is a NULL entry.
    Hstore(Vec<(String, Option<String>)>),
    IntArray2D(Vec<Vec<Option<i32>>>),
    BigIntArray2D(Vec<Vec<Option<i64>>>),
    TextArray2D(Vec<Vec<Option<String>>>),
    /// A range value; `None` bounds presumably mean "unbounded" — confirm.
    Range {
        kind: RangeKind,
        lower: Option<Box<Value>>,
        upper: Option<Box<Value>>,
        lower_inc: bool,
        upper_inc: bool,
        empty: bool,
    },
    Null,
}
impl Value {
pub fn data_type(&self) -> Option<DataType> {
match self {
Self::SmallInt(_) => Some(DataType::SmallInt),
Self::Int(_) => Some(DataType::Int),
Self::BigInt(_) => Some(DataType::BigInt),
Self::Float(_) => Some(DataType::Float),
Self::Text(_) => Some(DataType::Text),
Self::Bool(_) => Some(DataType::Bool),
Self::Vector(v) => Some(DataType::Vector {
dim: u32::try_from(v.len()).expect("vector dim ≤ u32"),
encoding: VecEncoding::F32,
}),
Self::Sq8Vector(q) => Some(DataType::Vector {
dim: u32::try_from(q.bytes.len()).expect("vector dim ≤ u32"),
encoding: VecEncoding::Sq8,
}),
Self::HalfVector(h) => Some(DataType::Vector {
dim: u32::try_from(h.dim()).expect("vector dim ≤ u32"),
encoding: VecEncoding::F16,
}),
Self::Numeric { scale, .. } => Some(DataType::Numeric {
precision: 0,
scale: *scale,
}),
Self::Date(_) => Some(DataType::Date),
Self::Timestamp(_) => Some(DataType::Timestamp),
Self::Interval { .. } => Some(DataType::Interval),
Self::Json(_) => Some(DataType::Json),
Self::Bytes(_) => Some(DataType::Bytes),
Self::TextArray(_) => Some(DataType::TextArray),
Self::IntArray(_) => Some(DataType::IntArray),
Self::BigIntArray(_) => Some(DataType::BigIntArray),
Self::TsVector(_) => Some(DataType::TsVector),
Self::TsQuery(_) => Some(DataType::TsQuery),
Self::Uuid(_) => Some(DataType::Uuid),
Self::Time(_) => Some(DataType::Time),
Self::Year(_) => Some(DataType::Year),
Self::TimeTz { .. } => Some(DataType::TimeTz),
Self::Money(_) => Some(DataType::Money),
Self::Range { kind, .. } => Some(DataType::Range(*kind)),
Self::Hstore(_) => Some(DataType::Hstore),
Self::IntArray2D(_) => Some(DataType::IntArray2D),
Self::BigIntArray2D(_) => Some(DataType::BigIntArray2D),
Self::TextArray2D(_) => Some(DataType::TextArray2D),
Self::Null => None,
}
}
pub const fn is_null(&self) -> bool {
matches!(self, Self::Null)
}
}
/// The values of a single table row.
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    pub values: Vec<Value>,
}

impl Row {
    /// Wraps `values` as a row; no schema validation is performed here.
    pub const fn new(values: Vec<Value>) -> Self {
        Row { values }
    }

    /// Number of values in the row.
    pub fn len(&self) -> usize {
        let Self { values } = self;
        values.len()
    }

    /// `true` when the row holds no values.
    pub fn is_empty(&self) -> bool {
        let Self { values } = self;
        values.is_empty()
    }
}
/// Definition of one table column.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnSchema {
    /// Column name, as matched by `TableSchema::column_position`.
    pub name: String,
    pub ty: DataType,
    pub nullable: bool,
    /// Constant default value, if any.
    pub default: Option<Value>,
    /// Default kept as text; presumably an expression evaluated at insert time — confirm.
    pub runtime_default: Option<String>,
    pub auto_increment: bool,
    /// Name of a user-defined enum type backing this column.
    pub user_enum_type: Option<String>,
    /// Name of a user-defined domain backing this column.
    pub user_domain_type: Option<String>,
    /// Text of an ON UPDATE expression; NOTE(review): evaluation site not visible here.
    pub on_update_runtime: Option<String>,
    pub collation: Collation,
    pub is_unsigned: bool,
    /// Allowed labels of an inline enum definition.
    pub inline_enum_variants: Option<Vec<String>>,
    /// Allowed members of an inline set definition.
    pub inline_set_variants: Option<Vec<String>>,
}
/// Comparison rule attached to a text column.
///
/// `Binary` is the default. The default is now derived (with `#[default]`, the
/// same way `VecEncoding` does it) rather than hand-written under a clippy allow.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Collation {
    /// Binary collation; the default.
    #[default]
    Binary,
    /// Case-insensitive collation.
    CaseInsensitive,
}

impl Collation {
    /// Tag byte for [`Collation::Binary`].
    pub const TAG_BINARY: u8 = 0;
    /// Tag byte for [`Collation::CaseInsensitive`].
    pub const TAG_CASE_INSENSITIVE: u8 = 1;
}
/// Declared shape of a table: columns plus table-level constraints.
#[derive(Debug, Clone, PartialEq)]
pub struct TableSchema {
    pub name: String,
    /// Columns by position; `column_position` returns indices into this list.
    pub columns: Vec<ColumnSchema>,
    /// Optional hot-tier byte budget; NOTE(review): enforcement lives outside this chunk.
    pub hot_tier_bytes: Option<u64>,
    pub foreign_keys: Vec<ForeignKeyConstraint>,
    pub uniqueness_constraints: Vec<UniquenessConstraint>,
    /// CHECK constraints, presumably as expression source text — confirm.
    pub checks: Vec<String>,
}
/// A PRIMARY KEY or UNIQUE constraint over one or more columns.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UniquenessConstraint {
    /// `true` for the primary key, `false` for a plain UNIQUE constraint.
    pub is_primary_key: bool,
    /// Constrained column positions (presumably indices into `TableSchema::columns`).
    pub columns: Vec<usize>,
    /// Set for `NULLS NOT DISTINCT` constraints.
    pub nulls_not_distinct: bool,
}
/// A FOREIGN KEY from columns of this table to columns of `parent_table`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForeignKeyConstraint {
    /// Constraint name, if one was given.
    pub name: Option<String>,
    /// Referencing column positions in this table.
    pub local_columns: Vec<usize>,
    /// Referenced table (kept in sync by `Catalog::rename_table`).
    pub parent_table: String,
    /// Referenced column positions in the parent table.
    pub parent_columns: Vec<usize>,
    pub on_delete: FkAction,
    pub on_update: FkAction,
}

/// Referential action taken on parent-row deletion or update.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FkAction {
    Restrict,
    Cascade,
    SetNull,
    SetDefault,
    NoAction,
}

impl FkAction {
    /// Every action, stored at the index equal to its tag.
    const TAG_ORDER: [Self; 5] = [
        Self::Restrict,
        Self::Cascade,
        Self::SetNull,
        Self::SetDefault,
        Self::NoAction,
    ];

    /// One-byte tag for this action (0 through 4).
    pub const fn tag(self) -> u8 {
        match self {
            Self::Restrict => 0,
            Self::Cascade => 1,
            Self::SetNull => 2,
            Self::SetDefault => 3,
            Self::NoAction => 4,
        }
    }

    /// Inverse of [`FkAction::tag`]; `None` for any byte above 4.
    pub const fn from_tag(b: u8) -> Option<Self> {
        let slot = b as usize;
        if slot < Self::TAG_ORDER.len() {
            Some(Self::TAG_ORDER[slot])
        } else {
            None
        }
    }
}
impl TableSchema {
    /// Index of the first column whose name equals `name` exactly (case-sensitive).
    pub fn column_position(&self, name: &str) -> Option<usize> {
        self.columns
            .iter()
            .position(|column| column.name.as_str() == name)
    }
}
/// Orderable key stored in B-tree indexes.
///
/// SMALLINT, INT, BIGINT, DATE, TIMESTAMP, TIME, YEAR, TIMETZ, and MONEY values
/// all map onto `Int`. Variant order defines the derived `Ord`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum IndexKey {
    Int(i64),
    Text(String),
    Bool(bool),
    Uuid([u8; 16]),
}

impl IndexKey {
    /// Maps a value to its index key, or `None` for NULL and for value kinds this
    /// function defines no key for (floats, vectors, numerics, arrays, JSON, ...).
    pub fn from_value(v: &Value) -> Option<Self> {
        let key = match v {
            Value::SmallInt(n) => Self::Int(i64::from(*n)),
            Value::Int(n) => Self::Int(i64::from(*n)),
            Value::Date(d) => Self::Int(i64::from(*d)),
            Value::Year(y) => Self::Int(i64::from(*y)),
            Value::BigInt(n) | Value::Timestamp(n) | Value::Time(n) | Value::Money(n) => {
                Self::Int(*n)
            }
            // Subtract the offset (in microseconds) so values with different
            // offsets are keyed on a common scale.
            Value::TimeTz { us, offset_secs } => {
                Self::Int(*us - i64::from(*offset_secs) * 1_000_000)
            }
            Value::Text(s) => Self::Text(s.clone()),
            Value::Bool(b) => Self::Bool(*b),
            Value::Uuid(bytes) => Self::Uuid(*bytes),
            Value::Null
            | Value::Float(_)
            | Value::Vector(_)
            | Value::Sq8Vector(_)
            | Value::HalfVector(_)
            | Value::Numeric { .. }
            | Value::Interval { .. }
            | Value::Json(_)
            | Value::Bytes(_)
            | Value::TextArray(_)
            | Value::IntArray(_)
            | Value::BigIntArray(_)
            | Value::TsVector(_)
            | Value::TsQuery(_)
            | Value::Range { .. }
            | Value::Hstore(_)
            | Value::IntArray2D(_)
            | Value::BigIntArray2D(_)
            | Value::TextArray2D(_) => return None,
        };
        Some(key)
    }
}
/// A secondary index on a table column.
#[derive(Debug, Clone)]
pub struct Index {
    pub name: String,
    /// Position of the indexed column.
    pub column_position: usize,
    /// Index structure (B-tree, NSW graph, BRIN, or one of the GIN variants).
    pub kind: IndexKind,
    /// Additional column positions carried by the index.
    pub included_columns: Vec<usize>,
    /// Partial-index predicate, presumably as source text — confirm.
    pub partial_predicate: Option<String>,
    /// Indexed expression, presumably as source text — confirm.
    pub expression: Option<String>,
    pub is_unique: bool,
    /// Further key column positions beyond `column_position`.
    pub extra_column_positions: Vec<usize>,
}
/// Default neighbour cap `m` for NSW graphs (layer 0 allows `2 * m`; see `NswGraph::cap_for_layer`).
pub const NSW_DEFAULT_M: usize = 16;
/// Outcome of freezing rows into a segment.
#[derive(Debug, Clone)]
pub struct FreezeReport {
    /// Identifier assigned to the new segment.
    pub segment_id: u32,
    /// Number of rows written to the segment.
    pub frozen_rows: usize,
    /// Bytes released by the freeze (as computed by the caller).
    pub bytes_freed: u64,
    /// Encoded segment payload.
    pub segment_bytes: Vec<u8>,
}
/// A contiguous range of rows selected for freezing.
#[derive(Debug, Clone)]
pub struct FreezeSlice {
    /// Row positions covered by this slice.
    pub row_range: core::ops::Range<usize>,
    /// Per-row tuples of (`u64`, encoded bytes, index key); NOTE(review): meaning of the `u64` not visible here.
    pub rows: Vec<(u64, Vec<u8>, IndexKey)>,
}
/// Outcome of merging cold segments.
#[derive(Debug, Clone)]
pub struct CompactReport {
    /// Identifiers of the segments that were merged.
    pub sources: Vec<u32>,
    /// Identifier of the merged segment, if one was produced.
    pub merged_segment_id: Option<u32>,
    /// Encoded merged segment payload.
    pub merged_segment_bytes: Vec<u8>,
    /// Rows carried into the merged segment.
    pub merged_rows: usize,
    /// Deleted rows dropped during the merge.
    pub deleted_rows_pruned: usize,
    /// Estimated bytes reclaimed.
    pub bytes_reclaimed_estimate: u64,
}
#[derive(Debug, Clone)]
pub enum IndexKind {
BTree(PersistentBTreeMap<IndexKey, Vec<RowLocator>>),
Nsw(NswGraph),
Brin {
column_type: DataType,
},
Gin(PersistentBTreeMap<alloc::string::String, Vec<RowLocator>>),
GinTrgm(PersistentBTreeMap<alloc::string::String, Vec<RowLocator>>),
GinFulltext(PersistentBTreeMap<alloc::string::String, Vec<RowLocator>>),
}
impl IndexKind {
    /// Rough estimate, in bytes, of the memory held by this index.
    ///
    /// Each map entry or adjacency list is charged a fixed 24-byte `HEADER` plus
    /// its payload. GIN entries are charged two headers (term string + posting list).
    /// BRIN reports just `size_of::<DataType>()`.
    #[must_use]
    pub fn approx_resident_bytes(&self) -> u64 {
        const HEADER: usize = 24;
        let locator_size = core::mem::size_of::<RowLocator>();
        match self {
            IndexKind::BTree(map) => {
                let key_size = core::mem::size_of::<IndexKey>();
                map.iter()
                    .map(|(_, locators)| {
                        (key_size + HEADER + locators.len() * locator_size) as u64
                    })
                    .sum()
            }
            IndexKind::Nsw(graph) => {
                // One byte per entry of `levels`, plus every per-layer neighbour list.
                let level_bytes = graph.levels.len() as u64;
                let adjacency_bytes: u64 = graph
                    .layers
                    .iter()
                    .flat_map(|layer| layer.iter())
                    .map(|neighbours| {
                        (HEADER + neighbours.len() * core::mem::size_of::<u32>()) as u64
                    })
                    .sum();
                level_bytes + adjacency_bytes
            }
            IndexKind::Brin { .. } => core::mem::size_of::<DataType>() as u64,
            IndexKind::Gin(map) | IndexKind::GinTrgm(map) | IndexKind::GinFulltext(map) => map
                .iter()
                .map(|(term, postings)| {
                    (term.len() + 2 * HEADER + postings.len() * locator_size) as u64
                })
                .sum(),
        }
    }
}
/// Layered neighbour graph backing an NSW vector index.
#[derive(Debug, Clone)]
pub struct NswGraph {
    /// Neighbour cap on layers above 0.
    pub m: usize,
    /// Neighbour cap on layer 0.
    pub m_max_0: usize,
    /// Entry node, presumably `None` while the graph is empty — confirm.
    pub entry: Option<usize>,
    /// Layer of the entry node — presumably.
    pub entry_level: u8,
    /// Per-node level (presumably from `nsw_assign_level`).
    pub levels: PersistentVec<u8>,
    /// Neighbour lists per layer; `layers[0]` is the base layer.
    pub layers: Vec<PersistentVec<Vec<u32>>>,
}

impl NswGraph {
    /// Empty graph with cap `m` (saturating `2 * m` on layer 0) and one empty base layer.
    fn new(m: usize) -> Self {
        let base_layer = PersistentVec::new();
        Self {
            layers: alloc::vec![base_layer],
            levels: PersistentVec::new(),
            entry_level: 0,
            entry: None,
            m_max_0: m.saturating_mul(2),
            m,
        }
    }

    /// Maximum neighbour count allowed on `layer`.
    pub const fn cap_for_layer(&self, layer: u8) -> usize {
        match layer {
            0 => self.m_max_0,
            _ => self.m,
        }
    }
}
/// Deterministic NSW layer for row `row_idx`, in `0..=7`.
///
/// The index is scrambled with the golden-ratio multiplier followed by the
/// SplitMix64 finalizer. The level is then the count of all-zero low nibbles,
/// capped at 7, so each level is about 1/16 as likely as the one below it.
pub fn nsw_assign_level(row_idx: usize) -> u8 {
    const MAX_LEVEL: u32 = 7;
    let mut mixed = (row_idx as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
    mixed ^= mixed >> 30;
    mixed = mixed.wrapping_mul(0xBF58_476D_1CE4_E5B9);
    mixed ^= mixed >> 27;
    mixed = mixed.wrapping_mul(0x94D0_49BB_1331_11EB);
    mixed ^= mixed >> 31;
    // Zero low nibbles = trailing zero bits / 4. An all-zero word gives 64 / 4 = 16,
    // which the cap clamps to MAX_LEVEL.
    let level = (mixed.trailing_zeros() / 4).min(MAX_LEVEL);
    // level <= 7, so the narrowing cast is lossless.
    level as u8
}
impl Index {
fn new_btree(name: String, column_position: usize) -> Self {
Self {
name,
column_position,
kind: IndexKind::BTree(PersistentBTreeMap::new()),
included_columns: Vec::new(),
partial_predicate: None,
expression: None,
is_unique: false,
extra_column_positions: Vec::new(),
}
}
fn new_nsw(name: String, column_position: usize, m: usize) -> Self {
Self {
name,
column_position,
kind: IndexKind::Nsw(NswGraph::new(m)),
included_columns: Vec::new(),
partial_predicate: None,
expression: None,
is_unique: false,
extra_column_positions: Vec::new(),
}
}
fn new_brin(name: String, column_position: usize, column_type: DataType) -> Self {
Self {
name,
column_position,
kind: IndexKind::Brin { column_type },
included_columns: Vec::new(),
partial_predicate: None,
expression: None,
is_unique: false,
extra_column_positions: Vec::new(),
}
}
fn new_gin(name: String, column_position: usize) -> Self {
Self {
name,
column_position,
kind: IndexKind::Gin(PersistentBTreeMap::new()),
included_columns: Vec::new(),
partial_predicate: None,
expression: None,
is_unique: false,
extra_column_positions: Vec::new(),
}
}
fn new_gin_trgm(name: String, column_position: usize) -> Self {
Self {
name,
column_position,
kind: IndexKind::GinTrgm(PersistentBTreeMap::new()),
included_columns: Vec::new(),
partial_predicate: None,
expression: None,
is_unique: false,
extra_column_positions: Vec::new(),
}
}
fn new_gin_fulltext(name: String, column_position: usize) -> Self {
Self {
name,
column_position,
kind: IndexKind::GinFulltext(PersistentBTreeMap::new()),
included_columns: Vec::new(),
partial_predicate: None,
expression: None,
is_unique: false,
extra_column_positions: Vec::new(),
}
}
pub fn lookup_eq(&self, key: &IndexKey) -> &[RowLocator] {
match &self.kind {
IndexKind::BTree(m) => m.get(key).map_or(&[][..], Vec::as_slice),
IndexKind::Nsw(_)
| IndexKind::Brin { .. }
| IndexKind::Gin(_)
| IndexKind::GinTrgm(_)
| IndexKind::GinFulltext(_) => &[][..],
}
}
pub fn gin_lookup_word(&self, word: &str) -> &[RowLocator] {
match &self.kind {
IndexKind::Gin(m) | IndexKind::GinFulltext(m) => {
m.get(&String::from(word)).map_or(&[][..], Vec::as_slice)
}
IndexKind::BTree(_)
| IndexKind::Nsw(_)
| IndexKind::Brin { .. }
| IndexKind::GinTrgm(_) => &[][..],
}
}
pub fn gin_trgm_lookup(&self, tri: &str) -> &[RowLocator] {
match &self.kind {
IndexKind::GinTrgm(m) => m.get(&String::from(tri)).map_or(&[][..], Vec::as_slice),
IndexKind::BTree(_)
| IndexKind::Nsw(_)
| IndexKind::Brin { .. }
| IndexKind::Gin(_)
| IndexKind::GinFulltext(_) => &[][..],
}
}
pub const fn nsw(&self) -> Option<&NswGraph> {
match &self.kind {
IndexKind::Nsw(g) => Some(g),
IndexKind::BTree(_)
| IndexKind::Brin { .. }
| IndexKind::Gin(_)
| IndexKind::GinTrgm(_)
| IndexKind::GinFulltext(_) => None,
}
}
pub const fn is_brin(&self) -> bool {
matches!(self.kind, IndexKind::Brin { .. })
}
pub const fn is_gin_trgm(&self) -> bool {
matches!(self.kind, IndexKind::GinTrgm(_))
}
pub const fn is_gin(&self) -> bool {
matches!(self.kind, IndexKind::Gin(_))
}
pub const fn is_gin_fulltext(&self) -> bool {
matches!(self.kind, IndexKind::GinFulltext(_))
}
}
/// One logical row mutation, as recorded in and replayed from the redo log.
#[derive(Debug, Clone, PartialEq)]
pub enum RowChange {
    /// Append `row` to `table`.
    Insert { table: String, row: Row },
    /// Replace the row at position `pos` with `new_row`.
    Update {
        table: String,
        pos: usize,
        new_row: Vec<Value>,
    },
    /// Remove the rows at `positions`.
    Delete {
        table: String,
        positions: Vec<usize>,
    },
}
/// Serializes a batch of row changes into the redo-log byte format.
///
/// Layout:
/// - a leading `FILE_VERSION` byte;
/// - a `u32` change count;
/// - per change, an op byte (`0` insert, `1` update, `2` delete) and the table
///   name (`codec::write_str`);
/// - then the op payload: a value list (insert), a `u32` position plus a value
///   list (update), or a `u32` count of `u32` positions (delete).
///
/// Value lists are a `u32` count followed by `codec::write_value` entries.
///
/// # Panics
///
/// Panics if a count or row position exceeds `u32::MAX`. The previous `as u32`
/// casts silently truncated such numbers, which would make replay touch the
/// wrong rows, so failing loudly is preferred to writing a corrupt log.
#[must_use]
pub fn encode_redo_log(changes: &[RowChange]) -> Vec<u8> {
    /// Narrows `n` to the `u32` wire width, panicking instead of truncating.
    fn wire_u32(n: usize, what: &str) -> u32 {
        u32::try_from(n).unwrap_or_else(|_| panic!("redo log: {what} {n} exceeds u32::MAX"))
    }
    let write_values = |out: &mut Vec<u8>, vals: &[Value]| {
        codec::write_u32(out, wire_u32(vals.len(), "value count"));
        for v in vals {
            codec::write_value(out, v);
        }
    };
    let mut out = Vec::new();
    out.push(FILE_VERSION);
    codec::write_u32(&mut out, wire_u32(changes.len(), "change count"));
    for change in changes {
        match change {
            RowChange::Insert { table, row } => {
                out.push(0);
                codec::write_str(&mut out, table);
                write_values(&mut out, &row.values);
            }
            RowChange::Update {
                table,
                pos,
                new_row,
            } => {
                out.push(1);
                codec::write_str(&mut out, table);
                codec::write_u32(&mut out, wire_u32(*pos, "row position"));
                write_values(&mut out, new_row);
            }
            RowChange::Delete { table, positions } => {
                out.push(2);
                codec::write_str(&mut out, table);
                codec::write_u32(&mut out, wire_u32(positions.len(), "position count"));
                for p in positions {
                    codec::write_u32(&mut out, wire_u32(*p, "row position"));
                }
            }
        }
    }
    out
}
/// Parses a redo log produced by `encode_redo_log` back into row changes.
///
/// The first byte selects the codec version used by the cursor. Every length
/// prefix comes from the (possibly corrupt) input. Each one is therefore only a
/// capacity hint, capped at the input length, so a garbage count cannot force a
/// huge up-front allocation. Reads past the end are left to the cursor's own
/// error handling.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` for an empty input or an unknown op byte.
/// Errors from the cursor's read methods are propagated.
pub fn decode_redo_log(bytes: &[u8]) -> Result<Vec<RowChange>, StorageError> {
    let version = *bytes
        .first()
        .ok_or_else(|| StorageError::Corrupt("redo log: empty".into()))?;
    let mut cur = codec::Cursor::new(bytes).with_codec_version(version);
    let _version = cur.read_u8()?;
    // Never reserve more slots than there are input bytes; Vec still grows if a
    // legitimate log needs more.
    let cap_hint = |claimed: usize| claimed.min(bytes.len());
    let count = cur.read_u32()? as usize;
    let read_values = |cur: &mut codec::Cursor<'_>| -> Result<Vec<Value>, StorageError> {
        let n = cur.read_u32()? as usize;
        let mut vals = Vec::with_capacity(cap_hint(n));
        for _ in 0..n {
            vals.push(cur.read_value()?);
        }
        Ok(vals)
    };
    let mut changes = Vec::with_capacity(cap_hint(count));
    for _ in 0..count {
        let op = cur.read_u8()?;
        let table = cur.read_str()?;
        let change = match op {
            0 => RowChange::Insert {
                table,
                row: Row::new(read_values(&mut cur)?),
            },
            1 => {
                let pos = cur.read_u32()? as usize;
                RowChange::Update {
                    table,
                    pos,
                    new_row: read_values(&mut cur)?,
                }
            }
            2 => {
                let n = cur.read_u32()? as usize;
                let mut positions = Vec::with_capacity(cap_hint(n));
                for _ in 0..n {
                    positions.push(cur.read_u32()? as usize);
                }
                RowChange::Delete { table, positions }
            }
            other => {
                return Err(StorageError::Corrupt(alloc::format!(
                    "redo log: unknown op {other}"
                )));
            }
        };
        changes.push(change);
    }
    Ok(changes)
}
/// In-memory table: schema, row storage, indexes, and tiering/redo bookkeeping.
#[derive(Debug, Clone)]
pub struct Table {
    schema: TableSchema,
    rows: PersistentVec<Row>,
    indices: Vec<Index>,
    /// Hot-tier byte count; NOTE(review): maintained outside this chunk.
    hot_bytes: u64,
    cold_row_count: u64,
    /// Presumably marks `cold_row_count` as needing recomputation — confirm.
    cold_row_count_stale: bool,
    /// Recorded changes, drained by `take_redo`; presumably `None` until `enable_redo`.
    redo_log: Option<Vec<RowChange>>,
}
#[derive(Debug, Clone, Default)]
pub struct Catalog {
tables: Vec<Table>,
by_name: BTreeMap<String, usize>,
cold_segments: Vec<Option<Arc<OwnedSegment>>>,
functions: BTreeMap<String, FunctionDef>,
triggers: Vec<TriggerDef>,
sequences: BTreeMap<String, SequenceDef>,
views: BTreeMap<String, ViewDef>,
materialized_views: BTreeMap<String, String>,
enum_types: BTreeMap<String, EnumDef>,
domain_types: BTreeMap<String, DomainDef>,
schemas: alloc::collections::BTreeSet<String>,
}
/// A user-defined function, stored as text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FunctionDef {
    /// Lookup key in the catalog's function registry.
    pub name: String,
    /// Argument list as written.
    pub args_repr: String,
    /// Return type as written.
    pub returns: String,
    pub language: String,
    pub body: String,
}
/// A trigger binding a table to a user-defined function.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TriggerDef {
    /// Unique per table (the pair `(name, table)` identifies a trigger).
    pub name: String,
    pub table: String,
    /// Timing keyword as text (e.g. BEFORE/AFTER), presumably — confirm.
    pub timing: String,
    /// Firing events as text.
    pub events: Vec<String>,
    /// Row/statement granularity as text.
    pub for_each: String,
    /// Name of the function to invoke; must exist when the trigger is created.
    pub function: String,
    /// Column list of an `UPDATE OF ...` event, presumably — confirm.
    pub update_columns: Vec<String>,
    pub enabled: bool,
}
/// State and options of a sequence generator.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SequenceDef {
    pub name: String,
    pub data_type: SequenceDataType,
    /// Value used when restarting without an explicit value.
    pub start: i64,
    /// Step added on each advance; its sign sets the direction.
    pub increment: i64,
    pub min_value: i64,
    pub max_value: i64,
    pub cache: i64,
    /// Wrap to the opposite bound instead of failing when a bound is passed.
    pub cycle: bool,
    /// Owning `(table, column)` pair, presumably — confirm order.
    pub owned_by: Option<(String, String)>,
    pub last_value: i64,
    /// When `false`, the next advance returns `last_value` itself.
    pub is_called: bool,
}

/// Integer width of a sequence, which determines its default bounds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SequenceDataType {
    SmallInt,
    Int,
    BigInt,
}
/// `true` for the always-present schemas `public`, `pg_catalog`, and
/// `information_schema`, compared ASCII case-insensitively.
#[must_use]
pub fn is_builtin_schema(name: &str) -> bool {
    const BUILTINS: [&str; 3] = ["public", "pg_catalog", "information_schema"];
    BUILTINS
        .iter()
        .any(|builtin| name.eq_ignore_ascii_case(builtin))
}
/// Parses a UUID in either 32-digit or hyphenated 8-4-4-4-12 form.
///
/// Surrounding whitespace is trimmed, and one `{...}` brace pair is accepted.
/// Hex digits may be any case. Returns `None` for any other shape.
#[must_use]
pub fn parse_uuid_str(input: &str) -> Option<[u8; 16]> {
    let trimmed = input.trim();
    let body = trimmed
        .strip_prefix('{')
        .and_then(|rest| rest.strip_suffix('}'))
        .unwrap_or(trimmed);
    let raw = body.as_bytes();
    let hyphenated = match raw.len() {
        32 => false,
        36 => true,
        _ => return None,
    };
    if hyphenated && [8usize, 13, 18, 23].iter().any(|&at| raw[at] != b'-') {
        return None;
    }
    // Exactly 32 hex digits remain once the four separator positions are skipped.
    let mut digits = raw
        .iter()
        .enumerate()
        .filter(|&(at, _)| !(hyphenated && matches!(at, 8 | 13 | 18 | 23)))
        .map(|(_, &byte)| byte);
    let mut uuid = [0u8; 16];
    for slot in &mut uuid {
        let hi = hex_nibble(digits.next()?)?;
        let lo = hex_nibble(digits.next()?)?;
        *slot = (hi << 4) | lo;
    }
    Some(uuid)
}

/// Value of one ASCII hex digit (either case), or `None` for any other byte.
fn hex_nibble(b: u8) -> Option<u8> {
    let value = char::from(b).to_digit(16)?;
    // `to_digit(16)` yields at most 15, so the cast is lossless.
    Some(value as u8)
}
/// Formats 16 bytes as a lowercase hyphenated UUID (`8-4-4-4-12` hex digits).
#[must_use]
pub fn format_uuid(b: &[u8; 16]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut text = String::with_capacity(36);
    for (idx, &octet) in b.iter().enumerate() {
        // Separators precede bytes 4, 6, 8, and 10.
        if idx == 4 || idx == 6 || idx == 8 || idx == 10 {
            text.push('-');
        }
        text.push(char::from(DIGITS[usize::from(octet >> 4)]));
        text.push(char::from(DIGITS[usize::from(octet & 0x0f)]));
    }
    text
}
/// A user-defined domain: a base type plus optional constraints and default.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DomainDef {
    pub name: String,
    pub base_type: DataType,
    pub nullable: bool,
    /// Default, presumably as expression text — confirm.
    pub default: Option<String>,
    /// CHECK constraints, presumably as expression text — confirm.
    pub checks: Vec<String>,
}
/// A user-defined enum type and its labels.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EnumDef {
    pub name: String,
    /// Labels as listed in the definition.
    pub labels: Vec<String>,
}
/// A (non-materialized) view stored as text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ViewDef {
    pub name: String,
    /// Explicit output column names, if any were given.
    pub columns: Vec<String>,
    /// Defining query text.
    pub body: String,
}
impl SequenceDataType {
    /// Default `(min_value, max_value)` for a sequence of this width.
    ///
    /// Ascending sequences span `1..=TYPE_MAX`; descending ones span `TYPE_MIN..=-1`.
    pub fn default_bounds(self, increment_positive: bool) -> (i64, i64) {
        let (lowest, highest) = match self {
            Self::SmallInt => (i64::from(i16::MIN), i64::from(i16::MAX)),
            Self::Int => (i64::from(i32::MIN), i64::from(i32::MAX)),
            Self::BigInt => (i64::MIN, i64::MAX),
        };
        if increment_positive {
            (1, highest)
        } else {
            (lowest, -1)
        }
    }
}
impl Catalog {
/// An empty catalog with no tables, routines, sequences, views, types, or user schemas.
pub const fn new() -> Self {
    Self {
        schemas: alloc::collections::BTreeSet::new(),
        domain_types: BTreeMap::new(),
        enum_types: BTreeMap::new(),
        materialized_views: BTreeMap::new(),
        views: BTreeMap::new(),
        sequences: BTreeMap::new(),
        triggers: Vec::new(),
        functions: BTreeMap::new(),
        cold_segments: Vec::new(),
        by_name: BTreeMap::new(),
        tables: Vec::new(),
    }
}
/// All registered functions, keyed by name.
pub const fn functions(&self) -> &BTreeMap<String, FunctionDef> {
    &self.functions
}

/// Registers `def`, overwriting an existing function of the same name only when
/// `or_replace` is set.
///
/// # Errors
///
/// `StorageError::Corrupt` if the name is taken and `or_replace` is `false`.
pub fn create_function(
    &mut self,
    def: FunctionDef,
    or_replace: bool,
) -> Result<(), StorageError> {
    let name_taken = self.functions.contains_key(&def.name);
    if name_taken && !or_replace {
        return Err(StorageError::Corrupt(format!(
            "function {:?} already exists (drop or use CREATE OR REPLACE)",
            def.name
        )));
    }
    self.functions.insert(def.name.clone(), def);
    Ok(())
}

/// Removes function `name`; returns whether it existed.
pub fn drop_function(&mut self, name: &str) -> bool {
    matches!(self.functions.remove(name), Some(_))
}
/// All sequences, keyed by name.
pub const fn sequences(&self) -> &BTreeMap<String, SequenceDef> {
    &self.sequences
}

/// Registers `def` unless a sequence of that name exists. With `if_not_exists`,
/// an existing sequence is left untouched and `Ok(())` is returned.
///
/// # Errors
///
/// `StorageError::Corrupt` if the name is taken and `if_not_exists` is `false`.
pub fn create_sequence(
    &mut self,
    def: SequenceDef,
    if_not_exists: bool,
) -> Result<(), StorageError> {
    match (self.sequences.contains_key(&def.name), if_not_exists) {
        (false, _) => {
            self.sequences.insert(def.name.clone(), def);
            Ok(())
        }
        (true, true) => Ok(()),
        (true, false) => Err(StorageError::Corrupt(format!(
            "sequence {:?} already exists",
            def.name
        ))),
    }
}

/// Removes sequence `name`; returns whether it existed.
pub fn drop_sequence(&mut self, name: &str) -> bool {
    matches!(self.sequences.remove(name), Some(_))
}
/// Advances sequence `name` and returns the new value (`nextval`).
///
/// While `is_called` is false (for example after `sequence_set_value(.., false)`
/// or an `alter_sequence` restart), the current `last_value` is returned as-is.
/// Otherwise the value moves by `increment`. Stepping past `max_value` (ascending)
/// or `min_value` (descending) wraps to the opposite bound when `cycle` is set and
/// fails otherwise.
///
/// A step that overflows `i64` is necessarily beyond the bound in the direction of
/// travel, so it is treated exactly like passing that bound. Previously it always
/// failed with "arithmetic overflow", even for CYCLE sequences whose bound is
/// `i64::MAX`/`i64::MIN`.
///
/// # Errors
///
/// `StorageError::Corrupt` if the sequence does not exist, or if a bound is reached
/// without `cycle`.
pub fn sequence_next_value(&mut self, name: &str) -> Result<i64, StorageError> {
    let Some(seq) = self.sequences.get_mut(name) else {
        return Err(StorageError::Corrupt(format!(
            "sequence {name:?} does not exist"
        )));
    };
    let candidate = if seq.is_called {
        // `None` means the step overflowed i64.
        let stepped = seq.last_value.checked_add(seq.increment);
        if seq.increment > 0 {
            match stepped {
                Some(next) if next <= seq.max_value => next,
                _ if seq.cycle => seq.min_value,
                _ => {
                    return Err(StorageError::Corrupt(format!(
                        "sequence {name:?} reached MAXVALUE ({})",
                        seq.max_value
                    )));
                }
            }
        } else {
            match stepped {
                Some(next) if next >= seq.min_value => next,
                _ if seq.cycle => seq.max_value,
                _ => {
                    return Err(StorageError::Corrupt(format!(
                        "sequence {name:?} reached MINVALUE ({})",
                        seq.min_value
                    )));
                }
            }
        }
    } else {
        seq.last_value
    };
    seq.last_value = candidate;
    seq.is_called = true;
    Ok(candidate)
}
/// Last value handed out by sequence `name` (`currval`).
///
/// # Errors
///
/// `StorageError::Corrupt` if the sequence does not exist or `is_called` is still false.
pub fn sequence_current_value(&self, name: &str) -> Result<i64, StorageError> {
    let seq = self.sequences.get(name).ok_or_else(|| {
        StorageError::Corrupt(format!("sequence {name:?} does not exist"))
    })?;
    if seq.is_called {
        Ok(seq.last_value)
    } else {
        Err(StorageError::Corrupt(format!(
            "currval of sequence {name:?} is not yet defined in this session"
        )))
    }
}

/// Sets `last_value` and `is_called` of sequence `name` (`setval`) and echoes
/// `value`. No bounds check is performed.
///
/// # Errors
///
/// `StorageError::Corrupt` if the sequence does not exist.
pub fn sequence_set_value(
    &mut self,
    name: &str,
    value: i64,
    is_called: bool,
) -> Result<i64, StorageError> {
    let seq = self.sequences.get_mut(name).ok_or_else(|| {
        StorageError::Corrupt(format!("sequence {name:?} does not exist"))
    })?;
    seq.last_value = value;
    seq.is_called = is_called;
    Ok(value)
}
/// All (non-materialized) views, keyed by name.
pub const fn views(&self) -> &BTreeMap<String, ViewDef> {
    &self.views
}

/// Registers view `def`.
///
/// If a view of that name already exists, it is replaced when `or_replace` is
/// set, kept (returning `Ok`) when `if_not_exists` is set, and rejected otherwise.
/// A brand-new view may not reuse the name of a table or sequence.
///
/// # Errors
///
/// `StorageError::Corrupt` on a name clash that neither flag resolves.
pub fn create_view(
    &mut self,
    def: ViewDef,
    or_replace: bool,
    if_not_exists: bool,
) -> Result<(), StorageError> {
    if self.views.contains_key(&def.name) {
        if !or_replace {
            return if if_not_exists {
                Ok(())
            } else {
                Err(StorageError::Corrupt(format!(
                    "view {:?} already exists",
                    def.name
                )))
            };
        }
    } else if self.by_name.contains_key(&def.name) {
        return Err(StorageError::Corrupt(format!(
            "view {:?} would shadow an existing table",
            def.name
        )));
    } else if self.sequences.contains_key(&def.name) {
        return Err(StorageError::Corrupt(format!(
            "view {:?} would shadow an existing sequence",
            def.name
        )));
    }
    self.views.insert(def.name.clone(), def);
    Ok(())
}

/// Removes view `name`; returns whether it existed.
pub fn drop_view(&mut self, name: &str) -> bool {
    matches!(self.views.remove(name), Some(_))
}

/// Materialized view name → defining body text.
pub const fn materialized_views(&self) -> &BTreeMap<String, String> {
    &self.materialized_views
}

/// Records (or overwrites) the body text of materialized view `name`.
pub fn register_materialized_view(&mut self, name: String, body: String) {
    let _previous = self.materialized_views.insert(name, body);
}

/// Forgets the body text of materialized view `name`; returns whether it existed.
pub fn drop_materialized_view_source(&mut self, name: &str) -> bool {
    matches!(self.materialized_views.remove(name), Some(_))
}
/// All user-defined enum types, keyed by name.
pub const fn enum_types(&self) -> &BTreeMap<String, EnumDef> {
    &self.enum_types
}

/// Registers enum type `def`.
///
/// # Errors
///
/// `StorageError::Corrupt` if an enum type of that name already exists.
pub fn create_enum_type(&mut self, def: EnumDef) -> Result<(), StorageError> {
    if !self.enum_types.contains_key(&def.name) {
        self.enum_types.insert(def.name.clone(), def);
        return Ok(());
    }
    Err(StorageError::Corrupt(format!(
        "type {:?} already exists",
        def.name
    )))
}

/// Removes enum type `name`; returns whether it existed.
pub fn drop_enum_type(&mut self, name: &str) -> bool {
    matches!(self.enum_types.remove(name), Some(_))
}

/// All user-defined domains, keyed by name.
pub const fn domain_types(&self) -> &BTreeMap<String, DomainDef> {
    &self.domain_types
}

/// Registers domain `def`.
///
/// # Errors
///
/// `StorageError::Corrupt` if a domain of that name already exists.
pub fn create_domain_type(&mut self, def: DomainDef) -> Result<(), StorageError> {
    if !self.domain_types.contains_key(&def.name) {
        self.domain_types.insert(def.name.clone(), def);
        return Ok(());
    }
    Err(StorageError::Corrupt(format!(
        "domain {:?} already exists",
        def.name
    )))
}

/// Removes domain `name`; returns whether it existed.
pub fn drop_domain_type(&mut self, name: &str) -> bool {
    matches!(self.domain_types.remove(name), Some(_))
}
/// User-created schemas (built-in schemas are not included).
pub const fn user_schemas(&self) -> &alloc::collections::BTreeSet<String> {
    &self.schemas
}

/// `true` for built-in schemas and for user schemas that have been created.
pub fn schema_exists(&self, name: &str) -> bool {
    self.schemas.contains(name) || is_builtin_schema(name)
}

/// Creates user schema `name`. With `if_not_exists`, a clash with a built-in or
/// existing schema is silently accepted.
///
/// # Errors
///
/// `StorageError::Corrupt` when the name is built-in or already taken and
/// `if_not_exists` is `false`.
pub fn create_schema(&mut self, name: String, if_not_exists: bool) -> Result<(), StorageError> {
    let builtin = is_builtin_schema(&name);
    if builtin || self.schemas.contains(&name) {
        if if_not_exists {
            return Ok(());
        }
        let message = if builtin {
            format!("schema {name:?} is built-in and cannot be redeclared")
        } else {
            format!("schema {name:?} already exists")
        };
        return Err(StorageError::Corrupt(message));
    }
    self.schemas.insert(name);
    Ok(())
}

/// Drops user schema `name`, returning whether it existed.
///
/// # Errors
///
/// `StorageError::Corrupt` for built-in schemas, which cannot be dropped.
pub fn drop_schema(&mut self, name: &str) -> Result<bool, StorageError> {
    if !is_builtin_schema(name) {
        return Ok(self.schemas.remove(name));
    }
    Err(StorageError::Corrupt(format!(
        "schema {name:?} is built-in and cannot be dropped"
    )))
}
/// Applies an `ALTER SEQUENCE`: every `Some` argument overwrites its option.
///
/// `restart` is `Some(Some(v))` to restart at `v`, or `Some(None)` to restart at
/// `start` (after any `start` change in the same call). A restart clears
/// `is_called`, so the next advance returns the restart value. `owned_by` of
/// `Some(None)` removes ownership. No cross-field validation (e.g. min ≤ max)
/// is performed.
///
/// # Errors
///
/// `StorageError::Corrupt` if the sequence does not exist.
// One optional parameter per ALTER SEQUENCE clause, hence the argument count.
#[allow(clippy::too_many_arguments)]
pub fn alter_sequence(
    &mut self,
    name: &str,
    increment: Option<i64>,
    min_value: Option<i64>,
    max_value: Option<i64>,
    start: Option<i64>,
    restart: Option<Option<i64>>,
    cache: Option<i64>,
    cycle: Option<bool>,
    owned_by: Option<Option<(String, String)>>,
) -> Result<(), StorageError> {
    let seq = self.sequences.get_mut(name).ok_or_else(|| {
        StorageError::Corrupt(format!("sequence {name:?} does not exist"))
    })?;
    seq.increment = increment.unwrap_or(seq.increment);
    seq.min_value = min_value.unwrap_or(seq.min_value);
    seq.max_value = max_value.unwrap_or(seq.max_value);
    seq.start = start.unwrap_or(seq.start);
    if let Some(target) = restart {
        seq.last_value = target.unwrap_or(seq.start);
        seq.is_called = false;
    }
    seq.cache = cache.unwrap_or(seq.cache);
    seq.cycle = cycle.unwrap_or(seq.cycle);
    if let Some(owner) = owned_by {
        seq.owned_by = owner;
    }
    Ok(())
}
/// All triggers, in registration order.
pub fn triggers(&self) -> &[TriggerDef] {
    self.triggers.as_slice()
}

/// Mutable access to the trigger list.
pub fn triggers_mut(&mut self) -> &mut Vec<TriggerDef> {
    &mut self.triggers
}

/// Registers trigger `def` on its table.
///
/// A trigger is identified by `(name, table)`. An existing one is replaced in
/// place when `or_replace` is set; otherwise the new trigger is appended.
///
/// # Errors
///
/// - `StorageError::TableNotFound` if the table does not exist.
/// - `StorageError::Corrupt` if the function is unknown, or if the trigger
///   exists and `or_replace` is `false`.
pub fn create_trigger(
    &mut self,
    def: TriggerDef,
    or_replace: bool,
) -> Result<(), StorageError> {
    if !self.by_name.contains_key(&def.table) {
        return Err(StorageError::TableNotFound {
            name: def.table.clone(),
        });
    }
    if !self.functions.contains_key(&def.function) {
        return Err(StorageError::Corrupt(format!(
            "trigger {:?} references unknown function {:?}",
            def.name, def.function
        )));
    }
    let existing = self
        .triggers
        .iter_mut()
        .find(|t| t.name == def.name && t.table == def.table);
    match existing {
        Some(_) if !or_replace => Err(StorageError::Corrupt(format!(
            "trigger {:?} already exists on table {:?}",
            def.name, def.table
        ))),
        Some(slot) => {
            *slot = def;
            Ok(())
        }
        None => {
            self.triggers.push(def);
            Ok(())
        }
    }
}

/// Removes trigger `name` from `table`; returns whether anything was removed.
pub fn drop_trigger(&mut self, name: &str, table: &str) -> bool {
    let original_len = self.triggers.len();
    self.triggers
        .retain(|t| t.name != name || t.table != table);
    self.triggers.len() < original_len
}
/// Creates an empty table from `schema`.
///
/// # Errors
///
/// `StorageError::DuplicateTable` if a table with that name already exists.
pub fn create_table(&mut self, schema: TableSchema) -> Result<(), StorageError> {
    let name = schema.name.clone();
    if self.by_name.contains_key(&name) {
        return Err(StorageError::DuplicateTable { name });
    }
    let slot = self.tables.len();
    self.tables.push(Table::new(schema));
    self.by_name.insert(name, slot);
    Ok(())
}

/// Table named `name`, if any.
pub fn get(&self, name: &str) -> Option<&Table> {
    self.by_name
        .get(name)
        .and_then(|&slot| self.tables.get(slot))
}

/// Mutable table named `name`, if any.
pub fn get_mut(&mut self, name: &str) -> Option<&mut Table> {
    let slot = self.by_name.get(name).copied()?;
    self.tables.get_mut(slot)
}
pub fn apply_redo(&mut self, changes: &[RowChange]) -> Result<(), StorageError> {
for change in changes {
match change {
RowChange::Insert { table, row } => {
self.table_for_redo(table)?.insert(row.clone())?;
}
RowChange::Update {
table,
pos,
new_row,
} => {
self.table_for_redo(table)?
.update_row(*pos, new_row.clone())?;
}
RowChange::Delete { table, positions } => {
self.table_for_redo(table)?.delete_rows(positions);
}
}
}
Ok(())
}
/// Resolves the table a redo record targets, reporting an unknown name as
/// [`StorageError::Corrupt`].
fn table_for_redo(&mut self, name: &str) -> Result<&mut Table, StorageError> {
    match self.get_mut(name) {
        Some(table) => Ok(table),
        None => Err(StorageError::Corrupt(alloc::format!(
            "redo: unknown table {name:?}"
        ))),
    }
}
/// Calls `enable_redo` on every table currently in the catalog.
pub fn enable_redo_all(&mut self) {
    self.tables.iter_mut().for_each(|table| {
        table.enable_redo();
    });
}
/// Collects the pending redo records of every table, in table order.
///
/// Each table's records come from its `take_redo`.
pub fn drain_redo(&mut self) -> Vec<RowChange> {
    self.tables
        .iter_mut()
        .flat_map(|table| table.take_redo())
        .collect()
}
/// Number of tables currently registered in the catalog.
pub fn table_count(&self) -> usize {
self.tables.len()
}
/// Removes the table named `name`; returns `false` if no such table exists.
///
/// Uses `swap_remove`: the last table moves into the freed slot and its
/// name → index entry is fixed up. Triggers and foreign keys that mention
/// the dropped name are not touched here.
pub fn drop_table(&mut self, name: &str) -> bool {
    let idx = match self.by_name.remove(name) {
        Some(idx) => idx,
        None => return false,
    };
    self.tables.swap_remove(idx);
    // `swap_remove` relocated the former last table (if any) into `idx`.
    if let Some(moved) = self.tables.get(idx) {
        let moved_name = moved.schema.name.clone();
        self.by_name.insert(moved_name, idx);
    }
    true
}
/// Renames table `old` to `new`, updating every reference to it.
///
/// Foreign keys in all tables whose `parent_table` is `old` and triggers
/// attached to `old` are repointed to `new`. Renaming to the same name is a
/// no-op.
///
/// # Errors
/// * [`StorageError::Corrupt`] if `new` is already taken (checked first).
/// * [`StorageError::TableNotFound`] if `old` does not exist.
pub fn rename_table(&mut self, old: &str, new: &str) -> Result<(), StorageError> {
    if old == new {
        return Ok(());
    }
    if self.by_name.contains_key(new) {
        return Err(StorageError::Corrupt(format!(
            "rename_table: target name {new:?} already exists"
        )));
    }
    let Some(slot) = self.by_name.remove(old) else {
        return Err(StorageError::TableNotFound { name: old.into() });
    };
    let renamed = new.to_string();
    self.tables[slot].schema.name.clone_from(&renamed);
    self.by_name.insert(renamed.clone(), slot);
    for table in &mut self.tables {
        for fk in &mut table.schema.foreign_keys {
            if fk.parent_table == old {
                fk.parent_table.clone_from(&renamed);
            }
        }
    }
    for trig in self.triggers.iter_mut().filter(|t| t.table == old) {
        trig.table.clone_from(&renamed);
    }
    Ok(())
}
/// Renames the first index named `old` (tables in catalog order) to `new`.
///
/// Index names are checked for uniqueness across the whole catalog.
/// Renaming to the same name is a no-op.
///
/// # Errors
/// * [`StorageError::Corrupt`] if any table already has an index named `new`.
/// * [`StorageError::IndexNotFound`] if no index is named `old`.
pub fn rename_index(&mut self, old: &str, new: &str) -> Result<(), StorageError> {
    if old == new {
        return Ok(());
    }
    let target_taken = self
        .tables
        .iter()
        .flat_map(|t| t.indices.iter())
        .any(|i| i.name == new);
    if target_taken {
        return Err(StorageError::Corrupt(format!(
            "rename_index: target name {new:?} already exists"
        )));
    }
    let found = self
        .tables
        .iter_mut()
        .flat_map(|t| t.indices.iter_mut())
        .find(|i| i.name == old);
    match found {
        Some(index) => {
            index.name = new.to_string();
            Ok(())
        }
        None => Err(StorageError::IndexNotFound { name: old.into() }),
    }
}
/// Drops indices named `name` from the first table (in catalog order) that
/// has one; returns whether anything was removed.
pub fn drop_named_index(&mut self, name: &str) -> bool {
    self.tables.iter_mut().any(|table| {
        let count_before = table.indices.len();
        table.indices.retain(|index| index.name != name);
        table.indices.len() != count_before
    })
}
/// Owned copies of all table names, in table-slot order.
pub fn table_names(&self) -> Vec<String> {
    self.tables
        .iter()
        .map(|table| &table.schema.name)
        .cloned()
        .collect()
}
/// Parses `bytes` as a cold segment and appends it as a new slot.
///
/// Returns the new segment id, which is the slot index.
///
/// # Errors
/// [`StorageError::Corrupt`] if the slot count no longer fits in `u32`
/// (checked before parsing) or if the segment fails to parse.
pub fn load_segment_bytes(&mut self, bytes: Vec<u8>) -> Result<u32, StorageError> {
    let Ok(id) = u32::try_from(self.cold_segments.len()) else {
        return Err(StorageError::Corrupt(
            "cold segment count would exceed u32::MAX".into(),
        ));
    };
    let seg = match OwnedSegment::from_bytes(bytes) {
        Ok(seg) => seg,
        Err(e) => {
            return Err(StorageError::Corrupt(format!(
                "cold segment parse failed: {e}"
            )));
        }
    };
    self.cold_segments.push(Some(Arc::new(seg)));
    Ok(id)
}
/// Parses `bytes` as a cold segment and installs it at slot `target_id`.
///
/// Parsing happens first, so a parse failure leaves the slot list untouched.
/// Missing slots up to `target_id` are padded with empty (`None`) entries.
///
/// # Errors
/// [`StorageError::Corrupt`] if parsing fails or the slot is already occupied.
pub fn load_segment_bytes_at(
    &mut self,
    target_id: u32,
    bytes: Vec<u8>,
) -> Result<(), StorageError> {
    let seg = OwnedSegment::from_bytes(bytes)
        .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
    let slot = target_id as usize;
    while slot >= self.cold_segments.len() {
        self.cold_segments.push(None);
    }
    let occupied = matches!(self.cold_segments[slot], Some(_));
    if occupied {
        return Err(StorageError::Corrupt(format!(
            "load_segment_bytes_at: segment_id {target_id} already occupied"
        )));
    }
    self.cold_segments[slot] = Some(Arc::new(seg));
    Ok(())
}
/// Empties cold-segment slot `segment_id`.
///
/// The slot itself is kept (set to `None`), so other segment ids do not shift.
/// Tombstoning an already-empty slot succeeds.
///
/// # Errors
/// [`StorageError::Corrupt`] if `segment_id` is past the end of the slot list.
pub fn tombstone_segment(&mut self, segment_id: u32) -> Result<(), StorageError> {
    let slot = segment_id as usize;
    let slots = self.cold_segments.len();
    if slot < slots {
        self.cold_segments[slot] = None;
        Ok(())
    } else {
        Err(StorageError::Corrupt(format!(
            "tombstone_segment: segment_id {segment_id} out of bounds (len={})",
            slots
        )))
    }
}
/// Number of occupied (non-tombstoned) cold-segment slots.
#[must_use]
pub fn cold_segment_count(&self) -> usize {
    self.cold_segments.iter().flatten().count()
}
/// Total number of cold-segment slots, including empty (tombstoned or
/// padding) ones.
#[must_use]
pub fn cold_segment_slot_count(&self) -> usize {
self.cold_segments.len()
}
/// Ids (slot indices) of all occupied cold-segment slots, ascending.
#[must_use]
pub fn cold_segment_ids_global(&self) -> Vec<u32> {
    let mut ids = Vec::new();
    for (slot, segment) in self.cold_segments.iter().enumerate() {
        if segment.is_some() {
            ids.push(slot as u32);
        }
    }
    ids
}
#[must_use]
pub fn hot_tier_bytes(&self) -> u64 {
self.tables
.iter()
.map(Table::hot_bytes)
.fold(0u64, u64::saturating_add)
}
/// Moves the oldest `max_rows` hot rows of `table_name` into a new cold
/// segment and repoints `index_name` at it.
///
/// "Oldest" means hot positions `0..max_rows`. Each row is keyed by the value
/// in the index column, which must be a non-NULL integer (`IndexKey::Int`).
/// The batch is sorted by that key (reinterpreted as `u64`) before it is
/// encoded.
///
/// The new segment is loaded into the catalog *before* any hot row is
/// deleted, so a load failure leaves the table untouched instead of losing
/// the rows.
///
/// # Errors
/// [`StorageError::Corrupt`] is returned when:
/// * `max_rows` is 0 or exceeds the row count;
/// * the table or index is missing, or the index is not a BTree;
/// * a key is NULL, non-integer or duplicated;
/// * segment encoding or loading fails.
///
/// Errors from `register_cold_locators` are propagated as-is. By that point
/// the rows have already moved from the hot tier into the new segment.
pub fn freeze_oldest_to_cold(
    &mut self,
    table_name: &str,
    index_name: &str,
    max_rows: usize,
) -> Result<FreezeReport, StorageError> {
    if max_rows == 0 {
        return Err(StorageError::Corrupt(
            "freeze_oldest_to_cold: max_rows must be > 0".into(),
        ));
    }
    let table = self.get(table_name).ok_or_else(|| {
        StorageError::Corrupt(format!(
            "freeze_oldest_to_cold: table {table_name:?} not found"
        ))
    })?;
    if max_rows > table.rows.len() {
        return Err(StorageError::Corrupt(format!(
            "freeze_oldest_to_cold: max_rows {max_rows} > row_count {}",
            table.rows.len()
        )));
    }
    let idx = table
        .indices
        .iter()
        .find(|i| i.name == index_name)
        .ok_or_else(|| {
            StorageError::Corrupt(format!(
                "freeze_oldest_to_cold: index {index_name:?} not found on {table_name:?}"
            ))
        })?;
    if !matches!(idx.kind, IndexKind::BTree(_)) {
        return Err(StorageError::Corrupt(format!(
            "freeze_oldest_to_cold: index {index_name:?} is NSW; only BTree indices may freeze"
        )));
    }
    let column_position = idx.column_position;
    // A borrow suffices: every use of the schema happens before the first mutation.
    let schema = &table.schema;
    let mut to_freeze: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(max_rows);
    for row_idx in 0..max_rows {
        let row = table.rows.get(row_idx).expect("bounds-checked above");
        let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
            StorageError::Corrupt(format!(
                "freeze_oldest_to_cold: row {row_idx} has NULL / non-key value in index column"
            ))
        })?;
        let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
            StorageError::Corrupt(format!(
                "freeze_oldest_to_cold: index {index_name:?} column type is non-integer; \
                 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
            ))
        })?;
        to_freeze.push((pk_u64, encode_row_body_dense(row, schema), key));
    }
    to_freeze.sort_by_key(|(k, _, _)| *k);
    for w in to_freeze.windows(2) {
        if w[0].0 == w[1].0 {
            return Err(StorageError::Corrupt(format!(
                "freeze_oldest_to_cold: duplicate PK {} in freeze batch",
                w[0].0
            )));
        }
    }
    let post_swap_keys: Vec<IndexKey> = to_freeze.iter().map(|(_, _, k)| k.clone()).collect();
    let seg_rows: Vec<(u64, Vec<u8>)> = to_freeze
        .into_iter()
        .map(|(k, body, _)| (k, body))
        .collect();
    let frozen_rows = seg_rows.len();
    let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
        .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: encode: {e}")))?;
    // Register the segment first: if this fails, nothing has been mutated yet.
    let segment_id = self
        .load_segment_bytes(seg_bytes.clone())
        .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: load: {e}")))?;
    let positions: Vec<usize> = (0..max_rows).collect();
    let t_mut = self
        .get_mut(table_name)
        .expect("just validated; still present");
    let bytes_before = t_mut.hot_bytes();
    let removed = t_mut.delete_rows(&positions);
    debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
    let bytes_after = t_mut.hot_bytes();
    let bytes_freed = bytes_before.saturating_sub(bytes_after);
    let new_cold = post_swap_keys.into_iter().map(|k| {
        (
            k,
            RowLocator::Cold {
                segment_id,
                page_offset: 0,
            },
        )
    });
    t_mut.register_cold_locators(index_name, new_cold)?;
    Ok(FreezeReport {
        segment_id,
        frozen_rows,
        bytes_freed,
        segment_bytes: seg_bytes,
    })
}
/// Borrows the cold segment in slot `segment_id`.
///
/// Returns `None` if the slot is out of range or tombstoned.
#[must_use]
pub fn cold_segment(&self, segment_id: u32) -> Option<&OwnedSegment> {
    self.cold_segments.get(segment_id as usize)?.as_deref()
}
/// Reads and decodes the row stored under `key` in cold segment `segment_id`,
/// using `table_name`'s schema.
///
/// Returns `None` in any of these cases:
/// * the table is missing;
/// * the key is not an integer;
/// * the segment slot is empty or out of range;
/// * the segment has no entry for the key;
/// * decoding fails.
pub fn resolve_cold_locator(
    &self,
    table_name: &str,
    segment_id: u32,
    key: &IndexKey,
) -> Option<Row> {
    let table = self.get(table_name)?;
    let probe = index_key_as_u64(key)?;
    let segment = self
        .cold_segments
        .get(segment_id as usize)
        .and_then(Option::as_ref)?;
    let body = segment.lookup(probe)?;
    decode_row_body_dense(&body, &table.schema, segment.codec_version())
        .ok()
        .map(|(row, _)| row)
}
/// Point lookup of `key` through index `index_name` on `table`, resolving
/// both hot and cold locators.
///
/// Locators are tried in the order `lookup_eq` yields them, and the first one
/// that produces a row wins. A locator that cannot be resolved is skipped so
/// later locators still get a chance. Unresolvable cases are:
/// * a hot position past the end of the table;
/// * a non-integer key on a cold locator;
/// * an empty or unknown segment slot;
/// * a segment miss;
/// * a payload that fails to decode.
///
/// Returns `None` if the table or index is missing, or no locator resolves.
pub fn lookup_by_pk(&self, table: &str, index_name: &str, key: &IndexKey) -> Option<Row> {
    let t = self.get(table)?;
    let idx = t.indices.iter().find(|i| i.name == index_name)?;
    let locators = idx.lookup_eq(key);
    let cold_u64_key = index_key_as_u64(key);
    for loc in locators {
        match *loc {
            RowLocator::Hot(i) => {
                if let Some(row) = t.rows.get(i) {
                    return Some(row.clone());
                }
            }
            RowLocator::Cold {
                segment_id,
                page_offset: _,
            } => {
                let Some(u64_key) = cold_u64_key else {
                    continue;
                };
                let Some(seg) = self
                    .cold_segments
                    .get(segment_id as usize)
                    .and_then(|s| s.as_deref())
                else {
                    continue;
                };
                let Some(payload) = seg.lookup(u64_key) else {
                    continue;
                };
                // An undecodable payload is skipped like any other unresolvable
                // locator instead of aborting the whole lookup.
                let Ok((row, _)) =
                    decode_row_body_dense(&payload, &t.schema, seg.codec_version())
                else {
                    continue;
                };
                return Some(row);
            }
        }
    }
    None
}
/// Copies the cold row for `key` back into the hot tier of `table_name` and
/// removes the key's cold locators from `index_name`.
///
/// The segment bytes themselves are not modified. Returns:
/// * `Ok(None)` when the index holds no cold locator for the key;
/// * `Ok(Some(pos))` otherwise, where `pos` is `rows.len() - 1` after the
///   insert (assumes `insert` appends — TODO confirm).
///
/// # Errors
/// * [`StorageError::Corrupt`] if the key is not an integer, the table is
///   gone, the segment slot is empty, or the segment misses the key.
/// * Errors from `find_cold_locator`, decoding, `insert` and
///   `remove_cold_locators_for_key` are propagated.
pub fn promote_cold_row(
    &mut self,
    table_name: &str,
    index_name: &str,
    key: &IndexKey,
) -> Result<Option<usize>, StorageError> {
    let Some((segment_id, _page_offset)) =
        self.find_cold_locator(table_name, index_name, key)?
    else {
        return Ok(None);
    };
    let Some(u64_key) = index_key_as_u64(key) else {
        return Err(StorageError::Corrupt(
            "promote_cold_row: key type not coercible to u64 (cold tier requires integer PK)"
                .into(),
        ));
    };
    let Some(table) = self.get(table_name) else {
        return Err(StorageError::Corrupt(format!(
            "promote_cold_row: table {table_name:?} not found"
        )));
    };
    // Owned copy: the table is re-borrowed mutably below.
    let schema = table.schema.clone();
    let Some(seg) = self
        .cold_segments
        .get(segment_id as usize)
        .and_then(Option::as_ref)
    else {
        return Err(StorageError::Corrupt(format!(
            "promote_cold_row: segment {segment_id} not registered on catalog"
        )));
    };
    let Some(payload) = seg.lookup(u64_key) else {
        return Err(StorageError::Corrupt(format!(
            "promote_cold_row: key {u64_key} resolves to segment {segment_id} \
             but the segment's bloom/page lookup didn't return a row"
        )));
    };
    let (row, _consumed) = decode_row_body_dense(&payload, &schema, seg.codec_version())?;
    let table = self
        .get_mut(table_name)
        .expect("table existed at lookup time");
    table.insert(row)?;
    let Some(new_hot_idx) = table.rows.len().checked_sub(1) else {
        return Err(StorageError::Corrupt(
            "promote_cold_row: empty after insert".into(),
        ));
    };
    table.remove_cold_locators_for_key(index_name, key)?;
    Ok(Some(new_hot_idx))
}
/// Drops the cold locators for `key` from `index_name` without promoting
/// the row.
///
/// Returns whatever `remove_cold_locators_for_key` reports (presumably the
/// number of locators removed — confirm).
///
/// # Errors
/// [`StorageError::Corrupt`] if the table is missing; otherwise errors from
/// `remove_cold_locators_for_key` are propagated.
pub fn shadow_cold_row(
    &mut self,
    table_name: &str,
    index_name: &str,
    key: &IndexKey,
) -> Result<usize, StorageError> {
    match self.get_mut(table_name) {
        Some(table) => table.remove_cold_locators_for_key(index_name, key),
        None => Err(StorageError::Corrupt(format!(
            "shadow_cold_row: table {table_name:?} not found"
        ))),
    }
}
/// Encodes hot rows `row_range` of `table_name` into a key-sorted
/// `FreezeSlice` without mutating the catalog.
///
/// Meant to be paired with `commit_freeze_slices`. Each row is keyed by the
/// value in `index_name`'s column, which must be a non-NULL integer
/// (`IndexKey::Int`). Rows are sorted by that key reinterpreted as `u64`.
///
/// # Errors
/// [`StorageError::Corrupt`] is returned when:
/// * the table or index is missing, or the index is not a BTree;
/// * `row_range.end` exceeds the row count;
/// * a row's key is NULL or non-integer (the first offending row is reported).
pub fn prepare_freeze_slice(
    &self,
    table_name: &str,
    index_name: &str,
    row_range: core::ops::Range<usize>,
) -> Result<FreezeSlice, StorageError> {
    let Some(table) = self.get(table_name) else {
        return Err(StorageError::Corrupt(format!(
            "prepare_freeze_slice: table {table_name:?} not found"
        )));
    };
    let Some(idx) = table.indices.iter().find(|i| i.name == index_name) else {
        return Err(StorageError::Corrupt(format!(
            "prepare_freeze_slice: index {index_name:?} not found on {table_name:?}"
        )));
    };
    if !matches!(idx.kind, IndexKind::BTree(_)) {
        return Err(StorageError::Corrupt(format!(
            "prepare_freeze_slice: index {index_name:?} is NSW; only BTree indices may freeze"
        )));
    }
    let row_count = table.rows.len();
    if row_range.end > row_count {
        return Err(StorageError::Corrupt(format!(
            "prepare_freeze_slice: row_range end {} > row_count {}",
            row_range.end, row_count
        )));
    }
    let column_position = idx.column_position;
    let schema = &table.schema;
    let mut rows = row_range
        .clone()
        .map(|row_idx| -> Result<(u64, Vec<u8>, IndexKey), StorageError> {
            let row = table.rows.get(row_idx).expect("bounds-checked above");
            let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "prepare_freeze_slice: row {row_idx} has NULL / non-key value in index column"
                ))
            })?;
            let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "prepare_freeze_slice: index {index_name:?} column type is non-integer; \
                     v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
                ))
            })?;
            Ok((pk_u64, encode_row_body_dense(row, schema), key))
        })
        .collect::<Result<Vec<(u64, Vec<u8>, IndexKey)>, StorageError>>()?;
    rows.sort_by_key(|(k, _, _)| *k);
    Ok(FreezeSlice { row_range, rows })
}
/// Combines prepared freeze slices into one cold segment and moves the
/// covered hot rows into it.
///
/// The slices' `row_range`s (ordered by start) must tile `0..n` exactly, with
/// no gaps or overlaps. Their total row count must equal `n`, and `n` must
/// not exceed the table's row count. When `n == 0`, nothing happens and a
/// report with `segment_id: u32::MAX` and no bytes is returned.
///
/// The new segment is loaded into the catalog *before* any hot row is
/// deleted, so a load failure leaves the table untouched.
///
/// # Errors
/// [`StorageError::Corrupt`] is returned when:
/// * the table or index is missing, or the index is not a BTree;
/// * the ranges have a gap or overlap, or exceed the row count;
/// * the slice row totals mismatch the ranges;
/// * a key is duplicated across slices;
/// * segment encoding or loading fails.
///
/// Errors from `register_cold_locators` are propagated; by then the rows
/// live in the new segment only.
pub fn commit_freeze_slices(
    &mut self,
    table_name: &str,
    index_name: &str,
    slices: Vec<FreezeSlice>,
) -> Result<FreezeReport, StorageError> {
    let table = self.get(table_name).ok_or_else(|| {
        StorageError::Corrupt(format!(
            "commit_freeze_slices: table {table_name:?} not found"
        ))
    })?;
    let idx = table
        .indices
        .iter()
        .find(|i| i.name == index_name)
        .ok_or_else(|| {
            StorageError::Corrupt(format!(
                "commit_freeze_slices: index {index_name:?} not found on {table_name:?}"
            ))
        })?;
    if !matches!(idx.kind, IndexKind::BTree(_)) {
        return Err(StorageError::Corrupt(format!(
            "commit_freeze_slices: index {index_name:?} is NSW; only BTree indices may freeze"
        )));
    }
    let mut ordered = slices;
    ordered.sort_by_key(|s| s.row_range.start);
    let mut expected_start = 0usize;
    for s in &ordered {
        if s.row_range.start != expected_start {
            return Err(StorageError::Corrupt(format!(
                "commit_freeze_slices: gap/overlap at row {}; expected start {}",
                s.row_range.start, expected_start
            )));
        }
        expected_start = s.row_range.end;
    }
    let max_rows = expected_start;
    if max_rows > table.rows.len() {
        return Err(StorageError::Corrupt(format!(
            "commit_freeze_slices: total row range {} exceeds row_count {}",
            max_rows,
            table.rows.len()
        )));
    }
    if max_rows == 0 {
        return Ok(FreezeReport {
            segment_id: u32::MAX,
            frozen_rows: 0,
            bytes_freed: 0,
            segment_bytes: Vec::new(),
        });
    }
    let total_rows: usize = ordered.iter().map(|s| s.rows.len()).sum();
    if total_rows != max_rows {
        return Err(StorageError::Corrupt(format!(
            "commit_freeze_slices: total slice rows {total_rows} ≠ row_range coverage {max_rows}"
        )));
    }
    // Slices produced by `prepare_freeze_slice` are already key-sorted, so
    // concatenating them in slice order and stable-sorting yields the same
    // order as a k-way merge. This moves row bodies instead of cloning them,
    // and stays correct if a slice is unsorted.
    let mut merged: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(total_rows);
    for slice in ordered {
        merged.extend(slice.rows);
    }
    merged.sort_by_key(|(k, _, _)| *k);
    for w in merged.windows(2) {
        if w[0].0 == w[1].0 {
            return Err(StorageError::Corrupt(format!(
                "commit_freeze_slices: duplicate PK {} across slices",
                w[0].0
            )));
        }
    }
    let post_swap_keys: Vec<IndexKey> = merged.iter().map(|(_, _, k)| k.clone()).collect();
    let seg_rows: Vec<(u64, Vec<u8>)> =
        merged.into_iter().map(|(k, body, _)| (k, body)).collect();
    let frozen_rows = seg_rows.len();
    let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
        .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: encode: {e}")))?;
    // Register the segment first: if this fails, nothing has been mutated yet.
    let segment_id = self
        .load_segment_bytes(seg_bytes.clone())
        .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: load: {e}")))?;
    let positions: Vec<usize> = (0..max_rows).collect();
    let t_mut = self
        .get_mut(table_name)
        .expect("just validated; still present");
    let bytes_before = t_mut.hot_bytes();
    let removed = t_mut.delete_rows(&positions);
    debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
    let bytes_after = t_mut.hot_bytes();
    let bytes_freed = bytes_before.saturating_sub(bytes_after);
    let new_cold = post_swap_keys.into_iter().map(|k| {
        (
            k,
            RowLocator::Cold {
                segment_id,
                page_offset: 0,
            },
        )
    });
    t_mut.register_cold_locators(index_name, new_cold)?;
    Ok(FreezeReport {
        segment_id,
        frozen_rows,
        bytes_freed,
        segment_bytes: seg_bytes,
    })
}
/// Merges the small cold segments referenced by BTree index `index_name` of
/// `table_name` into one new segment.
///
/// Candidates are segments this index references through `RowLocator::Cold`
/// whose encoded size is below `target_segment_bytes`. With fewer than two
/// candidates this is a no-op: an empty report with `merged_segment_id: None`.
///
/// Otherwise, for each index key pointing at a candidate, the row body is
/// read from the first such locator and written to the merged segment. Rows
/// a source segment holds but the index no longer references are dropped;
/// they are counted in `deleted_rows_pruned`. The index's candidate locators
/// are then rewritten to the merged segment, and the source slots are
/// tombstoned.
///
/// NOTE(review): only this index's locators are rewritten before the sources
/// are tombstoned. Any other reference to those segment ids would dangle —
/// presumably segments are owned by a single index; confirm.
///
/// # Errors
/// [`StorageError::Corrupt`] is returned when:
/// * the table or index is missing, or the index is not a BTree;
/// * a cold key is non-integer;
/// * a referenced segment misses a key;
/// * encoding or loading the merged segment fails.
pub fn compact_cold_segments(
    &mut self,
    table_name: &str,
    index_name: &str,
    target_segment_bytes: u64,
) -> Result<CompactReport, StorageError> {
    let t = self.get(table_name).ok_or_else(|| {
        StorageError::Corrupt(format!(
            "compact_cold_segments: table {table_name:?} not found"
        ))
    })?;
    let idx = t
        .indices
        .iter()
        .find(|i| i.name == index_name)
        .ok_or_else(|| {
            StorageError::Corrupt(format!(
                "compact_cold_segments: index {index_name:?} not found on {table_name:?}"
            ))
        })?;
    let map = match &idx.kind {
        IndexKind::BTree(m) => m,
        IndexKind::Nsw(_)
        | IndexKind::Brin { .. }
        | IndexKind::Gin(_)
        | IndexKind::GinTrgm(_)
        | IndexKind::GinFulltext(_) => {
            return Err(StorageError::Corrupt(format!(
                "compact_cold_segments: index {index_name:?} is not BTree; \
                 compaction applies only to BTree cold-tier indices"
            )));
        }
    };
    let mut referenced_ids: BTreeSet<u32> = BTreeSet::new();
    for (_key, locators) in map.iter() {
        for loc in locators {
            if let RowLocator::Cold { segment_id, .. } = loc {
                referenced_ids.insert(*segment_id);
            }
        }
    }
    let candidate_set: BTreeSet<u32> = referenced_ids
        .into_iter()
        .filter(|id| {
            self.cold_segments
                .get(*id as usize)
                .and_then(|s| s.as_deref())
                .is_some_and(|s| (s.bytes().len() as u64) < target_segment_bytes)
        })
        .collect();
    if candidate_set.len() < 2 {
        return Ok(CompactReport {
            sources: Vec::new(),
            merged_segment_id: None,
            merged_segment_bytes: Vec::new(),
            merged_rows: 0,
            deleted_rows_pruned: 0,
            bytes_reclaimed_estimate: 0,
        });
    }
    let mut source_row_count: usize = 0;
    let mut source_byte_total: u64 = 0;
    for &id in &candidate_set {
        let seg = self.cold_segments[id as usize]
            .as_ref()
            .expect("candidate selected only when slot is Some");
        source_row_count = source_row_count.saturating_add(seg.meta().num_rows as usize);
        source_byte_total = source_byte_total.saturating_add(seg.bytes().len() as u64);
    }
    let mut collected: BTreeMap<u64, (Vec<u8>, IndexKey)> = BTreeMap::new();
    for (key, locators) in map.iter() {
        for loc in locators {
            let RowLocator::Cold { segment_id, .. } = loc else {
                continue;
            };
            if !candidate_set.contains(segment_id) {
                continue;
            }
            let u64_key = index_key_as_u64(key).ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "compact_cold_segments: index {index_name:?} has non-integer Cold key; \
                     cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
                ))
            })?;
            let seg = self.cold_segments[*segment_id as usize]
                .as_ref()
                .expect("candidate slot guaranteed Some above");
            let payload = seg.lookup(u64_key).ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "compact_cold_segments: BTree {index_name:?} points key={u64_key} \
                     at segment {segment_id} but the segment lookup missed"
                ))
            })?;
            collected.insert(u64_key, (payload, key.clone()));
            // One copy per key is enough; later candidate locators are redundant.
            break;
        }
    }
    let merged_rows = collected.len();
    let deleted_rows_pruned = source_row_count.saturating_sub(merged_rows);
    // `collected` is not needed afterwards, so move the bodies instead of cloning.
    let seg_rows: Vec<(u64, Vec<u8>)> = collected
        .into_iter()
        .map(|(k, (body, _))| (k, body))
        .collect();
    let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
        .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: encode: {e}")))?;
    let merged_bytes_len = seg_bytes.len() as u64;
    let merged_segment_id = self
        .load_segment_bytes(seg_bytes.clone())
        .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: load: {e}")))?;
    let replacement = RowLocator::Cold {
        segment_id: merged_segment_id,
        page_offset: 0,
    };
    // Rewritten locator lists, computed only for keys that actually point at a
    // candidate segment, so unaffected index entries are never cloned.
    let updates: Vec<(IndexKey, Vec<RowLocator>)> = {
        let t = self
            .get(table_name)
            .expect("table existed at the start of this fn");
        let idx = t
            .indices
            .iter()
            .find(|i| i.name == index_name)
            .expect("index existed at the start of this fn");
        let IndexKind::BTree(map) = &idx.kind else {
            unreachable!("validated above");
        };
        map.iter()
            .filter_map(|(key, locators)| {
                let mut new_locs: Vec<RowLocator> = Vec::with_capacity(locators.len());
                let mut changed = false;
                for loc in locators {
                    match *loc {
                        RowLocator::Cold {
                            segment_id,
                            page_offset: _,
                        } if candidate_set.contains(&segment_id) => {
                            // Several candidate locators collapse into one merged locator.
                            if !new_locs.contains(&replacement) {
                                new_locs.push(replacement);
                            }
                            changed = true;
                        }
                        other => new_locs.push(other),
                    }
                }
                changed.then(|| (key.clone(), new_locs))
            })
            .collect()
    };
    let t_mut = self
        .get_mut(table_name)
        .expect("table existed at the start of this fn");
    let idx_mut = t_mut
        .indices
        .iter_mut()
        .find(|i| i.name == index_name)
        .expect("index existed at the start of this fn");
    let IndexKind::BTree(map_mut) = &mut idx_mut.kind else {
        unreachable!("validated above");
    };
    for (key, new_locs) in updates {
        map_mut.insert_mut(key, new_locs);
    }
    for &id in &candidate_set {
        self.tombstone_segment(id)?;
    }
    let bytes_reclaimed_estimate = source_byte_total.saturating_sub(merged_bytes_len);
    Ok(CompactReport {
        sources: candidate_set.into_iter().collect(),
        merged_segment_id: Some(merged_segment_id),
        merged_segment_bytes: seg_bytes,
        merged_rows,
        deleted_rows_pruned,
        bytes_reclaimed_estimate,
    })
}
/// Returns `(segment_id, page_offset)` of the first cold locator that BTree
/// index `index_name` holds for `key`, or `Ok(None)` if it has none.
///
/// # Errors
/// [`StorageError::Corrupt`] if the table or index is missing, or the index
/// is not a BTree.
fn find_cold_locator(
    &self,
    table_name: &str,
    index_name: &str,
    key: &IndexKey,
) -> Result<Option<(u32, u32)>, StorageError> {
    let Some(table) = self.get(table_name) else {
        return Err(StorageError::Corrupt(format!(
            "find_cold_locator: table {table_name:?} not found"
        )));
    };
    let Some(idx) = table.indices.iter().find(|i| i.name == index_name) else {
        return Err(StorageError::Corrupt(format!(
            "find_cold_locator: index {index_name:?} not found on {table_name:?}"
        )));
    };
    if !matches!(idx.kind, IndexKind::BTree(_)) {
        return Err(StorageError::Corrupt(format!(
            "find_cold_locator: index {index_name:?} is NSW; promote-on-write only applies to BTree indices"
        )));
    }
    let first_cold = idx
        .lookup_eq(key)
        .into_iter()
        .find_map(|loc| match *loc {
            RowLocator::Cold {
                segment_id,
                page_offset,
            } => Some((segment_id, page_offset)),
            RowLocator::Hot(_) => None,
        });
    Ok(first_cold)
}
}
/// Maps an index key onto the `u64` key space used by cold segments.
///
/// Only `IndexKey::Int` has a cold-tier form: its bits are reinterpreted as
/// unsigned, so negative values order after all non-negative ones. Every
/// other variant yields `None`.
fn index_key_as_u64(key: &IndexKey) -> Option<u64> {
    match *key {
        IndexKey::Int(signed) => Some(signed.cast_unsigned()),
        IndexKey::Text(_) | IndexKey::Bool(_) | IndexKey::Uuid(_) => None,
    }
}
/// Error type returned by the catalog and storage operations in this module.
///
/// Note: `Corrupt` is used broadly. Besides malformed data, it also reports
/// caller-level conflicts and precondition failures, such as a trigger that
/// already exists, a rename target that is taken, or an invalid freeze
/// request.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum StorageError {
/// A table with this name already exists.
DuplicateTable {
name: String,
},
/// No table with this name is registered.
TableNotFound {
name: String,
},
/// A row carried `actual` values but `expected` columns were required.
ArityMismatch {
expected: usize,
actual: usize,
},
/// A value's type did not match column `column`'s declared type;
/// `position` is reported alongside (presumably the column index — confirm).
TypeMismatch {
column: String,
expected: DataType,
actual: DataType,
position: usize,
},
/// A NULL was supplied for a NOT NULL column.
NullInNotNull {
column: String,
},
/// An index with this name already exists.
DuplicateIndex {
name: String,
},
/// A referenced column does not exist.
ColumnNotFound {
column: String,
},
/// Free-form failure description; see the note on the enum.
Corrupt(String),
/// No index with this name exists.
IndexNotFound {
name: String,
},
/// The requested operation or feature is not supported; the string says
/// what.
Unsupported(String),
}
impl fmt::Display for StorageError {
    /// Renders a short, human-readable, single-line description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::DuplicateTable { name: table } => write!(f, "table already exists: {table}"),
            Self::TableNotFound { name: table } => write!(f, "table not found: {table}"),
            Self::ArityMismatch {
                expected: want,
                actual: got,
            } => write!(
                f,
                "row arity mismatch: expected {want} columns, got {got}"
            ),
            Self::TypeMismatch {
                column: col,
                expected: want,
                actual: got,
                position: pos,
            } => write!(
                f,
                "type mismatch in column {col:?} (position {pos}): expected {want}, got {got}"
            ),
            Self::NullInNotNull { column: col } => {
                write!(f, "NULL value in NOT NULL column {col:?}")
            }
            Self::DuplicateIndex { name: index } => write!(f, "index already exists: {index}"),
            Self::ColumnNotFound { column: col } => write!(f, "column not found: {col}"),
            Self::Corrupt(detail) => write!(f, "corrupt on-disk format: {detail}"),
            Self::IndexNotFound { name: index } => write!(f, "index not found: {index}"),
            Self::Unsupported(detail) => write!(f, "unsupported: {detail}"),
        }
    }
}
impl ColumnSchema {
    /// Creates a column with the given name, type and nullability.
    ///
    /// All optional attributes start unset:
    /// * no static or runtime default;
    /// * not auto-increment;
    /// * no user enum/domain binding and no ON UPDATE expression;
    /// * `Collation::Binary` and `is_unsigned == false`;
    /// * no inline ENUM/SET variant lists.
    pub fn new(name: impl Into<String>, ty: DataType, nullable: bool) -> Self {
        let name = name.into();
        Self {
            name,
            ty,
            nullable,
            collation: Collation::Binary,
            is_unsigned: false,
            auto_increment: false,
            default: None,
            runtime_default: None,
            on_update_runtime: None,
            user_enum_type: None,
            user_domain_type: None,
            inline_enum_variants: None,
            inline_set_variants: None,
        }
    }

    /// Returns this column with `default` set as its constant default value.
    #[must_use]
    pub fn with_default(self, default: Value) -> Self {
        Self {
            default: Some(default),
            ..self
        }
    }

    /// Returns this column with a runtime default expression, stored as
    /// source text (evaluated elsewhere — not visible here).
    #[must_use]
    pub fn with_runtime_default(self, expr: impl Into<String>) -> Self {
        Self {
            runtime_default: Some(expr.into()),
            ..self
        }
    }

    /// Returns this column with `auto_increment` set.
    #[must_use]
    pub const fn with_auto_increment(mut self) -> Self {
        self.auto_increment = true;
        self
    }
}
impl TableSchema {
    /// Creates a schema for table `name` with the given columns.
    ///
    /// There are no foreign keys, uniqueness constraints or CHECK
    /// constraints, and `hot_tier_bytes` is unset.
    pub fn new(name: impl Into<String>, columns: Vec<ColumnSchema>) -> Self {
        let name = name.into();
        Self {
            name,
            columns,
            checks: Vec::new(),
            foreign_keys: Vec::new(),
            uniqueness_constraints: Vec::new(),
            hot_tier_bytes: None,
        }
    }
}
/// Magic bytes that `Catalog::serialize` writes at the very start of its output.
const FILE_MAGIC: &[u8; 8] = b"SPGDB001";
/// Format version byte that `Catalog::serialize` writes right after `FILE_MAGIC`.
const FILE_VERSION: u8 = 47;
/// Oldest format version the loader is meant to accept (presumably checked
/// by the deserializer, which is outside this view — confirm).
const MIN_SUPPORTED_FILE_VERSION: u8 = 8;
// Tag bytes identifying the `IndexKey` variant of a serialized index key
// (presumably emitted by `write_index_key`, not shown here — confirm).
const INDEX_KEY_TAG_INT: u8 = 0;
const INDEX_KEY_TAG_TEXT: u8 = 1;
const INDEX_KEY_TAG_BOOL: u8 = 2;
const INDEX_KEY_TAG_UUID: u8 = 3;
impl Catalog {
pub fn serialize(&self) -> Vec<u8> {
let mut out = Vec::with_capacity(64);
out.extend_from_slice(FILE_MAGIC);
out.push(FILE_VERSION);
write_u32(
&mut out,
u32::try_from(self.tables.len()).expect("≤ 4G tables"),
);
for t in &self.tables {
write_str(&mut out, &t.schema.name);
write_u16(
&mut out,
u16::try_from(t.schema.columns.len()).expect("≤ 65k columns/table"),
);
for c in &t.schema.columns {
write_str(&mut out, &c.name);
write_data_type(&mut out, c.ty);
out.push(u8::from(c.nullable));
match &c.default {
None => out.push(0),
Some(v) => {
out.push(1);
write_value(&mut out, v);
}
}
out.push(u8::from(c.auto_increment));
}
write_u32(
&mut out,
u32::try_from(t.rows.len()).expect("≤ 4G rows/table"),
);
for row in &t.rows {
out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
}
write_u16(
&mut out,
u16::try_from(t.indices.len()).expect("≤ 65k indices/table"),
);
for idx in &t.indices {
write_str(&mut out, &idx.name);
write_u16(
&mut out,
u16::try_from(idx.column_position).expect("≤ 65k columns/table"),
);
match &idx.kind {
IndexKind::BTree(map) => {
out.push(0);
write_u32(
&mut out,
u32::try_from(map.len()).expect("≤ 4G index entries/index"),
);
for (key, locators) in map {
write_index_key(&mut out, key);
write_u32(
&mut out,
u32::try_from(locators.len()).expect("≤ 4G locators/key"),
);
for loc in locators {
loc.write_le(&mut out);
}
}
}
IndexKind::Nsw(g) => {
out.push(1);
write_u16(&mut out, u16::try_from(g.m).expect("≤ 65k NSW neighbours"));
write_nsw_graph(&mut out, g);
}
IndexKind::Brin { column_type } => {
out.push(2);
write_data_type(&mut out, *column_type);
}
IndexKind::Gin(map) => {
out.push(3);
write_u32(
&mut out,
u32::try_from(map.len()).expect("≤ 4G GIN posting lists"),
);
for (word, locators) in map {
write_str(&mut out, word);
write_u32(
&mut out,
u32::try_from(locators.len()).expect("≤ 4G locators/posting list"),
);
for loc in locators {
loc.write_le(&mut out);
}
}
}
IndexKind::GinTrgm(map) => {
out.push(4);
write_u32(
&mut out,
u32::try_from(map.len()).expect("≤ 4G trigram-GIN posting lists"),
);
for (tri, locators) in map {
write_str(&mut out, tri);
write_u32(
&mut out,
u32::try_from(locators.len()).expect("≤ 4G locators/posting list"),
);
for loc in locators {
loc.write_le(&mut out);
}
}
}
IndexKind::GinFulltext(map) => {
out.push(5);
write_u32(
&mut out,
u32::try_from(map.len()).expect("≤ 4G fulltext-GIN posting lists"),
);
for (lex, locators) in map {
write_str(&mut out, lex);
write_u32(
&mut out,
u32::try_from(locators.len()).expect("≤ 4G locators/posting list"),
);
for loc in locators {
loc.write_le(&mut out);
}
}
}
}
write_u16(
&mut out,
u16::try_from(idx.included_columns.len()).expect("≤ 65k INCLUDE columns/index"),
);
for col_pos in &idx.included_columns {
write_u16(
&mut out,
u16::try_from(*col_pos).expect("≤ 65k columns/table"),
);
}
match &idx.partial_predicate {
None => out.push(0),
Some(pred) => {
out.push(1);
write_str(&mut out, pred);
}
}
match &idx.expression {
None => out.push(0),
Some(expr) => {
out.push(1);
write_str(&mut out, expr);
}
}
out.push(u8::from(idx.is_unique));
write_u16(
&mut out,
u16::try_from(idx.extra_column_positions.len())
.expect("≤ 65k extra cols / index"),
);
for cp in &idx.extra_column_positions {
write_u16(&mut out, u16::try_from(*cp).expect("≤ 65k columns/table"));
}
}
match t.schema.hot_tier_bytes {
None => out.push(0),
Some(n) => {
out.push(1);
out.extend_from_slice(&n.to_le_bytes());
}
}
write_u16(
&mut out,
u16::try_from(t.schema.foreign_keys.len()).expect("≤ 65k FKs/table"),
);
for fk in &t.schema.foreign_keys {
match &fk.name {
None => out.push(0),
Some(n) => {
out.push(1);
write_str(&mut out, n);
}
}
write_u16(
&mut out,
u16::try_from(fk.local_columns.len()).expect("≤ 65k FK columns"),
);
for &p in &fk.local_columns {
write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
}
write_str(&mut out, &fk.parent_table);
write_u16(
&mut out,
u16::try_from(fk.parent_columns.len()).expect("≤ 65k FK parent columns"),
);
for &p in &fk.parent_columns {
write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
}
out.push(fk.on_delete.tag());
out.push(fk.on_update.tag());
}
write_u16(
&mut out,
u16::try_from(t.schema.uniqueness_constraints.len())
.expect("≤ 65k uniqueness constraints/table"),
);
for uc in &t.schema.uniqueness_constraints {
out.push(u8::from(uc.is_primary_key));
write_u16(
&mut out,
u16::try_from(uc.columns.len()).expect("≤ 65k cols in uniqueness constraint"),
);
for &p in &uc.columns {
write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
}
out.push(u8::from(uc.nulls_not_distinct));
}
let mut rt_defaults: Vec<(usize, &str)> = Vec::new();
for (i, c) in t.schema.columns.iter().enumerate() {
if let Some(e) = &c.runtime_default {
rt_defaults.push((i, e.as_str()));
}
}
write_u16(
&mut out,
u16::try_from(rt_defaults.len()).expect("≤ 65k runtime defaults/table"),
);
for (pos, expr) in rt_defaults {
write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
write_str(&mut out, expr);
}
write_u16(
&mut out,
u16::try_from(t.schema.checks.len()).expect("≤ 65k CHECK constraints/table"),
);
for c in &t.schema.checks {
write_str(&mut out, c.as_str());
}
let mut enum_bindings: Vec<(usize, &str)> = Vec::new();
for (i, c) in t.schema.columns.iter().enumerate() {
if let Some(e) = &c.user_enum_type {
enum_bindings.push((i, e.as_str()));
}
}
write_u16(
&mut out,
u16::try_from(enum_bindings.len()).expect("≤ 65k enum-typed columns/table"),
);
for (pos, ename) in enum_bindings {
write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
write_str(&mut out, ename);
}
let mut domain_bindings: Vec<(usize, &str)> = Vec::new();
for (i, c) in t.schema.columns.iter().enumerate() {
if let Some(d) = &c.user_domain_type {
domain_bindings.push((i, d.as_str()));
}
}
write_u16(
&mut out,
u16::try_from(domain_bindings.len()).expect("≤ 65k domain-typed columns/table"),
);
for (pos, dname) in domain_bindings {
write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
write_str(&mut out, dname);
}
let mut on_update_bindings: Vec<(usize, &str)> = Vec::new();
for (i, c) in t.schema.columns.iter().enumerate() {
if let Some(e) = &c.on_update_runtime {
on_update_bindings.push((i, e.as_str()));
}
}
write_u16(
&mut out,
u16::try_from(on_update_bindings.len()).expect("≤ 65k ON UPDATE columns/table"),
);
for (pos, expr_src) in on_update_bindings {
write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
write_str(&mut out, expr_src);
}
let mut coll_bindings: Vec<(usize, u8)> = Vec::new();
for (i, c) in t.schema.columns.iter().enumerate() {
let tag = match c.collation {
Collation::Binary => continue,
Collation::CaseInsensitive => Collation::TAG_CASE_INSENSITIVE,
};
coll_bindings.push((i, tag));
}
write_u16(
&mut out,
u16::try_from(coll_bindings.len()).expect("≤ 65k collation bindings/table"),
);
for (pos, tag) in coll_bindings {
write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
out.push(tag);
}
let mut unsigned_bindings: Vec<usize> = Vec::new();
for (i, c) in t.schema.columns.iter().enumerate() {
if c.is_unsigned {
unsigned_bindings.push(i);
}
}
write_u16(
&mut out,
u16::try_from(unsigned_bindings.len()).expect("≤ 65k UNSIGNED columns/table"),
);
for pos in unsigned_bindings {
write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
}
let mut enum_inline_bindings: Vec<(usize, &[String])> = Vec::new();
for (i, c) in t.schema.columns.iter().enumerate() {
if let Some(vs) = &c.inline_enum_variants {
enum_inline_bindings.push((i, vs.as_slice()));
}
}
write_u16(
&mut out,
u16::try_from(enum_inline_bindings.len()).expect("≤ 65k inline-ENUM columns/table"),
);
for (pos, variants) in enum_inline_bindings {
write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
write_u16(
&mut out,
u16::try_from(variants.len()).expect("≤ 65k variants/ENUM"),
);
for v in variants {
write_str(&mut out, v.as_str());
}
}
let mut set_inline_bindings: Vec<(usize, &[String])> = Vec::new();
for (i, c) in t.schema.columns.iter().enumerate() {
if let Some(vs) = &c.inline_set_variants {
set_inline_bindings.push((i, vs.as_slice()));
}
}
write_u16(
&mut out,
u16::try_from(set_inline_bindings.len()).expect("≤ 65k inline-SET columns/table"),
);
for (pos, variants) in set_inline_bindings {
write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
write_u16(
&mut out,
u16::try_from(variants.len()).expect("≤ 65k variants/SET"),
);
for v in variants {
write_str(&mut out, v.as_str());
}
}
}
write_u32(
&mut out,
u32::try_from(self.functions.len()).expect("≤ 4G functions"),
);
for fd in self.functions.values() {
write_str(&mut out, &fd.name);
write_str(&mut out, &fd.args_repr);
write_str(&mut out, &fd.returns);
write_str(&mut out, &fd.language);
write_str_long(&mut out, &fd.body);
}
write_u32(
&mut out,
u32::try_from(self.triggers.len()).expect("≤ 4G triggers"),
);
for td in &self.triggers {
write_str(&mut out, &td.name);
write_str(&mut out, &td.table);
write_str(&mut out, &td.timing);
write_u16(
&mut out,
u16::try_from(td.events.len()).expect("≤ 65k events / trigger"),
);
for ev in &td.events {
write_str(&mut out, ev);
}
write_str(&mut out, &td.for_each);
write_str(&mut out, &td.function);
write_u16(
&mut out,
u16::try_from(td.update_columns.len()).expect("≤ 65k cols / trigger"),
);
for c in &td.update_columns {
write_str(&mut out, c);
}
out.push(u8::from(td.enabled));
}
write_u32(
&mut out,
u32::try_from(self.sequences.len()).expect("≤ 4G sequences"),
);
for seq in self.sequences.values() {
write_str(&mut out, &seq.name);
out.push(match seq.data_type {
SequenceDataType::SmallInt => 0,
SequenceDataType::Int => 1,
SequenceDataType::BigInt => 2,
});
out.extend_from_slice(&seq.start.to_le_bytes());
out.extend_from_slice(&seq.increment.to_le_bytes());
out.extend_from_slice(&seq.min_value.to_le_bytes());
out.extend_from_slice(&seq.max_value.to_le_bytes());
out.extend_from_slice(&seq.cache.to_le_bytes());
out.push(u8::from(seq.cycle));
match &seq.owned_by {
None => out.push(0),
Some((table, column)) => {
out.push(1);
write_str(&mut out, table);
write_str(&mut out, column);
}
}
out.extend_from_slice(&seq.last_value.to_le_bytes());
out.push(u8::from(seq.is_called));
}
write_u32(
&mut out,
u32::try_from(self.views.len()).expect("≤ 4G views"),
);
for view in self.views.values() {
write_str(&mut out, &view.name);
write_u16(
&mut out,
u16::try_from(view.columns.len()).expect("≤ 65k cols / view"),
);
for c in &view.columns {
write_str(&mut out, c);
}
write_str_long(&mut out, &view.body);
}
write_u32(
&mut out,
u32::try_from(self.materialized_views.len()).expect("≤ 4G materialized views"),
);
for (name, body) in &self.materialized_views {
write_str(&mut out, name);
write_str_long(&mut out, body);
}
write_u32(
&mut out,
u32::try_from(self.enum_types.len()).expect("≤ 4G enum types"),
);
for e in self.enum_types.values() {
write_str(&mut out, &e.name);
write_u16(
&mut out,
u16::try_from(e.labels.len()).expect("≤ 65k labels / enum"),
);
for l in &e.labels {
write_str(&mut out, l);
}
}
write_u32(
&mut out,
u32::try_from(self.domain_types.len()).expect("≤ 4G domain types"),
);
for d in self.domain_types.values() {
write_str(&mut out, &d.name);
write_data_type(&mut out, d.base_type);
out.push(u8::from(d.nullable));
match &d.default {
None => out.push(0),
Some(s) => {
out.push(1);
write_str(&mut out, s);
}
}
write_u16(
&mut out,
u16::try_from(d.checks.len()).expect("≤ 65k CHECKs / domain"),
);
for c in &d.checks {
write_str(&mut out, c);
}
}
write_u32(
&mut out,
u32::try_from(self.schemas.len()).expect("≤ 4G schemas"),
);
for name in &self.schemas {
write_str(&mut out, name);
}
out
}
pub fn deserialize(buf: &[u8]) -> Result<Self, StorageError> {
let mut cur = Cursor::new(buf);
let magic = cur.take(8)?;
if magic != FILE_MAGIC {
return Err(StorageError::Corrupt(format!(
"bad magic: expected SPGDB001, got {magic:?}"
)));
}
let version = cur.read_u8()?;
if !(MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION).contains(&version) {
return Err(StorageError::Corrupt(format!(
"unsupported file version: {version} (supported: {MIN_SUPPORTED_FILE_VERSION}..={FILE_VERSION})"
)));
}
cur.codec_version = version;
let table_count = cur.read_u32()? as usize;
let mut cat = Self::new();
for _ in 0..table_count {
deserialize_table(&mut cur, &mut cat, version)?;
}
if version >= 22 {
let fn_count = cur.read_u32()? as usize;
for _ in 0..fn_count {
let name = cur.read_str()?;
let args_repr = cur.read_str()?;
let returns = cur.read_str()?;
let language = cur.read_str()?;
let body = cur.read_str_long()?;
cat.functions.insert(
name.clone(),
FunctionDef {
name,
args_repr,
returns,
language,
body,
},
);
}
let trg_count = cur.read_u32()? as usize;
for _ in 0..trg_count {
let name = cur.read_str()?;
let table = cur.read_str()?;
let timing = cur.read_str()?;
let ev_count = cur.read_u16()? as usize;
let mut events = Vec::with_capacity(ev_count);
for _ in 0..ev_count {
events.push(cur.read_str()?);
}
let for_each = cur.read_str()?;
let function = cur.read_str()?;
let update_columns = if version >= 23 {
let n = cur.read_u16()? as usize;
let mut cols = Vec::with_capacity(n);
for _ in 0..n {
cols.push(cur.read_str()?);
}
cols
} else {
Vec::new()
};
let enabled = if version >= 25 {
cur.read_u8()? != 0
} else {
true
};
cat.triggers.push(TriggerDef {
name,
table,
timing,
events,
for_each,
function,
update_columns,
enabled,
});
}
}
if version >= 26 {
let seq_count = cur.read_u32()? as usize;
for _ in 0..seq_count {
let name = cur.read_str()?;
let data_type = match cur.read_u8()? {
0 => SequenceDataType::SmallInt,
1 => SequenceDataType::Int,
2 => SequenceDataType::BigInt,
other => {
return Err(StorageError::Corrupt(format!(
"unknown SEQUENCE data-type tag {other}"
)));
}
};
let start = cur.read_i64()?;
let increment = cur.read_i64()?;
let min_value = cur.read_i64()?;
let max_value = cur.read_i64()?;
let cache = cur.read_i64()?;
let cycle = cur.read_u8()? != 0;
let owned_by = match cur.read_u8()? {
0 => None,
1 => {
let t = cur.read_str()?;
let c = cur.read_str()?;
Some((t, c))
}
other => {
return Err(StorageError::Corrupt(format!(
"unknown SEQUENCE owned-by tag {other}"
)));
}
};
let last_value = cur.read_i64()?;
let is_called = cur.read_u8()? != 0;
cat.sequences.insert(
name.clone(),
SequenceDef {
name,
data_type,
start,
increment,
min_value,
max_value,
cache,
cycle,
owned_by,
last_value,
is_called,
},
);
}
}
if version >= 27 {
let view_count = cur.read_u32()? as usize;
for _ in 0..view_count {
let name = cur.read_str()?;
let col_count = cur.read_u16()? as usize;
let mut columns = Vec::with_capacity(col_count);
for _ in 0..col_count {
columns.push(cur.read_str()?);
}
let body = cur.read_str_long()?;
cat.views.insert(
name.clone(),
ViewDef {
name,
columns,
body,
},
);
}
}
if version >= 28 {
let mv_count = cur.read_u32()? as usize;
for _ in 0..mv_count {
let name = cur.read_str()?;
let body = cur.read_str_long()?;
cat.materialized_views.insert(name, body);
}
}
if version >= 29 {
let etype_count = cur.read_u32()? as usize;
for _ in 0..etype_count {
let name = cur.read_str()?;
let label_count = cur.read_u16()? as usize;
let mut labels = Vec::with_capacity(label_count);
for _ in 0..label_count {
labels.push(cur.read_str()?);
}
cat.enum_types
.insert(name.clone(), EnumDef { name, labels });
}
}
if version >= 30 {
let dtype_count = cur.read_u32()? as usize;
for _ in 0..dtype_count {
let name = cur.read_str()?;
let base_type = cur.read_data_type()?;
let nullable = cur.read_u8()? != 0;
let default = match cur.read_u8()? {
0 => None,
1 => Some(cur.read_str()?),
other => {
return Err(StorageError::Corrupt(format!(
"unknown DOMAIN default tag {other}"
)));
}
};
let check_count = cur.read_u16()? as usize;
let mut checks = Vec::with_capacity(check_count);
for _ in 0..check_count {
checks.push(cur.read_str()?);
}
cat.domain_types.insert(
name.clone(),
DomainDef {
name,
base_type,
nullable,
default,
checks,
},
);
}
}
if version >= 31 {
let sch_count = cur.read_u32()? as usize;
for _ in 0..sch_count {
let name = cur.read_str()?;
cat.schemas.insert(name);
}
}
if cur.pos < buf.len() {
return Err(StorageError::Corrupt(format!(
"trailing bytes: {} unread",
buf.len() - cur.pos
)));
}
Ok(cat)
}
}
#[cfg(test)]
mod tests;