1#![no_std]
7#![cfg_attr(target_arch = "aarch64", allow(unsafe_code))]
11
12extern crate alloc;
13
14pub mod bloom;
15pub mod halfvec;
16pub mod persistent;
17pub mod persistent_btree;
18pub mod quantize;
19pub mod row_locator;
20pub mod segment;
21
22pub use self::bloom::{BloomError, BloomFilter};
23pub use self::row_locator::{RowLocator, RowLocatorError};
24pub use self::segment::{
25 BRIN_SIDECAR_MAGIC, BrinSummary, OwnedSegment, SEGMENT_COMPRESS_ALGO_LZSS,
26 SEGMENT_COMPRESS_ALGO_NONE, SEGMENT_MAGIC, SEGMENT_MAGIC_V2, SEGMENT_PAGE_BYTES, SegmentError,
27 SegmentMeta, SegmentReader, derive_brin_summaries, encode_segment, wrap_v2_envelope,
28 wrap_v2_envelope_with_brin,
29};
30
31use alloc::boxed::Box;
32use alloc::collections::{BTreeMap, BTreeSet};
33use alloc::format;
34use alloc::string::String;
35use alloc::sync::Arc;
36use alloc::vec::Vec;
37use core::fmt;
38
39use self::persistent::PersistentVec;
40use self::persistent_btree::PersistentBTreeMap;
41
/// Storage encoding of the elements of a [`DataType::Vector`] column.
///
/// `F32` is the default. The `Display` impl prints the variants as `F32`,
/// `SQ8` and `HALF`, matching the spelling used after `USING` in
/// [`DataType`]'s own `Display` output.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum VecEncoding {
    /// Plain `f32` elements (pairs with [`Value::Vector`]).
    #[default]
    F32,
    /// Pairs with [`Value::Sq8Vector`].
    Sq8,
    /// Pairs with [`Value::HalfVector`].
    F16,
}

impl fmt::Display for VecEncoding {
    /// Writes the encoding's keyword (`F32`, `SQ8` or `HALF`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match self {
            Self::F32 => "F32",
            Self::Sq8 => "SQ8",
            Self::F16 => "HALF",
        };
        f.write_str(keyword)
    }
}
70
/// Declared SQL type of a column.
///
/// The SQL spelling of each variant is defined by the `Display` impl (for
/// example `Varchar(n)` prints as `VARCHAR(n)` and `Bytes` prints as `BYTEA`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataType {
    /// `SMALLINT`.
    SmallInt,
    /// `INT`.
    Int,
    /// `BIGINT`.
    BigInt,
    /// `FLOAT`.
    Float,
    /// `TEXT`.
    Text,
    /// `VARCHAR(n)`; the payload is the `n` of the declaration.
    Varchar(u32),
    /// `CHAR(n)`; the payload is the `n` of the declaration.
    Char(u32),
    /// `BOOL`.
    Bool,
    /// `VECTOR(dim)`, followed by `USING SQ8` / `USING HALF` for the
    /// non-default encodings.
    Vector {
        /// Number of elements per vector.
        dim: u32,
        /// Element storage encoding.
        encoding: VecEncoding,
    },
    /// `NUMERIC(precision)` when `scale` is zero, otherwise
    /// `NUMERIC(precision, scale)`.
    Numeric {
        /// Declared precision. `Value::data_type` always reports `0` here.
        precision: u8,
        /// Declared scale.
        scale: u8,
    },
    /// `DATE`.
    Date,
    /// `TIMESTAMP`.
    Timestamp,
    /// `TIMESTAMPTZ`. `Table::insert` treats it as interchangeable with
    /// `Timestamp`.
    Timestamptz,
    /// `INTERVAL`.
    Interval,
    /// `JSON`.
    Json,
    /// `JSONB`. `Table::insert` treats it as interchangeable with `Json`.
    Jsonb,
    /// `BYTEA`.
    Bytes,
    /// `TEXT[]`.
    TextArray,
    /// `INT[]`.
    IntArray,
    /// `BIGINT[]`.
    BigIntArray,
    /// `TSVECTOR`.
    TsVector,
    /// `TSQUERY`.
    TsQuery,
}
176
177impl fmt::Display for DataType {
178 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
179 match self {
180 Self::SmallInt => f.write_str("SMALLINT"),
181 Self::Int => f.write_str("INT"),
182 Self::BigInt => f.write_str("BIGINT"),
183 Self::Float => f.write_str("FLOAT"),
184 Self::Text => f.write_str("TEXT"),
185 Self::Varchar(n) => write!(f, "VARCHAR({n})"),
186 Self::Char(n) => write!(f, "CHAR({n})"),
187 Self::Bool => f.write_str("BOOL"),
188 Self::Vector { dim, encoding } => match encoding {
189 VecEncoding::F32 => write!(f, "VECTOR({dim})"),
190 VecEncoding::Sq8 => write!(f, "VECTOR({dim}) USING SQ8"),
191 VecEncoding::F16 => write!(f, "VECTOR({dim}) USING HALF"),
192 },
193 Self::Numeric { precision, scale } => {
194 if *scale == 0 {
195 write!(f, "NUMERIC({precision})")
196 } else {
197 write!(f, "NUMERIC({precision}, {scale})")
198 }
199 }
200 Self::Date => f.write_str("DATE"),
201 Self::Timestamp => f.write_str("TIMESTAMP"),
202 Self::Timestamptz => f.write_str("TIMESTAMPTZ"),
203 Self::Interval => f.write_str("INTERVAL"),
204 Self::Json => f.write_str("JSON"),
205 Self::Jsonb => f.write_str("JSONB"),
206 Self::Bytes => f.write_str("BYTEA"),
207 Self::TextArray => f.write_str("TEXT[]"),
208 Self::IntArray => f.write_str("INT[]"),
209 Self::BigIntArray => f.write_str("BIGINT[]"),
210 Self::TsVector => f.write_str("TSVECTOR"),
211 Self::TsQuery => f.write_str("TSQUERY"),
212 }
213 }
214}
215
/// One entry of a [`Value::TsVector`].
///
/// `Table::insert` and `Table::add_gin_index` use `word` as the GIN posting
/// key.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TsLexeme {
    /// The lexeme text; this is the key used in GIN indexes.
    pub word: String,
    /// Positions recorded for the lexeme.
    // NOTE(review): presumably token offsets within the source document —
    // confirm base (0 or 1) against the tokenizer.
    pub positions: Vec<u16>,
    /// Weight attached to the lexeme.
    // NOTE(review): encoding of the weight is not visible in this chunk.
    pub weight: u8,
}
227
/// Syntax tree carried by a [`Value::TsQuery`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TsQueryAst {
    /// A single search term.
    Term {
        /// The word to look for.
        word: String,
        /// Mask of acceptable weights.
        // NOTE(review): bit layout is not visible in this chunk — confirm
        // against the matcher.
        weight_mask: u8,
    },
    /// Conjunction of two sub-queries.
    And(Box<TsQueryAst>, Box<TsQueryAst>),
    /// Disjunction of two sub-queries.
    Or(Box<TsQueryAst>, Box<TsQueryAst>),
    /// Negation of a sub-query.
    Not(Box<TsQueryAst>),
    /// Two sub-queries related by a positional `distance`.
    Phrase {
        /// Left operand.
        left: Box<TsQueryAst>,
        /// Right operand.
        right: Box<TsQueryAst>,
        /// Distance between the operands.
        // NOTE(review): presumably the required gap in lexeme positions —
        // confirm exact semantics against the evaluator.
        distance: u16,
    },
}
251
/// A single cell value.
///
/// [`Value::data_type`] maps every non-null variant to the [`DataType`] it
/// carries; `Null` has no type of its own.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum Value {
    /// 16-bit integer (`DataType::SmallInt`).
    SmallInt(i16),
    /// 32-bit integer (`DataType::Int`).
    Int(i32),
    /// 64-bit integer (`DataType::BigInt`).
    BigInt(i64),
    /// 64-bit float (`DataType::Float`).
    Float(f64),
    /// UTF-8 text (`DataType::Text`).
    Text(String),
    /// Boolean (`DataType::Bool`).
    Bool(bool),
    /// `f32` vector; reported as `DataType::Vector` with `VecEncoding::F32`.
    Vector(Vec<f32>),
    /// Quantized vector; reported with `VecEncoding::Sq8`, using the length
    /// of its `bytes` as the dimension.
    Sq8Vector(crate::quantize::Sq8Vector),
    /// Half-precision vector; reported with `VecEncoding::F16`, using its
    /// `dim()` as the dimension.
    HalfVector(crate::halfvec::HalfVector),
    /// Fixed-point number (`DataType::Numeric`).
    Numeric {
        /// Scaled integer representation.
        // NOTE(review): presumably the value is `scaled / 10^scale` — confirm
        // against the numeric arithmetic code.
        scaled: i128,
        /// Scale of the number; the only part of the declared type the
        /// value carries.
        scale: u8,
    },
    /// Date (`DataType::Date`).
    // NOTE(review): unit/epoch of the `i32` is not visible in this chunk.
    Date(i32),
    /// Timestamp (`DataType::Timestamp`).
    // NOTE(review): unit/epoch of the `i64` is not visible in this chunk.
    Timestamp(i64),
    /// Interval (`DataType::Interval`), stored as a month part and a
    /// sub-month part.
    Interval {
        /// Month component.
        months: i32,
        /// Remaining component (presumably microseconds, per the name).
        micros: i64,
    },
    /// JSON text (`DataType::Json`).
    Json(String),
    /// Raw bytes (`DataType::Bytes`).
    Bytes(Vec<u8>),
    /// Array of nullable strings (`DataType::TextArray`).
    TextArray(Vec<Option<String>>),
    /// Array of nullable 32-bit integers (`DataType::IntArray`).
    IntArray(Vec<Option<i32>>),
    /// Array of nullable 64-bit integers (`DataType::BigIntArray`).
    BigIntArray(Vec<Option<i64>>),
    /// Full-text document vector (`DataType::TsVector`).
    TsVector(Vec<TsLexeme>),
    /// Full-text query (`DataType::TsQuery`).
    TsQuery(TsQueryAst),
    /// SQL NULL; `data_type` returns `None` for it.
    Null,
}
328
329impl Value {
330 pub fn data_type(&self) -> Option<DataType> {
332 match self {
333 Self::SmallInt(_) => Some(DataType::SmallInt),
334 Self::Int(_) => Some(DataType::Int),
335 Self::BigInt(_) => Some(DataType::BigInt),
336 Self::Float(_) => Some(DataType::Float),
337 Self::Text(_) => Some(DataType::Text),
340 Self::Bool(_) => Some(DataType::Bool),
341 Self::Vector(v) => Some(DataType::Vector {
342 dim: u32::try_from(v.len()).expect("vector dim ≤ u32"),
343 encoding: VecEncoding::F32,
344 }),
345 Self::Sq8Vector(q) => Some(DataType::Vector {
346 dim: u32::try_from(q.bytes.len()).expect("vector dim ≤ u32"),
347 encoding: VecEncoding::Sq8,
348 }),
349 Self::HalfVector(h) => Some(DataType::Vector {
350 dim: u32::try_from(h.dim()).expect("vector dim ≤ u32"),
351 encoding: VecEncoding::F16,
352 }),
353 Self::Numeric { scale, .. } => Some(DataType::Numeric {
358 precision: 0,
359 scale: *scale,
360 }),
361 Self::Date(_) => Some(DataType::Date),
362 Self::Timestamp(_) => Some(DataType::Timestamp),
363 Self::Interval { .. } => Some(DataType::Interval),
364 Self::Json(_) => Some(DataType::Json),
365 Self::Bytes(_) => Some(DataType::Bytes),
366 Self::TextArray(_) => Some(DataType::TextArray),
367 Self::IntArray(_) => Some(DataType::IntArray),
368 Self::BigIntArray(_) => Some(DataType::BigIntArray),
369 Self::TsVector(_) => Some(DataType::TsVector),
370 Self::TsQuery(_) => Some(DataType::TsQuery),
371 Self::Null => None,
372 }
373 }
374
375 pub const fn is_null(&self) -> bool {
376 matches!(self, Self::Null)
377 }
378}
379
/// One table row: an ordered list of cell values.
///
/// `Table::insert` requires the number of values to equal the number of
/// schema columns.
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    /// Cell values, in column order.
    pub values: Vec<Value>,
}
386
impl Row {
    /// Wraps `values` in a row without any validation.
    pub const fn new(values: Vec<Value>) -> Self {
        Self { values }
    }

    /// Number of cells in the row.
    pub fn len(&self) -> usize {
        self.values.len()
    }

    /// Returns `true` when the row has no cells.
    pub fn is_empty(&self) -> bool {
        self.values.is_empty()
    }
}
400
/// Definition of a single column.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnSchema {
    /// Column name; `TableSchema::column_position` matches on it exactly.
    pub name: String,
    /// Declared type.
    pub ty: DataType,
    /// Whether `Value::Null` is accepted; `Table::insert` rejects NULL when
    /// this is `false`.
    pub nullable: bool,
    /// Optional constant default value.
    pub default: Option<Value>,
    /// Optional default kept as text.
    // NOTE(review): presumably an expression evaluated at insert time by a
    // higher layer — nothing in this chunk reads it.
    pub runtime_default: Option<String>,
    /// Marks the column as auto-incrementing.
    // NOTE(review): presumably paired with `Table::next_auto_value` by the
    // caller — confirm.
    pub auto_increment: bool,
}
424
/// Definition of a table: its name, columns and constraints.
#[derive(Debug, Clone, PartialEq)]
pub struct TableSchema {
    /// Table name.
    pub name: String,
    /// Columns in positional order; rows store their values in this order.
    pub columns: Vec<ColumnSchema>,
    /// Optional byte budget for the hot tier.
    // NOTE(review): presumably compared against `Table::hot_bytes` by the
    // freeze logic — not visible in this chunk.
    pub hot_tier_bytes: Option<u64>,
    /// Foreign-key constraints declared on this table.
    pub foreign_keys: Vec<ForeignKeyConstraint>,
    /// PRIMARY KEY / UNIQUE constraints declared on this table.
    pub uniqueness_constraints: Vec<UniquenessConstraint>,
    /// CHECK constraints, kept as text.
    // NOTE(review): presumably SQL expression source evaluated by a higher
    // layer — nothing in this chunk parses it.
    pub checks: Vec<String>,
}
459
/// A PRIMARY KEY or UNIQUE constraint over one or more columns.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UniquenessConstraint {
    /// `true` for a primary key, `false` for a plain uniqueness constraint.
    pub is_primary_key: bool,
    /// Positions of the constrained columns within `TableSchema::columns`.
    /// `Table::drop_column` removes and renumbers these positions.
    pub columns: Vec<usize>,
    /// Whether NULLs count as equal to each other for this constraint
    /// (per the field name; enforcement is not visible in this chunk).
    pub nulls_not_distinct: bool,
}
483
/// A foreign-key constraint from this table to a parent table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForeignKeyConstraint {
    /// Optional constraint name.
    pub name: Option<String>,
    /// Positions of the referencing columns in this table.
    /// `Table::drop_column` renumbers these positions.
    pub local_columns: Vec<usize>,
    /// Name of the referenced table.
    pub parent_table: String,
    /// Positions of the referenced columns in the parent table.
    pub parent_columns: Vec<usize>,
    /// Action to take when a referenced parent row is deleted.
    pub on_delete: FkAction,
    /// Action to take when a referenced parent key is updated.
    pub on_update: FkAction,
}
510
/// Referential action of a foreign key (`ON DELETE` / `ON UPDATE`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FkAction {
    /// Tag `0`.
    Restrict,
    /// Tag `1`.
    Cascade,
    /// Tag `2`.
    SetNull,
    /// Tag `3`.
    SetDefault,
    /// Tag `4`.
    NoAction,
}

impl FkAction {
    /// Returns the one-byte tag for this action; the inverse of
    /// [`FkAction::from_tag`].
    pub const fn tag(self) -> u8 {
        match self {
            Self::Restrict => 0,
            Self::Cascade => 1,
            Self::SetNull => 2,
            Self::SetDefault => 3,
            Self::NoAction => 4,
        }
    }

    /// Decodes a tag produced by [`FkAction::tag`].
    ///
    /// Returns `None` for any byte that is not a known tag.
    pub const fn from_tag(b: u8) -> Option<Self> {
        match b {
            0 => Some(Self::Restrict),
            1 => Some(Self::Cascade),
            2 => Some(Self::SetNull),
            3 => Some(Self::SetDefault),
            4 => Some(Self::NoAction),
            _ => None,
        }
    }
}
543
544impl TableSchema {
545 pub fn column_position(&self, name: &str) -> Option<usize> {
546 self.columns.iter().position(|c| c.name == name)
547 }
548}
549
/// Key type of BTree indexes.
///
/// The derived `Ord` orders by variant first (`Int` < `Text` < `Bool`, in
/// declaration order) and then by payload, so the variant order is part of
/// the index ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum IndexKey {
    /// Integer-like key; `from_value` widens `SmallInt`, `Int`, `BigInt`,
    /// `Date` and `Timestamp` into it.
    Int(i64),
    /// Text key.
    Text(String),
    /// Boolean key.
    Bool(bool),
}
560
561impl IndexKey {
562 pub fn from_value(v: &Value) -> Option<Self> {
563 match v {
564 Value::SmallInt(n) => Some(Self::Int(i64::from(*n))),
565 Value::Int(n) => Some(Self::Int(i64::from(*n))),
566 Value::BigInt(n) => Some(Self::Int(*n)),
567 Value::Text(s) => Some(Self::Text(s.clone())),
568 Value::Bool(b) => Some(Self::Bool(*b)),
569 Value::Date(d) => Some(Self::Int(i64::from(*d))),
572 Value::Timestamp(t) => Some(Self::Int(*t)),
573 Value::Null
578 | Value::Float(_)
579 | Value::Vector(_)
580 | Value::Sq8Vector(_)
581 | Value::HalfVector(_)
582 | Value::Numeric { .. }
583 | Value::Interval { .. }
584 | Value::Json(_)
585 | Value::Bytes(_)
586 | Value::TextArray(_)
587 | Value::IntArray(_)
588 | Value::BigIntArray(_)
589 | Value::TsVector(_)
590 | Value::TsQuery(_) => None,
591 }
592 }
593}
594
/// A secondary index on a table.
///
/// The private constructors (`new_btree`, `new_nsw`, `new_brin`, `new_gin`)
/// set only `name`, `column_position` and `kind`; every other field starts
/// empty / `None` / `false`.
#[derive(Debug, Clone)]
pub struct Index {
    /// Index name; unique within a table (the `Table::add_*` methods reject
    /// duplicates).
    pub name: String,
    /// Position of the indexed column within the table schema.
    pub column_position: usize,
    /// Index payload (BTree map, NSW graph, BRIN marker or GIN map).
    pub kind: IndexKind,
    /// Positions of additional columns attached to the index.
    /// `Table::drop_column` renumbers these.
    pub included_columns: Vec<usize>,
    /// Optional partial-index predicate, kept as text.
    // NOTE(review): not evaluated anywhere in this chunk.
    pub partial_predicate: Option<String>,
    /// Optional index expression, kept as text.
    // NOTE(review): not evaluated anywhere in this chunk.
    pub expression: Option<String>,
    /// Whether the index is declared unique.
    // NOTE(review): enforcement is not visible in this chunk.
    pub is_unique: bool,
    /// Positions of further key columns beyond `column_position`.
    // NOTE(review): presumably the trailing columns of a composite key —
    // confirm against the code that populates it.
    pub extra_column_positions: Vec<usize>,
}
643
/// Default `m` for NSW graphs.
// NOTE(review): presumably the value passed to `Table::add_nsw_index` when
// the caller gives none — no use is visible in this chunk.
pub const NSW_DEFAULT_M: usize = 16;
647
/// Result of freezing rows into a segment.
// NOTE(review): the producer of this report is outside this chunk; field
// notes below follow the names and types only.
#[derive(Debug, Clone)]
pub struct FreezeReport {
    /// Identifier of the segment that was written.
    pub segment_id: u32,
    /// Number of rows frozen.
    pub frozen_rows: usize,
    /// Bytes freed by the freeze.
    pub bytes_freed: u64,
    /// Encoded bytes of the segment.
    pub segment_bytes: Vec<u8>,
}
673
/// A contiguous range of rows selected for freezing, with per-row payloads.
// NOTE(review): the producer and consumer are outside this chunk; the tuple
// layout below is inferred from the types and should be confirmed.
#[derive(Debug, Clone)]
pub struct FreezeSlice {
    /// Range of row positions covered by this slice.
    pub row_range: core::ops::Range<usize>,
    /// One `(u64, bytes, key)` tuple per row — presumably an identifier, the
    /// encoded row body and the row's index key.
    pub rows: Vec<(u64, Vec<u8>, IndexKey)>,
}
696
/// Result of compacting segments.
// NOTE(review): the producer of this report is outside this chunk; field
// notes below follow the names and types only.
#[derive(Debug, Clone)]
pub struct CompactReport {
    /// Identifiers of the source segments.
    pub sources: Vec<u32>,
    /// Identifier of the merged segment, if one was produced.
    pub merged_segment_id: Option<u32>,
    /// Encoded bytes of the merged segment.
    pub merged_segment_bytes: Vec<u8>,
    /// Number of rows in the merged segment.
    pub merged_rows: usize,
    /// Number of deleted rows dropped during the merge.
    pub deleted_rows_pruned: usize,
    /// Estimated number of bytes reclaimed.
    pub bytes_reclaimed_estimate: u64,
}
732
/// Payload of an [`Index`], one variant per index type.
#[derive(Debug, Clone)]
pub enum IndexKind {
    /// Ordered map from key to the locators of the rows carrying that key.
    /// Maintained on every `Table::insert`.
    BTree(PersistentBTreeMap<IndexKey, Vec<RowLocator>>),
    /// Graph index over a vector column.
    Nsw(NswGraph),
    /// BRIN marker; stores no per-row data in this structure.
    Brin {
        /// Type of the indexed column, captured when the index was created.
        column_type: DataType,
    },
    /// Map from lexeme word to the locators of the rows whose `TsVector`
    /// contains it. Maintained on every `Table::insert`.
    Gin(PersistentBTreeMap<alloc::string::String, Vec<RowLocator>>),
}
783
/// Layered neighbour graph backing an NSW index.
// NOTE(review): the graph algorithms live outside this chunk; field notes
// below are limited to what `NswGraph::new` and `cap_for_layer` show.
#[derive(Debug, Clone)]
pub struct NswGraph {
    /// Neighbour cap for every layer above layer 0.
    pub m: usize,
    /// Neighbour cap for layer 0; initialised to `2 * m` (saturating).
    pub m_max_0: usize,
    /// Entry node, or `None` while the graph is empty.
    pub entry: Option<usize>,
    /// Level of the entry node; starts at 0.
    pub entry_level: u8,
    /// Per-node level (presumably indexed by row position — confirm).
    pub levels: PersistentVec<u8>,
    /// One adjacency table per layer; a new graph starts with a single
    /// empty layer.
    pub layers: Vec<PersistentVec<Vec<u32>>>,
}
828
829impl NswGraph {
830 fn new(m: usize) -> Self {
831 Self {
832 m,
833 m_max_0: m.saturating_mul(2),
834 entry: None,
835 entry_level: 0,
836 levels: PersistentVec::new(),
837 layers: alloc::vec![PersistentVec::new()],
838 }
839 }
840
841 pub const fn cap_for_layer(&self, layer: u8) -> usize {
843 if layer == 0 { self.m_max_0 } else { self.m }
844 }
845}
846
/// Deterministically assigns a graph level in `0..=7` to a row index.
///
/// The index is scrambled with a fixed multiply / xor-shift sequence, and the
/// level is the number of trailing all-zero 4-bit groups in the result,
/// capped at 7. The same `row_idx` always yields the same level.
pub fn nsw_assign_level(row_idx: usize) -> u8 {
    const MAX_LEVEL: u8 = 7;

    let mut mixed = (row_idx as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
    mixed ^= mixed >> 30;
    mixed = mixed.wrapping_mul(0xBF58_476D_1CE4_E5B9);
    mixed ^= mixed >> 27;
    mixed = mixed.wrapping_mul(0x94D0_49BB_1331_11EB);
    mixed ^= mixed >> 31;

    // Each level consumes one zero nibble from the low end. `trailing_zeros`
    // is 64 for an all-zero word, i.e. 16 nibbles, which the cap reduces to
    // MAX_LEVEL just like the equivalent shift loop would.
    let zero_nibbles = mixed.trailing_zeros() / 4;
    u8::try_from(zero_nibbles).map_or(MAX_LEVEL, |level| level.min(MAX_LEVEL))
}
873
874impl Index {
875 fn new_btree(name: String, column_position: usize) -> Self {
876 Self {
877 name,
878 column_position,
879 kind: IndexKind::BTree(PersistentBTreeMap::new()),
880 included_columns: Vec::new(),
881 partial_predicate: None,
882 expression: None,
883 is_unique: false,
884 extra_column_positions: Vec::new(),
885 }
886 }
887
888 fn new_nsw(name: String, column_position: usize, m: usize) -> Self {
889 Self {
890 name,
891 column_position,
892 kind: IndexKind::Nsw(NswGraph::new(m)),
893 included_columns: Vec::new(),
894 partial_predicate: None,
895 expression: None,
896 is_unique: false,
897 extra_column_positions: Vec::new(),
898 }
899 }
900
901 fn new_brin(name: String, column_position: usize, column_type: DataType) -> Self {
905 Self {
906 name,
907 column_position,
908 kind: IndexKind::Brin { column_type },
909 included_columns: Vec::new(),
910 partial_predicate: None,
911 expression: None,
912 is_unique: false,
913 extra_column_positions: Vec::new(),
914 }
915 }
916
917 fn new_gin(name: String, column_position: usize) -> Self {
922 Self {
923 name,
924 column_position,
925 kind: IndexKind::Gin(PersistentBTreeMap::new()),
926 included_columns: Vec::new(),
927 partial_predicate: None,
928 expression: None,
929 is_unique: false,
930 extra_column_positions: Vec::new(),
931 }
932 }
933
934 pub fn lookup_eq(&self, key: &IndexKey) -> &[RowLocator] {
943 match &self.kind {
944 IndexKind::BTree(m) => m.get(key).map_or(&[][..], Vec::as_slice),
945 IndexKind::Nsw(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => &[][..],
948 }
949 }
950
951 pub fn gin_lookup_word(&self, word: &str) -> &[RowLocator] {
955 match &self.kind {
956 IndexKind::Gin(m) => m.get(&String::from(word)).map_or(&[][..], Vec::as_slice),
957 IndexKind::BTree(_) | IndexKind::Nsw(_) | IndexKind::Brin { .. } => &[][..],
958 }
959 }
960
961 pub const fn nsw(&self) -> Option<&NswGraph> {
964 match &self.kind {
965 IndexKind::Nsw(g) => Some(g),
966 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => None,
967 }
968 }
969
970 pub const fn is_brin(&self) -> bool {
975 matches!(self.kind, IndexKind::Brin { .. })
976 }
977
978 pub const fn is_gin(&self) -> bool {
982 matches!(self.kind, IndexKind::Gin(_))
983 }
984}
985
/// An in-memory table: schema, hot rows and secondary indexes.
#[derive(Debug, Clone)]
pub struct Table {
    /// Column and constraint definitions.
    schema: TableSchema,
    /// Hot rows, addressed by position (`RowLocator::Hot(position)`).
    rows: PersistentVec<Row>,
    /// Secondary indexes; names are unique within the table.
    indices: Vec<Index>,
    /// Running total of `row_body_encoded_len` over the hot rows, adjusted by
    /// `insert`, `delete_rows` and `update_row`.
    hot_bytes: u64,
    /// Cached count of cold rows; written only by `set_cold_row_count`.
    cold_row_count: u64,
    /// Set by `mark_cold_row_count_stale`, cleared by `set_cold_row_count`.
    cold_row_count_stale: bool,
}
1027
1028impl Table {
    /// Creates an empty table for `schema`: no rows, no indexes, zero byte
    /// and cold-row counters, and a non-stale cold-row count.
    pub fn new(schema: TableSchema) -> Self {
        Self {
            schema,
            rows: PersistentVec::new(),
            indices: Vec::new(),
            hot_bytes: 0,
            cold_row_count: 0,
            cold_row_count_stale: false,
        }
    }
1039
    /// Running total of encoded row-body bytes held in the hot rows, as
    /// tracked by `insert`, `delete_rows` and `update_row`.
    #[must_use]
    pub const fn hot_bytes(&self) -> u64 {
        self.hot_bytes
    }
1047
    /// Cached cold-row count, as last stored by `set_cold_row_count`.
    /// Check `cold_row_count_stale` before relying on it.
    #[must_use]
    pub const fn cold_row_count(&self) -> u64 {
        self.cold_row_count
    }
1054
    /// Stores `n` as the cached cold-row count and clears the stale flag.
    pub fn set_cold_row_count(&mut self, n: u64) {
        self.cold_row_count = n;
        self.cold_row_count_stale = false;
    }
1061
    /// Flags the cached cold-row count as stale; the cached number itself is
    /// left untouched until `set_cold_row_count` is called.
    pub fn mark_cold_row_count_stale(&mut self) {
        self.cold_row_count_stale = true;
    }
1069
    /// Returns `true` when the cached cold-row count has been marked stale
    /// and not yet refreshed.
    #[must_use]
    pub const fn cold_row_count_stale(&self) -> bool {
        self.cold_row_count_stale
    }
1077
1078 #[must_use]
1089 pub fn count_cold_locators(&self) -> u64 {
1090 let mut best: u64 = 0;
1091 for idx in &self.indices {
1092 if let IndexKind::BTree(map) = &idx.kind {
1093 let n: u64 = map
1094 .iter()
1095 .map(|(_, locs)| locs.iter().filter(|l| l.is_cold()).count() as u64)
1096 .sum();
1097 if n > best {
1098 best = n;
1099 }
1100 }
1101 }
1102 best
1103 }
1104
    /// Borrows the table schema.
    pub const fn schema(&self) -> &TableSchema {
        &self.schema
    }
1108
    /// Mutably borrows the table schema.
    ///
    /// Nothing here re-validates rows or indexes against the edited schema;
    /// keeping them consistent is the caller's responsibility.
    pub const fn schema_mut(&mut self) -> &mut TableSchema {
        &mut self.schema
    }
1115
    /// Borrows the hot rows.
    pub const fn rows(&self) -> &PersistentVec<Row> {
        &self.rows
    }
1122
    /// Number of hot rows (cold rows are not included).
    pub const fn row_count(&self) -> usize {
        self.rows.len()
    }
1126
    /// Mutably borrows the indexes as a slice, so callers can edit existing
    /// indexes but cannot add or remove any through this handle.
    pub fn indices_mut(&mut self) -> &mut [Index] {
        &mut self.indices
    }
1134
    /// Borrows the indexes, in creation order.
    pub fn indices(&self) -> &[Index] {
        &self.indices
    }
1138
1139 pub fn next_auto_value(&self, col_pos: usize) -> Option<i64> {
1145 let ty = self.schema.columns.get(col_pos)?.ty;
1146 if !matches!(ty, DataType::SmallInt | DataType::Int | DataType::BigInt) {
1147 return None;
1148 }
1149 let mut max: Option<i64> = None;
1150 for row in &self.rows {
1151 match row.values.get(col_pos) {
1152 Some(Value::SmallInt(n)) => {
1153 let v = i64::from(*n);
1154 max = Some(max.map_or(v, |m| m.max(v)));
1155 }
1156 Some(Value::Int(n)) => {
1157 let v = i64::from(*n);
1158 max = Some(max.map_or(v, |m| m.max(v)));
1159 }
1160 Some(Value::BigInt(n)) => {
1161 max = Some(max.map_or(*n, |m| m.max(*n)));
1162 }
1163 _ => {}
1164 }
1165 }
1166 Some(max.map_or(1, |m| m + 1))
1167 }
1168
1169 pub fn index_on(&self, column_position: usize) -> Option<&Index> {
1173 self.indices
1180 .iter()
1181 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::BTree(_)))
1182 .or_else(|| {
1183 self.indices.iter().find(|i| {
1184 i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_))
1185 })
1186 })
1187 }
1188
    /// Validates `row` against the schema, appends it to the hot rows and
    /// updates every index.
    ///
    /// # Errors
    ///
    /// * `StorageError::ArityMismatch` — the row's length differs from the
    ///   number of schema columns.
    /// * `StorageError::NullInNotNull` — a NULL was supplied for a
    ///   non-nullable column.
    /// * `StorageError::TypeMismatch` — a value's type is neither equal to
    ///   nor compatible with the column type.
    ///
    /// All validation happens before any state is touched, so an error
    /// leaves the table unchanged.
    pub fn insert(&mut self, row: Row) -> Result<(), StorageError> {
        if row.len() != self.schema.columns.len() {
            return Err(StorageError::ArityMismatch {
                expected: self.schema.columns.len(),
                actual: row.len(),
            });
        }
        for (i, (val, col)) in row.values.iter().zip(&self.schema.columns).enumerate() {
            if val.is_null() {
                if !col.nullable {
                    return Err(StorageError::NullInNotNull {
                        column: col.name.clone(),
                    });
                }
                // NULL has no type, so there is nothing further to check.
                continue;
            }
            let actual = val.data_type().expect("non-null");
            // Accepted: an exact type match (for vectors that includes `dim`
            // and `encoding`), text into VARCHAR/CHAR/JSON/JSONB, JSON(B)
            // into TEXT, JSON <-> JSONB, TIMESTAMP <-> TIMESTAMPTZ, and
            // NUMERIC values whose scale equals the column's (a value never
            // carries a precision, so precision is ignored).
            let compatible = actual == col.ty
                || matches!(
                    (actual, col.ty),
                    (
                        DataType::Text,
                        DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
                    ) | (DataType::Json | DataType::Jsonb, DataType::Text)
                        | (DataType::Json, DataType::Jsonb)
                        | (DataType::Jsonb, DataType::Json)
                        | (DataType::Timestamp, DataType::Timestamptz)
                        | (DataType::Timestamptz, DataType::Timestamp)
                )
                || matches!(
                    (actual, col.ty),
                    (
                        DataType::Numeric { scale: a, .. },
                        DataType::Numeric { scale: b, .. },
                    ) if a == b
                );
            if !compatible {
                return Err(StorageError::TypeMismatch {
                    column: col.name.clone(),
                    expected: col.ty,
                    actual,
                    position: i,
                });
            }
        }
        // Position the row will occupy once pushed.
        let new_row_idx = self.rows.len();
        for idx in &mut self.indices {
            match &mut idx.kind {
                IndexKind::BTree(map) => {
                    // Non-indexable values (including NULL) are simply not
                    // entered into the index.
                    if let Some(key) = IndexKey::from_value(&row.values[idx.column_position]) {
                        // Copy the current posting list, append, and store it
                        // back under the same key.
                        let mut entries = map.get(&key).cloned().unwrap_or_default();
                        entries.push(RowLocator::Hot(new_row_idx));
                        map.insert_mut(key, entries);
                    }
                }
                IndexKind::Gin(map) => {
                    if let Value::TsVector(lexemes) = &row.values[idx.column_position] {
                        // One posting per lexeme word.
                        for lex in lexemes {
                            let mut entries = map.get(&lex.word).cloned().unwrap_or_default();
                            entries.push(RowLocator::Hot(new_row_idx));
                            map.insert_mut(lex.word.clone(), entries);
                        }
                    }
                }
                // NSW is handled after the row is stored (below); BRIN keeps
                // no per-row state here.
                IndexKind::Nsw(_) | IndexKind::Brin { .. } => {}
            }
        }
        self.hot_bytes = self
            .hot_bytes
            .saturating_add(row_body_encoded_len(&row, &self.schema) as u64);
        self.rows.push_mut(row);
        // Same position as computed before the push.
        let new_row_idx = self.rows.len() - 1;
        // Collect positions first so the index list is not borrowed while
        // `nsw_insert_at` takes `self` mutably.
        let nsw_targets: Vec<usize> = self
            .indices
            .iter()
            .enumerate()
            .filter_map(|(i, idx)| {
                if matches!(idx.kind, IndexKind::Nsw(_)) {
                    Some(i)
                } else {
                    None
                }
            })
            .collect();
        for idx_pos in nsw_targets {
            // Helper defined outside this chunk; runs only after the row is
            // stored.
            nsw_insert_at(self, idx_pos, new_row_idx);
        }
        Ok(())
    }
1316
1317 pub fn add_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1321 if self.indices.iter().any(|i| i.name == name) {
1322 return Err(StorageError::DuplicateIndex { name });
1323 }
1324 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1325 StorageError::ColumnNotFound {
1326 column: column_name.into(),
1327 }
1328 })?;
1329 let mut idx = Index::new_btree(name, column_position);
1330 if let IndexKind::BTree(map) = &mut idx.kind {
1331 for (i, row) in self.rows.iter().enumerate() {
1332 if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
1333 let mut entries = map.get(&key).cloned().unwrap_or_default();
1334 entries.push(RowLocator::Hot(i));
1335 map.insert_mut(key, entries);
1336 }
1337 }
1338 }
1339 self.indices.push(idx);
1340 Ok(())
1341 }
1342
    /// Creates an NSW index named `name` on `column_name` with neighbour cap
    /// `m`.
    ///
    /// Delegates to `add_nsw_index_inner` with no pre-built graph; errors are
    /// whatever that helper returns.
    pub fn add_nsw_index(
        &mut self,
        name: String,
        column_name: &str,
        m: usize,
    ) -> Result<(), StorageError> {
        self.add_nsw_index_inner(name, column_name, m, None)
    }
1355
1356 pub fn rebuild_nsw_index(
1368 &mut self,
1369 name: &str,
1370 new_encoding: Option<VecEncoding>,
1371 ) -> Result<(), StorageError> {
1372 let idx_pos = self
1373 .indices
1374 .iter()
1375 .position(|i| i.name == name)
1376 .ok_or_else(|| StorageError::IndexNotFound {
1377 name: String::from(name),
1378 })?;
1379 let col_pos = self.indices[idx_pos].column_position;
1380 let m = match &self.indices[idx_pos].kind {
1381 IndexKind::Nsw(g) => g.m,
1382 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
1383 return Err(StorageError::Unsupported(format!(
1384 "ALTER INDEX REBUILD on non-NSW index {name:?} — only NSW indexes can rebuild"
1385 )));
1386 }
1387 };
1388 let col_name = self.schema.columns[col_pos].name.clone();
1389 if let Some(target) = new_encoding {
1392 let current = match self.schema.columns[col_pos].ty {
1393 DataType::Vector { encoding, .. } => encoding,
1394 ref other => {
1395 return Err(StorageError::Unsupported(format!(
1396 "ALTER INDEX REBUILD WITH (encoding=…) on non-vector column type {other:?}"
1397 )));
1398 }
1399 };
1400 if target != current {
1401 let DataType::Vector { dim, .. } = self.schema.columns[col_pos].ty else {
1402 unreachable!("checked above")
1403 };
1404 let n = self.rows.len();
1405 for i in 0..n {
1406 let row = self
1407 .rows
1408 .get_mut(i)
1409 .expect("row index in bounds (we iterated up to len())");
1410 let cell = core::mem::replace(&mut row.values[col_pos], Value::Null);
1411 let recoded = recode_vector_cell(cell, target)?;
1412 row.values[col_pos] = recoded;
1413 }
1414 self.schema.columns[col_pos].ty = DataType::Vector {
1415 dim,
1416 encoding: target,
1417 };
1418 }
1419 }
1420 self.indices.remove(idx_pos);
1422 self.add_nsw_index_inner(String::from(name), &col_name, m, None)?;
1423 Ok(())
1424 }
1425
    /// Re-attaches an already-built NSW `graph` as index `name` on
    /// `column_name`.
    ///
    /// Delegates to `add_nsw_index_inner`, passing the graph's own `m` along
    /// with the graph; errors are whatever that helper returns.
    pub fn restore_nsw_index(
        &mut self,
        name: String,
        column_name: &str,
        graph: NswGraph,
    ) -> Result<(), StorageError> {
        self.add_nsw_index_inner(name, column_name, graph.m, Some(graph))
    }
1438
1439 pub fn restore_btree_index(
1446 &mut self,
1447 name: String,
1448 column_name: &str,
1449 map: PersistentBTreeMap<IndexKey, Vec<RowLocator>>,
1450 ) -> Result<(), StorageError> {
1451 if self.indices.iter().any(|i| i.name == name) {
1452 return Err(StorageError::DuplicateIndex { name });
1453 }
1454 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1455 StorageError::ColumnNotFound {
1456 column: column_name.into(),
1457 }
1458 })?;
1459 self.indices.push(Index {
1460 name,
1461 column_position,
1462 kind: IndexKind::BTree(map),
1463 included_columns: Vec::new(),
1464 partial_predicate: None,
1465 expression: None,
1466 is_unique: false,
1467 extra_column_positions: Vec::new(),
1468 });
1469 Ok(())
1470 }
1471
1472 pub fn restore_brin_index(
1477 &mut self,
1478 name: String,
1479 column_name: &str,
1480 column_type: DataType,
1481 ) -> Result<(), StorageError> {
1482 if self.indices.iter().any(|i| i.name == name) {
1483 return Err(StorageError::DuplicateIndex { name });
1484 }
1485 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1486 StorageError::ColumnNotFound {
1487 column: column_name.into(),
1488 }
1489 })?;
1490 self.indices
1491 .push(Index::new_brin(name, column_position, column_type));
1492 Ok(())
1493 }
1494
1495 pub fn add_brin_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1499 if self.indices.iter().any(|i| i.name == name) {
1500 return Err(StorageError::DuplicateIndex { name });
1501 }
1502 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1503 StorageError::ColumnNotFound {
1504 column: column_name.into(),
1505 }
1506 })?;
1507 let column_type = self.schema.columns[column_position].ty;
1508 self.indices
1509 .push(Index::new_brin(name, column_position, column_type));
1510 Ok(())
1511 }
1512
1513 pub fn add_gin_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1518 if self.indices.iter().any(|i| i.name == name) {
1519 return Err(StorageError::DuplicateIndex { name });
1520 }
1521 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1522 StorageError::ColumnNotFound {
1523 column: column_name.into(),
1524 }
1525 })?;
1526 if self.schema.columns[column_position].ty != DataType::TsVector {
1527 return Err(StorageError::Corrupt(format!(
1528 "GIN index {name:?} requires a tsvector column; \
1529 {column_name:?} is {:?}",
1530 self.schema.columns[column_position].ty
1531 )));
1532 }
1533 let mut idx = Index::new_gin(name, column_position);
1534 if let IndexKind::Gin(map) = &mut idx.kind {
1535 for (i, row) in self.rows.iter().enumerate() {
1536 if let Value::TsVector(lexemes) = &row.values[column_position] {
1537 for lex in lexemes {
1538 let mut entries = map.get(&lex.word).cloned().unwrap_or_default();
1539 entries.push(RowLocator::Hot(i));
1540 map.insert_mut(lex.word.clone(), entries);
1541 }
1542 }
1543 }
1544 }
1545 self.indices.push(idx);
1546 Ok(())
1547 }
1548
1549 pub fn restore_gin_index(
1554 &mut self,
1555 name: String,
1556 column_name: &str,
1557 map: PersistentBTreeMap<String, Vec<RowLocator>>,
1558 ) -> Result<(), StorageError> {
1559 if self.indices.iter().any(|i| i.name == name) {
1560 return Err(StorageError::DuplicateIndex { name });
1561 }
1562 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1563 StorageError::ColumnNotFound {
1564 column: column_name.into(),
1565 }
1566 })?;
1567 let mut idx = Index::new_gin(name, column_position);
1568 idx.kind = IndexKind::Gin(map);
1569 self.indices.push(idx);
1570 Ok(())
1571 }
1572
1573 pub fn register_cold_locators<I>(
1590 &mut self,
1591 index_name: &str,
1592 locators: I,
1593 ) -> Result<usize, StorageError>
1594 where
1595 I: IntoIterator<Item = (IndexKey, RowLocator)>,
1596 {
1597 let idx = self
1598 .indices
1599 .iter_mut()
1600 .find(|i| i.name == index_name)
1601 .ok_or_else(|| StorageError::Corrupt(format!("index {index_name:?} not found")))?;
1602 let map = match &mut idx.kind {
1603 IndexKind::BTree(map) => map,
1604 IndexKind::Nsw(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
1605 return Err(StorageError::Corrupt(format!(
1606 "index {index_name:?} is not BTree; cold locators apply only to BTree indices"
1607 )));
1608 }
1609 };
1610 let mut count = 0usize;
1611 for (key, locator) in locators {
1612 let mut entries = map.get(&key).cloned().unwrap_or_default();
1613 entries.push(locator);
1614 map.insert_mut(key, entries);
1615 count += 1;
1616 }
1617 Ok(count)
1618 }
1619
1620 pub fn register_gin_cold_locators<I>(
1625 &mut self,
1626 index_name: &str,
1627 locators: I,
1628 ) -> Result<usize, StorageError>
1629 where
1630 I: IntoIterator<Item = (String, RowLocator)>,
1631 {
1632 let idx = self
1633 .indices
1634 .iter_mut()
1635 .find(|i| i.name == index_name)
1636 .ok_or_else(|| StorageError::Corrupt(format!("index {index_name:?} not found")))?;
1637 let map = match &mut idx.kind {
1638 IndexKind::Gin(map) => map,
1639 IndexKind::BTree(_) | IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
1640 return Err(StorageError::Corrupt(format!(
1641 "register_gin_cold_locators: index {index_name:?} is not GIN"
1642 )));
1643 }
1644 };
1645 let mut count = 0usize;
1646 for (word, locator) in locators {
1647 let mut entries = map.get(&word).cloned().unwrap_or_default();
1648 entries.push(locator);
1649 map.insert_mut(word, entries);
1650 count += 1;
1651 }
1652 Ok(count)
1653 }
1654
1655 pub fn remove_cold_locators_for_key(
1665 &mut self,
1666 index_name: &str,
1667 key: &IndexKey,
1668 ) -> Result<usize, StorageError> {
1669 let idx = self
1670 .indices
1671 .iter_mut()
1672 .find(|i| i.name == index_name)
1673 .ok_or_else(|| {
1674 StorageError::Corrupt(format!(
1675 "remove_cold_locators_for_key: index {index_name:?} not found"
1676 ))
1677 })?;
1678 let map = match &mut idx.kind {
1679 IndexKind::BTree(map) => map,
1680 IndexKind::Nsw(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
1681 return Err(StorageError::Corrupt(format!(
1682 "remove_cold_locators_for_key: index {index_name:?} is not BTree; \
1683 cold locators apply only to BTree indices"
1684 )));
1685 }
1686 };
1687 let Some(entries) = map.get(key) else {
1688 return Ok(0);
1689 };
1690 let mut kept: Vec<RowLocator> =
1691 entries.iter().copied().filter(RowLocator::is_hot).collect();
1692 let removed = entries.len() - kept.len();
1693 if removed == 0 {
1694 return Ok(0);
1695 }
1696 kept.shrink_to_fit();
1697 map.insert_mut(key.clone(), kept);
1705 Ok(removed)
1706 }
1707
1708 pub fn add_column(&mut self, col: ColumnSchema, fill_value: Value) {
1715 self.schema.columns.push(col);
1716 let mut new_rows: PersistentVec<Row> = PersistentVec::new();
1717 for row in self.rows.iter() {
1718 let mut values = row.values.clone();
1719 values.push(fill_value.clone());
1720 new_rows.push_mut(Row::new(values));
1721 }
1722 self.rows = new_rows;
1723 }
1724
    /// Removes the column at `col_pos` from the schema and from every hot
    /// row, then renumbers every stored column position that pointed past
    /// it.
    ///
    /// Indexes whose leading column is the dropped one are removed.
    /// Uniqueness constraints lose the dropped column and are discarded if
    /// that leaves them empty. Finally all indexes are rebuilt.
    ///
    /// # Panics
    ///
    /// Panics if `col_pos` is out of range for the schema (`Vec::remove`);
    /// debug builds assert this first.
    // NOTE(review): `Index::extra_column_positions` is neither filtered nor
    // renumbered here — confirm whether the caller or `rebuild_indices`
    // handles it.
    // NOTE(review): `hot_bytes` is not adjusted for the narrower rows here —
    // confirm whether `rebuild_indices` or the caller accounts for it.
    pub fn drop_column(&mut self, col_pos: usize) {
        debug_assert!(col_pos < self.schema.columns.len());
        self.schema.columns.remove(col_pos);
        // Rebuild the row store with the cell removed from each row.
        let mut new_rows: PersistentVec<Row> = PersistentVec::new();
        for row in self.rows.iter() {
            let mut values = row.values.clone();
            // Guard against rows shorter than the schema.
            if col_pos < values.len() {
                values.remove(col_pos);
            }
            new_rows.push_mut(Row::new(values));
        }
        self.rows = new_rows;
        // Indexes led by the dropped column go away entirely.
        self.indices.retain(|idx| idx.column_position != col_pos);
        // Surviving positions past the dropped column shift down by one.
        for idx in &mut self.indices {
            if idx.column_position > col_pos {
                idx.column_position -= 1;
            }
            for inc in &mut idx.included_columns {
                // NOTE(review): an included column equal to `col_pos` is kept
                // unchanged and would now refer to the next column — confirm
                // callers prevent this.
                if *inc as usize > col_pos {
                    *inc -= 1;
                }
            }
        }
        let mut surviving_ucs: Vec<UniquenessConstraint> = Vec::new();
        for mut uc in core::mem::take(&mut self.schema.uniqueness_constraints) {
            // Remove the dropped column first, then renumber what is left.
            uc.columns.retain(|&c| c != col_pos);
            if uc.columns.is_empty() {
                continue;
            }
            for c in &mut uc.columns {
                if *c > col_pos {
                    *c -= 1;
                }
            }
            surviving_ucs.push(uc);
        }
        self.schema.uniqueness_constraints = surviving_ucs;
        for fk in &mut self.schema.foreign_keys {
            // Only renumbering happens here.
            // NOTE(review): a foreign key that references the dropped column
            // itself is left in place — presumably rejected earlier by the
            // caller; confirm.
            for c in &mut fk.local_columns {
                if *c > col_pos {
                    *c -= 1;
                }
            }
        }
        self.rebuild_indices();
    }
1796
1797 pub fn delete_rows(&mut self, positions: &[usize]) -> usize {
1803 if positions.is_empty() {
1804 return 0;
1805 }
1806 let mut to_remove = alloc::vec![false; self.rows.len()];
1810 let mut removed = 0;
1811 for &p in positions {
1812 if p < to_remove.len() && !to_remove[p] {
1813 to_remove[p] = true;
1814 removed += 1;
1815 }
1816 }
1817 let mut new_rows: PersistentVec<Row> = PersistentVec::new();
1818 let mut removed_bytes: u64 = 0;
1819 for (i, row) in self.rows.iter().enumerate() {
1820 if to_remove[i] {
1821 removed_bytes =
1822 removed_bytes.saturating_add(row_body_encoded_len(row, &self.schema) as u64);
1823 } else {
1824 new_rows.push_mut(row.clone());
1825 }
1826 }
1827 self.rows = new_rows;
1828 self.hot_bytes = self.hot_bytes.saturating_sub(removed_bytes);
1829 self.rebuild_indices();
1830 removed
1831 }
1832
    /// Replaces the hot row at `position` with `new_values`.
    ///
    /// The new values go through the same arity, nullability and type checks
    /// as `insert`. On success `hot_bytes` is adjusted by the difference in
    /// encoded size and all indexes are rebuilt.
    ///
    /// # Errors
    ///
    /// * `StorageError::Corrupt` — `position` is out of bounds.
    /// * `StorageError::ArityMismatch` — wrong number of values.
    /// * `StorageError::NullInNotNull` — NULL for a non-nullable column.
    /// * `StorageError::TypeMismatch` — a value's type is not compatible
    ///   with the column type.
    ///
    /// All checks run before any state is modified.
    pub fn update_row(
        &mut self,
        position: usize,
        new_values: Vec<Value>,
    ) -> Result<(), StorageError> {
        if position >= self.rows.len() {
            return Err(StorageError::Corrupt(alloc::format!(
                "update_row: position {position} out of bounds (rows={})",
                self.rows.len()
            )));
        }
        if new_values.len() != self.schema.columns.len() {
            return Err(StorageError::ArityMismatch {
                expected: self.schema.columns.len(),
                actual: new_values.len(),
            });
        }
        // Same compatibility matrix as `insert`; keep the two in sync.
        for (i, (val, col)) in new_values.iter().zip(&self.schema.columns).enumerate() {
            if val.is_null() {
                if !col.nullable {
                    return Err(StorageError::NullInNotNull {
                        column: col.name.clone(),
                    });
                }
                continue;
            }
            let actual = val.data_type().expect("non-null");
            let compatible = actual == col.ty
                || matches!(
                    (actual, col.ty),
                    (
                        DataType::Text,
                        DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
                    ) | (DataType::Json | DataType::Jsonb, DataType::Text)
                        | (DataType::Json, DataType::Jsonb)
                        | (DataType::Jsonb, DataType::Json)
                        | (DataType::Timestamp, DataType::Timestamptz)
                        | (DataType::Timestamptz, DataType::Timestamp)
                )
                || matches!(
                    (actual, col.ty),
                    (
                        DataType::Numeric { scale: a, .. },
                        DataType::Numeric { scale: b, .. },
                    ) if a == b
                );
            if !compatible {
                return Err(StorageError::TypeMismatch {
                    column: col.name.clone(),
                    expected: col.ty,
                    actual,
                    position: i,
                });
            }
        }
        // Measure the old row before it is replaced so the byte counter can
        // be adjusted by the difference.
        let old_row = self
            .rows
            .get(position)
            .expect("position bounds-checked above");
        let old_bytes = row_body_encoded_len(old_row, &self.schema) as u64;
        let new_row = Row::new(new_values);
        let new_bytes = row_body_encoded_len(&new_row, &self.schema) as u64;
        // `set` returns the updated vector, which replaces the old one.
        self.rows = self
            .rows
            .set(position, new_row)
            .expect("position bounds-checked above");
        self.hot_bytes = self
            .hot_bytes
            .saturating_sub(old_bytes)
            .saturating_add(new_bytes);
        self.rebuild_indices();
        Ok(())
    }
1914
/// Rebuilds every index of this table from the current contents of
/// `self.rows`.
///
/// The work happens in four ordered phases:
/// 1. Snapshot the cold locators (`RowLocator::is_cold`) currently stored in
///    BTree and GIN indices, grouped by index name.
/// 2. Record a descriptor (name, column position, kind parameters) for each
///    index, then clear `self.indices`.
/// 3. Re-create each index in the original order and repopulate it from the
///    hot rows.
/// 4. Re-register the snapshotted cold locators on the rebuilt indices.
///
/// NOTE(review): the descriptors carry only the name, column position and
/// kind-specific parameter, and indices are re-created through
/// `Index::new_*`. Whether other `Index` fields (for example uniqueness or
/// included columns) survive a rebuild is not visible here — confirm.
fn rebuild_indices(&mut self) {
    // Phase 1a: cold (non-hot) locators held by BTree indices. Indices with
    // no cold entries are skipped entirely.
    let preserved_cold: Vec<(String, Vec<(IndexKey, RowLocator)>)> = self
        .indices
        .iter()
        .filter_map(|idx| match &idx.kind {
            IndexKind::BTree(map) => {
                let cold: Vec<(IndexKey, RowLocator)> = map
                    .iter()
                    .flat_map(|(k, locs)| {
                        locs.iter()
                            .filter(|l| l.is_cold())
                            .copied()
                            .map(move |l| (k.clone(), l))
                    })
                    .collect();
                if cold.is_empty() {
                    None
                } else {
                    Some((idx.name.clone(), cold))
                }
            }
            IndexKind::Nsw(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => None,
        })
        .collect();

    // Phase 1b: the same snapshot for GIN indices, keyed by word.
    let preserved_gin_cold: Vec<(String, Vec<(String, RowLocator)>)> = self
        .indices
        .iter()
        .filter_map(|idx| match &idx.kind {
            IndexKind::Gin(map) => {
                let cold: Vec<(String, RowLocator)> = map
                    .iter()
                    .flat_map(|(w, locs)| {
                        locs.iter()
                            .filter(|l| l.is_cold())
                            .copied()
                            .map(move |l| (w.clone(), l))
                    })
                    .collect();
                if cold.is_empty() {
                    None
                } else {
                    Some((idx.name.clone(), cold))
                }
            }
            IndexKind::BTree(_) | IndexKind::Nsw(_) | IndexKind::Brin { .. } => None,
        })
        .collect();

    // Phase 2: what is needed to re-create each index once the old
    // `self.indices` has been cleared.
    #[derive(Clone)]
    enum RebuildKind {
        BTree,
        // Carries the graph's `m` parameter.
        Nsw(usize),
        // Carries the indexed column's data type.
        Brin(DataType),
        Gin,
    }
    let descriptors: Vec<(String, usize, RebuildKind)> = self
        .indices
        .iter()
        .map(|idx| {
            let kind = match &idx.kind {
                IndexKind::Nsw(g) => RebuildKind::Nsw(g.m),
                IndexKind::Brin { column_type } => RebuildKind::Brin(*column_type),
                IndexKind::BTree(_) => RebuildKind::BTree,
                IndexKind::Gin(_) => RebuildKind::Gin,
            };
            (idx.name.clone(), idx.column_position, kind)
        })
        .collect();
    self.indices.clear();
    // Phase 3: re-create the indices in their original order.
    for (name, column_position, rebuild_kind) in descriptors {
        match rebuild_kind {
            RebuildKind::Nsw(m) => {
                // The empty graph must be pushed first: `nsw_insert_at`
                // addresses the index by its position in `self.indices`.
                let idx = Index::new_nsw(name, column_position, m);
                self.indices.push(idx);
                let idx_pos = self.indices.len() - 1;
                let row_indices: Vec<usize> = (0..self.rows.len()).collect();
                for row_idx in row_indices {
                    nsw_insert_at(self, idx_pos, row_idx);
                }
            }
            RebuildKind::Brin(column_type) => {
                // BRIN is re-created without walking the rows.
                self.indices
                    .push(Index::new_brin(name, column_position, column_type));
            }
            RebuildKind::BTree => {
                let mut idx = Index::new_btree(name, column_position);
                if let IndexKind::BTree(map) = &mut idx.kind {
                    for (i, row) in self.rows.iter().enumerate() {
                        // Cells for which `IndexKey::from_value` yields
                        // `None` are left out of the index.
                        if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
                            let mut entries = map.get(&key).cloned().unwrap_or_default();
                            entries.push(RowLocator::Hot(i));
                            map.insert_mut(key, entries);
                        }
                    }
                }
                self.indices.push(idx);
            }
            RebuildKind::Gin => {
                let mut idx = Index::new_gin(name, column_position);
                if let IndexKind::Gin(map) = &mut idx.kind {
                    for (i, row) in self.rows.iter().enumerate() {
                        // Only `TsVector` cells contribute; one posting per lexeme.
                        if let Value::TsVector(lexemes) = &row.values[column_position] {
                            for lex in lexemes {
                                let mut entries =
                                    map.get(&lex.word).cloned().unwrap_or_default();
                                entries.push(RowLocator::Hot(i));
                                map.insert_mut(lex.word.clone(), entries);
                            }
                        }
                    }
                }
                self.indices.push(idx);
            }
        }
    }

    // Phase 4: put the cold locators back. Registration results are
    // deliberately discarded here.
    for (idx_name, locators) in preserved_cold {
        let _ = self.register_cold_locators(&idx_name, locators);
    }
    for (idx_name, locators) in preserved_gin_cold {
        let _ = self.register_gin_cold_locators(&idx_name, locators);
    }
}
2073
2074 fn add_nsw_index_inner(
2075 &mut self,
2076 name: String,
2077 column_name: &str,
2078 m: usize,
2079 restore: Option<NswGraph>,
2080 ) -> Result<(), StorageError> {
2081 if self.indices.iter().any(|i| i.name == name) {
2082 return Err(StorageError::DuplicateIndex { name });
2083 }
2084 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
2085 StorageError::ColumnNotFound {
2086 column: column_name.into(),
2087 }
2088 })?;
2089 if !matches!(
2090 self.schema.columns[column_position].ty,
2091 DataType::Vector { .. }
2092 ) {
2093 return Err(StorageError::TypeMismatch {
2094 column: column_name.into(),
2095 expected: DataType::Vector {
2096 dim: 0,
2097 encoding: VecEncoding::F32,
2098 },
2099 actual: self.schema.columns[column_position].ty,
2100 position: column_position,
2101 });
2102 }
2103 if let Some(graph) = restore {
2104 self.indices.push(Index {
2105 name,
2106 column_position,
2107 kind: IndexKind::Nsw(graph),
2108 included_columns: Vec::new(),
2109 partial_predicate: None,
2110 expression: None,
2111 is_unique: false,
2112 extra_column_positions: Vec::new(),
2113 });
2114 return Ok(());
2115 }
2116 let idx = Index::new_nsw(name, column_position, m);
2117 self.indices.push(idx);
2118 let idx_pos = self.indices.len() - 1;
2119 let row_indices: Vec<usize> = (0..self.rows.len()).collect();
2122 for row_idx in row_indices {
2123 nsw_insert_at(self, idx_pos, row_idx);
2124 }
2125 Ok(())
2126 }
2127}
2128
2129fn recode_vector_cell(cell: Value, target: VecEncoding) -> Result<Value, StorageError> {
2136 if matches!(cell, Value::Null) {
2137 return Ok(cell);
2138 }
2139 let as_f32: Vec<f32> = match &cell {
2141 Value::Vector(v) => v.clone(),
2142 Value::Sq8Vector(q) => quantize::dequantize(q),
2143 Value::HalfVector(h) => h.to_f32_vec(),
2144 other => {
2145 return Err(StorageError::Unsupported(format!(
2146 "ALTER INDEX REBUILD: cannot recode non-vector cell {:?}",
2147 other.data_type()
2148 )));
2149 }
2150 };
2151 Ok(match target {
2156 VecEncoding::F32 => Value::Vector(as_f32),
2157 VecEncoding::Sq8 => Value::Sq8Vector(quantize::quantize(&as_f32)),
2158 VecEncoding::F16 => Value::HalfVector(halfvec::HalfVector::from_f32_slice(&as_f32)),
2159 })
2160}
2161
/// Inserts row `new_row_idx` into the NSW graph stored at
/// `table.indices[idx_pos]`.
///
/// Rows whose indexed cell is not a vector, or is a zero-length vector, only
/// get an empty node slot and are never linked. The first linkable row
/// becomes the graph's entry point. Every later row descends greedily from
/// the entry point to its own top layer, then on each layer from there down
/// to 0 runs a beam search, selects neighbours and links them.
///
/// # Panics
/// Panics (via `unreachable!`) if the index at `idx_pos` is not an NSW index,
/// and on out-of-range `idx_pos` / `new_row_idx` through slice indexing.
fn nsw_insert_at(table: &mut Table, idx_pos: usize, new_row_idx: usize) {
    let col_pos = table.indices[idx_pos].column_position;
    // Dimension of the cell in whichever vector encoding it uses; `None`
    // for any non-vector cell.
    let cell_dim: Option<usize> = match &table.rows[new_row_idx].values[col_pos] {
        Value::Vector(v) => Some(v.len()),
        Value::Sq8Vector(q) => Some(q.bytes.len()),
        Value::HalfVector(h) => Some(h.dim()),
        _ => None,
    };
    let Some(dim) = cell_dim else {
        // Still reserve the slot so per-row vectors stay addressable by row index.
        ensure_node_slot(table, idx_pos, new_row_idx, 0);
        return;
    };
    if dim == 0 {
        ensure_node_slot(table, idx_pos, new_row_idx, 0);
        return;
    }
    let level = nsw_assign_level(new_row_idx);
    ensure_node_slot(table, idx_pos, new_row_idx, level);
    let (entry, entry_level, m) = match &table.indices[idx_pos].kind {
        IndexKind::Nsw(g) => (g.entry, g.entry_level, g.m),
        IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
            unreachable!("nsw_insert_at on a non-NSW index")
        }
    };
    if entry.is_none() {
        // Empty graph: this node becomes the entry point; there is nothing to link to.
        if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
            g.entry = Some(new_row_idx);
            g.entry_level = level;
            *g.levels
                .get_mut(new_row_idx)
                .expect("levels slot padded by ensure_node_slot") = level;
        }
        return;
    }
    if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
        *g.levels
            .get_mut(new_row_idx)
            .expect("levels slot padded by ensure_node_slot") = level;
    }
    // The query is always handled as f32, whatever the stored encoding.
    let query = match &table.rows[new_row_idx].values[col_pos] {
        Value::Vector(v) => v.clone(),
        Value::Sq8Vector(q) => quantize::dequantize(q),
        Value::HalfVector(h) => h.to_f32_vec(),
        _ => return,
    };
    let mut current = entry.expect("entry was Some above");
    let mut current_d = vec_l2_sq(table, col_pos, current, &query);
    // Layers above the new node's level: greedy descent only, no linking.
    if entry_level > level {
        for layer in (level + 1..=entry_level).rev() {
            (current, current_d) =
                greedy_layer_walk(table, idx_pos, layer, current, current_d, &query);
        }
    }
    let top = level.min(entry_level);
    let ef = (m * 2).max(8);
    for layer in (0..=top).rev() {
        // Layer 0 keeps twice as many links as the upper layers.
        let cap = if layer == 0 { m * 2 } else { m };
        let mut candidates = layer_beam_search(
            table,
            idx_pos,
            layer,
            current,
            current_d,
            &query,
            ef,
            NswMetric::L2,
        );
        // A node must never be its own neighbour.
        candidates.retain(|&(_, n)| n != new_row_idx);
        // The closest candidate seeds the search on the next layer down.
        if let Some(&(d, n)) = candidates.first() {
            current = n;
            current_d = d;
        }
        let peers = select_neighbours_heuristic(&candidates, cap, table, col_pos);
        connect_at_layer(table, idx_pos, layer, new_row_idx, &peers);
    }
    // A node taller than the current entry point takes over as entry point.
    if level > entry_level
        && let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
    {
        g.entry = Some(new_row_idx);
        g.entry_level = level;
    }
}
2270
2271fn ensure_node_slot(table: &mut Table, idx_pos: usize, new_row_idx: usize, level: u8) {
2275 let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind else {
2276 unreachable!("ensure_node_slot on a BTree index");
2277 };
2278 while g.layers.len() <= level as usize {
2279 g.layers.push(PersistentVec::new());
2280 }
2281 while g.levels.len() <= new_row_idx {
2282 g.levels.push_mut(0);
2283 }
2284 for layer_vec in &mut g.layers {
2285 while layer_vec.len() <= new_row_idx {
2286 layer_vec.push_mut(Vec::new());
2287 }
2288 }
2289}
2290
2291fn greedy_layer_walk(
2297 table: &Table,
2298 idx_pos: usize,
2299 layer: u8,
2300 mut current: usize,
2301 mut current_d: f32,
2302 query: &[f32],
2303) -> (usize, f32) {
2304 let g = match &table.indices[idx_pos].kind {
2305 IndexKind::Nsw(g) => g,
2306 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
2307 return (current, current_d);
2308 }
2309 };
2310 let col_pos = table.indices[idx_pos].column_position;
2311 loop {
2312 let neighbours: &[u32] = g
2313 .layers
2314 .get(layer as usize)
2315 .and_then(|layer_v| layer_v.get(current))
2316 .map_or(&[][..], Vec::as_slice);
2317 let mut best = current;
2318 let mut best_d = current_d;
2319 for &n in neighbours {
2320 let n = n as usize;
2321 let d = vec_l2_sq(table, col_pos, n, query);
2322 if d < best_d {
2323 best = n;
2324 best_d = d;
2325 }
2326 }
2327 if best == current {
2328 return (current, current_d);
2329 }
2330 current = best;
2331 current_d = best_d;
2332 }
2333}
2334
/// Best-first beam search on a single graph layer.
///
/// Starts from `entry_node` and expands neighbours on `layer`, keeping at
/// most `ef` results. Returns `(distance, row)` pairs sorted by ascending
/// distance under `metric`. Returns an empty vector for a non-NSW index.
///
/// `entry_d` is used as the entry node's distance only when `metric` is
/// `NswMetric::L2`; for other metrics the distance is recomputed.
#[allow(clippy::too_many_arguments)]
fn layer_beam_search(
    table: &Table,
    idx_pos: usize,
    layer: u8,
    entry_node: usize,
    entry_d: f32,
    query: &[f32],
    ef: usize,
    metric: NswMetric,
) -> Vec<(f32, usize)> {
    let g = match &table.indices[idx_pos].kind {
        IndexKind::Nsw(g) => g,
        IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => return Vec::new(),
    };
    let col_pos = table.indices[idx_pos].column_position;
    let d0 = if matches!(metric, NswMetric::L2) {
        entry_d
    } else {
        cell_to_query_metric_distance(table, col_pos, entry_node, query, metric)
    };
    let row_count = table.rows.len();
    // One visited flag per row; neighbour ids at or beyond `row_count` are skipped below.
    let mut visited: Vec<bool> = alloc::vec![false; row_count];
    if entry_node < row_count {
        visited[entry_node] = true;
    }
    // `candidates` pops the closest unexpanded node; `results` exposes the
    // furthest kept node so it can be evicted when the beam is full.
    let mut candidates: alloc::collections::BinaryHeap<NodeClosest> =
        alloc::collections::BinaryHeap::with_capacity(ef);
    let mut results: alloc::collections::BinaryHeap<NodeFurthest> =
        alloc::collections::BinaryHeap::with_capacity(ef);
    candidates.push(NodeClosest {
        dist: d0,
        node: entry_node,
    });
    results.push(NodeFurthest {
        dist: d0,
        node: entry_node,
    });
    while let Some(cur) = candidates.pop() {
        let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
        // Stop once the beam is full and the best remaining candidate is
        // already worse than the worst kept result.
        if cur.dist > worst && results.len() >= ef {
            break;
        }
        let neighbours: &[u32] = g
            .layers
            .get(layer as usize)
            .and_then(|layer_v| layer_v.get(cur.node))
            .map_or(&[][..], Vec::as_slice);
        for &n in neighbours {
            let n = n as usize;
            if n >= row_count || visited[n] {
                continue;
            }
            visited[n] = true;
            let dn = cell_to_query_metric_distance(table, col_pos, n, query, metric);
            // Non-finite distances (mismatched or missing cells, NaN) are never kept.
            if !dn.is_finite() {
                continue;
            }
            let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
            if results.len() < ef || dn < worst {
                results.push(NodeFurthest { dist: dn, node: n });
                if results.len() > ef {
                    results.pop();
                }
                candidates.push(NodeClosest { dist: dn, node: n });
            }
        }
    }
    let mut out: Vec<(f32, usize)> = results.into_iter().map(|c| (c.dist, c.node)).collect();
    // Heap iteration order is arbitrary, so sort ascending by distance.
    out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
    out
}
2425
/// Heap entry ordered so that a max-heap (`BinaryHeap`) pops the node with
/// the *smallest* distance first.
///
/// The ordering is total: distances are compared with `f32::total_cmp`
/// (NaN-safe) and ties are broken by node id, smaller id first. Equality is
/// derived from the same ordering so that `Eq` and `Ord` always agree.
#[derive(Debug, Clone, Copy)]
struct NodeClosest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeClosest {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeClosest {}
impl PartialOrd for NodeClosest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeClosest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        // Operands are swapped on purpose: a smaller distance must compare
        // as "greater" so the max-heap surfaces it first.
        other
            .dist
            .total_cmp(&self.dist)
            .then_with(|| other.node.cmp(&self.node))
    }
}
2454
/// Heap entry ordered so that a max-heap (`BinaryHeap`) exposes the node
/// with the *largest* distance at the top.
///
/// The ordering is total: distances are compared with `f32::total_cmp`
/// (NaN-safe, a positive NaN ranks above every finite distance) and ties are
/// broken by node id. Equality is derived from the same ordering so that
/// `Eq` and `Ord` always agree.
#[derive(Debug, Clone, Copy)]
struct NodeFurthest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeFurthest {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeFurthest {}
impl PartialOrd for NodeFurthest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeFurthest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        self.dist
            .total_cmp(&other.dist)
            .then_with(|| self.node.cmp(&other.node))
    }
}
2480
2481fn select_neighbours_heuristic(
2490 candidates: &[(f32, usize)],
2491 m: usize,
2492 table: &Table,
2493 col_pos: usize,
2494) -> Vec<usize> {
2495 let mut chosen: Vec<usize> = Vec::with_capacity(m);
2496 for &(d_q, e) in candidates {
2497 if chosen.len() >= m {
2498 break;
2499 }
2500 if !matches!(
2505 table.rows.get(e).and_then(|r| r.values.get(col_pos)),
2506 Some(Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_))
2507 ) {
2508 continue;
2509 }
2510 let mut covered = false;
2511 for &r in &chosen {
2512 if cell_l2_sq(table, col_pos, e, r) < d_q {
2516 covered = true;
2517 break;
2518 }
2519 }
2520 if !covered {
2521 chosen.push(e);
2522 }
2523 }
2524 chosen
2525}
2526
/// Links `new_row_idx` with `peers` on `layer` of the NSW graph at
/// `table.indices[idx_pos]`.
///
/// The new node's adjacency list is replaced by `peers`. Each peer holding a
/// vector cell gets a back-link to the new node; if that pushes the peer past
/// the layer's capacity (`cap_for_layer`), its neighbour list is re-selected
/// with `select_neighbours_heuristic`. Does nothing for a non-NSW index.
///
/// # Panics
/// Panics if `new_row_idx` or a selected neighbour does not fit in `u32`,
/// and on out-of-range layer/peer indices through slice indexing.
fn connect_at_layer(
    table: &mut Table,
    idx_pos: usize,
    layer: u8,
    new_row_idx: usize,
    peers: &[usize],
) {
    let col_pos = table.indices[idx_pos].column_position;
    let cap = match &table.indices[idx_pos].kind {
        IndexKind::Nsw(g) => g.cap_for_layer(layer),
        IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => return,
    };
    // Adjacency lists store row ids as u32.
    let new_row_u32 = u32::try_from(new_row_idx).expect("row index fits in u32");
    // Forward links: new node -> peers.
    if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
        let layer_v = &mut g.layers[layer as usize];
        if let Some(slot) = layer_v.get_mut(new_row_idx) {
            *slot = peers
                .iter()
                .map(|&p| u32::try_from(p).expect("row index fits in u32"))
                .collect();
        }
    }
    // Back links: peer -> new node, trimming peers that overflow `cap`.
    for &peer in peers {
        if !matches!(
            &table.rows[peer].values[col_pos],
            Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_)
        ) {
            continue;
        }
        if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
            let layer_v = &mut g.layers[layer as usize];
            if let Some(slot) = layer_v.get_mut(peer)
                && !slot.contains(&new_row_u32)
            {
                slot.push(new_row_u32);
            }
        }
        let needs_trim = match &table.indices[idx_pos].kind {
            IndexKind::Nsw(g) => g.layers[layer as usize][peer].len() > cap,
            IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => false,
        };
        if needs_trim {
            // Copy the list out first: the distance helpers below need
            // `&Table` while the graph lives inside `table`.
            let current_peers: Vec<usize> = match &table.indices[idx_pos].kind {
                IndexKind::Nsw(g) => g.layers[layer as usize][peer]
                    .iter()
                    .map(|&n| n as usize)
                    .collect(),
                IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => continue,
            };
            // Tag each neighbour with its distance to `peer`, closest first.
            let mut tagged: Vec<(f32, usize)> = current_peers
                .iter()
                .map(|&p| (cell_l2_sq(table, col_pos, peer, p), p))
                .collect();
            tagged.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
            let kept = select_neighbours_heuristic(&tagged, cap, table, col_pos);
            if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
                && let Some(slot) = g.layers[layer as usize].get_mut(peer)
            {
                *slot = kept
                    .into_iter()
                    .map(|p| u32::try_from(p).expect("row index fits in u32"))
                    .collect();
            }
        }
    }
}
2611
2612fn vec_l2_sq(table: &Table, col_pos: usize, row: usize, query: &[f32]) -> f32 {
2619 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2620 Some(Value::Vector(v)) if v.len() == query.len() => l2_distance_sq(v, query),
2621 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => {
2622 quantize::sq8_l2_distance_sq_asymmetric(q, query)
2623 }
2624 Some(Value::HalfVector(h)) if h.dim() == query.len() => {
2628 halfvec::half_l2_distance_sq_asymmetric(h, query)
2629 }
2630 _ => f32::INFINITY,
2631 }
2632}
2633
2634fn cell_l2_sq(table: &Table, col_pos: usize, row_a: usize, row_b: usize) -> f32 {
2641 let Some(cell_a) = table.rows.get(row_a).and_then(|r| r.values.get(col_pos)) else {
2642 return f32::INFINITY;
2643 };
2644 let Some(cell_b) = table.rows.get(row_b).and_then(|r| r.values.get(col_pos)) else {
2645 return f32::INFINITY;
2646 };
2647 match (cell_a, cell_b) {
2648 (Value::Vector(a), Value::Vector(b)) if a.len() == b.len() => l2_distance_sq(a, b),
2649 (Value::Sq8Vector(a), Value::Sq8Vector(b)) if a.bytes.len() == b.bytes.len() => {
2650 quantize::sq8_l2_distance_sq(a, b)
2651 }
2652 (Value::HalfVector(a), Value::HalfVector(b)) if a.dim() == b.dim() => {
2657 halfvec::half_l2_distance_sq(a, b)
2658 }
2659 _ => f32::INFINITY,
2660 }
2661}
2662
2663fn cell_to_query_metric_distance(
2668 table: &Table,
2669 col_pos: usize,
2670 row: usize,
2671 query: &[f32],
2672 metric: NswMetric,
2673) -> f32 {
2674 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2675 Some(Value::Vector(v)) if v.len() == query.len() => metric_distance(metric, v, query),
2676 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => match metric {
2677 NswMetric::L2 => quantize::sq8_l2_distance_sq_asymmetric(q, query),
2678 NswMetric::InnerProduct => quantize::sq8_inner_product_asymmetric(q, query),
2679 NswMetric::Cosine => quantize::sq8_cosine_distance_asymmetric(q, query),
2680 },
2681 Some(Value::HalfVector(h)) if h.dim() == query.len() => match metric {
2684 NswMetric::L2 => halfvec::half_l2_distance_sq_asymmetric(h, query),
2685 NswMetric::InnerProduct => halfvec::half_inner_product_asymmetric(h, query),
2686 NswMetric::Cosine => halfvec::half_cosine_distance_asymmetric(h, query),
2687 },
2688 _ => f32::INFINITY,
2689 }
2690}
2691
/// Distance metric used when searching an NSW index. Smaller values mean
/// "closer" for every variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NswMetric {
    /// Squared Euclidean distance.
    L2,
    /// Negated inner product for `f32` cells, so a larger dot product ranks
    /// closer.
    InnerProduct,
    /// Cosine distance, `1 - cos(a, b)` for `f32` cells; infinite when either
    /// vector has zero norm.
    Cosine,
}
2709
2710fn nsw_search(
2716 table: &Table,
2717 idx_pos: usize,
2718 query: &[f32],
2719 k: usize,
2720 ef: usize,
2721 metric: NswMetric,
2722) -> Vec<(f32, usize)> {
2723 let (entry, entry_level) = match &table.indices[idx_pos].kind {
2724 IndexKind::Nsw(g) => (g.entry, g.entry_level),
2725 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => return Vec::new(),
2726 };
2727 let Some(entry) = entry else {
2728 return Vec::new();
2729 };
2730 let col_pos = table.indices[idx_pos].column_position;
2731 let sq8 = matches!(
2738 table.schema.columns.get(col_pos).map(|c| c.ty),
2739 Some(DataType::Vector {
2740 encoding: VecEncoding::Sq8,
2741 ..
2742 })
2743 );
2744 let ef = if sq8 {
2745 ef.max(k).max(k * SQ8_RERANK_OVER_FETCH)
2746 } else {
2747 ef.max(k)
2748 };
2749 let entry_d = vec_l2_sq(table, col_pos, entry, query);
2751 let mut current = entry;
2752 let mut current_d = entry_d;
2753 for layer in (1..=entry_level).rev() {
2754 (current, current_d) = greedy_layer_walk(table, idx_pos, layer, current, current_d, query);
2755 }
2756 let mut results = layer_beam_search(table, idx_pos, 0, current, current_d, query, ef, metric);
2758 if sq8 {
2759 results = sq8_rerank(table, col_pos, &results, query, metric);
2760 }
2761 results.truncate(k);
2762 results
2763}
2764
2765fn sq8_rerank(
2772 table: &Table,
2773 col_pos: usize,
2774 candidates: &[(f32, usize)],
2775 query: &[f32],
2776 metric: NswMetric,
2777) -> Vec<(f32, usize)> {
2778 let mut out: Vec<(f32, usize)> = candidates
2779 .iter()
2780 .filter_map(|&(adc_d, row)| {
2781 let cell = table.rows.get(row).and_then(|r| r.values.get(col_pos))?;
2782 let Value::Sq8Vector(q) = cell else {
2783 return Some((adc_d, row));
2787 };
2788 let deq = quantize::dequantize(q);
2789 if deq.len() != query.len() {
2790 return None;
2791 }
2792 Some((metric_distance(metric, &deq, query), row))
2793 })
2794 .collect();
2795 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2796 out
2797}
2798
/// Over-fetch factor for SQ8 columns: `nsw_search` widens its beam to at
/// least `k * SQ8_RERANK_OVER_FETCH` candidates before re-ranking them.
const SQ8_RERANK_OVER_FETCH: usize = 3;
2803
2804fn metric_distance(metric: NswMetric, a: &[f32], b: &[f32]) -> f32 {
2805 match metric {
2806 NswMetric::L2 => l2_distance_sq(a, b),
2807 NswMetric::InnerProduct => -inner_product_f32(a, b),
2808 NswMetric::Cosine => {
2809 let (dot, na, nb) = cosine_dot_norms_f32(a, b);
2810 if na == 0.0 || nb == 0.0 {
2811 return f32::INFINITY;
2812 }
2813 let denom = sqrt_newton_f32(na) * sqrt_newton_f32(nb);
2816 1.0 - dot / denom
2817 }
2818 }
2819}
2820
/// Dot product of `a` and `b`.
///
/// On aarch64 the NEON kernel is used when both slices have the same length
/// and that length is a non-zero multiple of 4; every other input goes
/// through the scalar loop, which stops at the shorter slice.
#[doc(hidden)]
#[inline]
pub fn inner_product_f32(a: &[f32], b: &[f32]) -> f32 {
    #[cfg(target_arch = "aarch64")]
    {
        if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
            // SAFETY: the guard guarantees equal lengths, so every 4-lane
            // load the kernel issues at offsets below `a.len()` is in bounds
            // for both slices. NEON availability on this target is assumed
            // — TODO confirm for all supported aarch64 targets.
            return unsafe { inner_product_neon(a, b) };
        }
    }
    inner_product_scalar(a, b)
}
2842
/// Portable dot product: sums `a[i] * b[i]` left to right over the common
/// prefix of the two slices. Returns `0.0` for empty input.
fn inner_product_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter()
        .zip(b.iter())
        .fold(0.0_f32, |acc, (x, y)| acc + x * y)
}
2850
/// NEON dot product of `a` and `b`, processing 4 lanes per load with two
/// accumulators on the 8-wide main loop.
///
/// Elements beyond the last full group of 4 in `a` are not read, so callers
/// that need an exact result must pass a length that is a multiple of 4.
///
/// # Safety
/// `b` must be at least as long as `a` (rounded down to a multiple of 4):
/// the kernel reads `b` at every offset it reads `a`, bounded only by
/// `a.len()`. The CPU must support NEON.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names)]
unsafe fn inner_product_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32,
    };
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        // Two independent accumulators for the 8-element main loop.
        let mut acc0 = zero;
        let mut acc1 = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 8 <= n {
            let av0 = vld1q_f32(a.as_ptr().add(i));
            let bv0 = vld1q_f32(b.as_ptr().add(i));
            acc0 = vfmaq_f32(acc0, av0, bv0);
            let av1 = vld1q_f32(a.as_ptr().add(i + 4));
            let bv1 = vld1q_f32(b.as_ptr().add(i + 4));
            acc1 = vfmaq_f32(acc1, av1, bv1);
            i += 8;
        }
        // At most one remaining group of 4.
        while i + 4 <= n {
            let av = vld1q_f32(a.as_ptr().add(i));
            let bv = vld1q_f32(b.as_ptr().add(i));
            acc0 = vfmaq_f32(acc0, av, bv);
            i += 4;
        }
        // Horizontal add of the combined accumulators.
        vaddvq_f32(vaddq_f32(acc0, acc1))
    }
}
2884
/// Computes `(a·b, |a|², |b|²)` in a single pass.
///
/// On aarch64 the NEON kernel is used when both slices have the same length
/// and that length is a non-zero multiple of 4; every other input goes
/// through the scalar loop, which stops at the shorter slice.
#[doc(hidden)]
#[inline]
pub fn cosine_dot_norms_f32(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    #[cfg(target_arch = "aarch64")]
    {
        if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
            // SAFETY: the guard guarantees equal lengths, so every 4-lane
            // load the kernel issues at offsets below `a.len()` is in bounds
            // for both slices. NEON availability on this target is assumed
            // — TODO confirm for all supported aarch64 targets.
            return unsafe { cosine_dot_norms_neon(a, b) };
        }
    }
    cosine_dot_norms_scalar(a, b)
}
2903
/// Portable single-pass computation of `(a·b, |a|², |b|²)` over the common
/// prefix of the two slices. All three sums are `0.0` for empty input.
fn cosine_dot_norms_scalar(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    a.iter().zip(b.iter()).fold(
        (0.0_f32, 0.0_f32, 0.0_f32),
        |(dot, norm_a, norm_b), (x, y)| (dot + x * y, norm_a + x * x, norm_b + y * y),
    )
}
2915
/// NEON single-pass computation of `(a·b, |a|², |b|²)`, 4 lanes per load.
///
/// Elements beyond the last full group of 4 in `a` are not read, so callers
/// that need an exact result must pass a length that is a multiple of 4.
///
/// # Safety
/// `b` must be at least as long as `a` (rounded down to a multiple of 4):
/// the kernel reads `b` at every offset it reads `a`, bounded only by
/// `a.len()`. The CPU must support NEON.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names, clippy::similar_names)]
unsafe fn cosine_dot_norms_neon(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    use core::arch::aarch64::{float32x4_t, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32};
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        // One accumulator per output: dot product and both squared norms.
        let mut acc_dot = zero;
        let mut acc_na = zero;
        let mut acc_nb = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 4 <= n {
            let av = vld1q_f32(a.as_ptr().add(i));
            let bv = vld1q_f32(b.as_ptr().add(i));
            acc_dot = vfmaq_f32(acc_dot, av, bv);
            acc_na = vfmaq_f32(acc_na, av, av);
            acc_nb = vfmaq_f32(acc_nb, bv, bv);
            i += 4;
        }
        // Horizontal add of each accumulator.
        (vaddvq_f32(acc_dot), vaddvq_f32(acc_na), vaddvq_f32(acc_nb))
    }
}
2939
/// Square root of `x` computed with Newton's method, for builds where the
/// hardware/libm `sqrt` is not available.
///
/// Returns `0.0` for zero and negative input, `+inf` for `+inf`, and NaN for
/// NaN. For every other input the result is accurate to within a few ULPs
/// across the whole `f32` range.
///
/// The iteration is seeded from the bit pattern of `x` (halving the
/// exponent), which is already within a few percent of the answer. Seeding
/// with `x` itself only halves the estimate per step, so a fixed iteration
/// count fails to converge for large or very small inputs.
fn sqrt_newton_f32(x: f32) -> f32 {
    /// Added to `bits >> 1` to re-bias the halved exponent.
    const SEED_BIAS: u32 = 0x1fbd_1df5;
    /// 2^48: lifts every subnormal into the normal range.
    const RESCALE_UP: f32 = 16_777_216.0 * 16_777_216.0;
    /// 2^-24 = 1 / sqrt(2^48): undoes `RESCALE_UP` after the root is taken.
    const RESCALE_DOWN: f32 = 1.0 / 16_777_216.0;

    if x <= 0.0 {
        return 0.0;
    }
    if x == f32::INFINITY {
        return x;
    }
    if x < f32::MIN_POSITIVE {
        // The bit-level seed assumes a normal float; scale by an exact power
        // of four so the result can be scaled back without rounding error.
        return sqrt_newton_f32(x * RESCALE_UP) * RESCALE_DOWN;
    }
    let mut g = f32::from_bits((x.to_bits() >> 1) + SEED_BIAS);
    // Quadratic convergence: a seed within ~5% reaches f32 precision in
    // three steps; the fourth is margin.
    for _ in 0..4 {
        g = 0.5 * (g + x / g);
    }
    g
}
2950
/// Squared Euclidean distance between `a` and `b`.
///
/// On aarch64 the NEON kernel is used when both slices have the same length
/// and that length is a non-zero multiple of 4; every other input goes
/// through the scalar loop, which stops at the shorter slice.
#[inline]
fn l2_distance_sq(a: &[f32], b: &[f32]) -> f32 {
    #[cfg(target_arch = "aarch64")]
    {
        if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
            // SAFETY: the guard guarantees equal lengths, so every 4-lane
            // load the kernel issues at offsets below `a.len()` is in bounds
            // for both slices. NEON availability on this target is assumed
            // — TODO confirm for all supported aarch64 targets.
            return unsafe { l2_distance_sq_neon(a, b) };
        }
    }
    l2_distance_sq_scalar(a, b)
}
2971
/// Portable squared Euclidean distance: sums `(a[i] - b[i])²` left to right
/// over the common prefix of the two slices. Returns `0.0` for empty input.
fn l2_distance_sq_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b.iter()).fold(0.0_f32, |total, (x, y)| {
        let delta = *x - *y;
        total + delta * delta
    })
}
2980
/// NEON squared Euclidean distance, processing 4 lanes per load with two
/// accumulators on the 8-wide main loop.
///
/// Elements beyond the last full group of 4 in `a` are not read, so callers
/// that need an exact result must pass a length that is a multiple of 4.
///
/// # Safety
/// `b` must be at least as long as `a` (rounded down to a multiple of 4):
/// the kernel reads `b` at every offset it reads `a`, bounded only by
/// `a.len()`. The CPU must support NEON.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names)]
unsafe fn l2_distance_sq_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32, vsubq_f32,
    };
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        // Two independent accumulators for the 8-element main loop.
        let mut acc0 = zero;
        let mut acc1 = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 8 <= n {
            // d = a - b, then acc += d * d.
            let d0 = vsubq_f32(vld1q_f32(a.as_ptr().add(i)), vld1q_f32(b.as_ptr().add(i)));
            acc0 = vfmaq_f32(acc0, d0, d0);
            let d1 = vsubq_f32(
                vld1q_f32(a.as_ptr().add(i + 4)),
                vld1q_f32(b.as_ptr().add(i + 4)),
            );
            acc1 = vfmaq_f32(acc1, d1, d1);
            i += 8;
        }
        // At most one remaining group of 4.
        while i + 4 <= n {
            let d = vsubq_f32(vld1q_f32(a.as_ptr().add(i)), vld1q_f32(b.as_ptr().add(i)));
            acc0 = vfmaq_f32(acc0, d, d);
            i += 4;
        }
        // Horizontal add of the combined accumulators.
        vaddvq_f32(vaddq_f32(acc0, acc1))
    }
}
3018
3019pub fn nsw_query(
3022 table: &Table,
3023 idx_name: &str,
3024 query: &[f32],
3025 k: usize,
3026 metric: NswMetric,
3027) -> Vec<usize> {
3028 let Some(idx_pos) = table.indices.iter().position(|i| i.name == idx_name) else {
3029 return Vec::new();
3030 };
3031 let ef = (k * 2).max(NSW_DEFAULT_M);
3032 let mut hits = nsw_search(table, idx_pos, query, k, ef, metric);
3033 hits.truncate(k);
3034 hits.into_iter().map(|(_, idx)| idx).collect()
3035}
3036
3037pub fn nsw_index_on(table: &Table, column_position: usize) -> Option<&Index> {
3041 table
3042 .indices
3043 .iter()
3044 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_)))
3045}
3046
/// In-memory catalog: the set of tables plus the cold segments, functions
/// and triggers registered alongside them.
#[derive(Debug, Clone, Default)]
pub struct Catalog {
    /// Tables in creation order; `by_name` stores positions into this vector.
    tables: Vec<Table>,
    /// Table name -> position in `tables`.
    by_name: BTreeMap<String, usize>,
    /// Cold segments addressed by slot index (the segment id). `None` marks
    /// an empty slot: either tombstoned or padding created while loading a
    /// segment at a higher id.
    cold_segments: Vec<Option<Arc<OwnedSegment>>>,
    /// Registered functions keyed by `FunctionDef::name`.
    functions: BTreeMap<String, FunctionDef>,
    /// Registered triggers; `create_trigger` keeps (name, table) pairs unique.
    triggers: Vec<TriggerDef>,
}
3099
/// Catalog entry describing a user-defined function. All parts are kept as
/// plain strings.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FunctionDef {
    /// Function name; used as the key in the catalog's function map.
    pub name: String,
    /// Presumably the textual form of the argument list — TODO confirm
    /// against the code that builds this value.
    pub args_repr: String,
    /// Presumably the declared return type as text — TODO confirm.
    pub returns: String,
    /// Presumably the implementation language name — TODO confirm.
    pub language: String,
    /// Presumably the function body source text — TODO confirm.
    pub body: String,
}
3125
/// Catalog entry describing a trigger. A trigger is identified by its
/// (`name`, `table`) pair.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TriggerDef {
    /// Trigger name; unique per table.
    pub name: String,
    /// Name of the table the trigger is attached to; must exist in the
    /// catalog when the trigger is created.
    pub table: String,
    /// Presumably when the trigger fires relative to the statement (for
    /// example BEFORE / AFTER) — TODO confirm the accepted values.
    pub timing: String,
    /// Presumably the statement kinds that fire the trigger — TODO confirm.
    pub events: Vec<String>,
    /// Presumably the firing granularity (row or statement) — TODO confirm.
    pub for_each: String,
    /// Name of the function to invoke; must be registered in the catalog
    /// when the trigger is created.
    pub function: String,
    /// Presumably the columns an UPDATE trigger is restricted to — TODO
    /// confirm.
    pub update_columns: Vec<String>,
}
3155
3156impl Catalog {
/// Creates an empty catalog: no tables, no cold segments, no functions and
/// no triggers. Usable in `const` contexts.
pub const fn new() -> Self {
    Self {
        tables: Vec::new(),
        by_name: BTreeMap::new(),
        cold_segments: Vec::new(),
        functions: BTreeMap::new(),
        triggers: Vec::new(),
    }
}
3166
/// Read-only view of all registered functions, keyed by function name.
pub const fn functions(&self) -> &BTreeMap<String, FunctionDef> {
    &self.functions
}
3173
3174 pub fn create_function(
3178 &mut self,
3179 def: FunctionDef,
3180 or_replace: bool,
3181 ) -> Result<(), StorageError> {
3182 if !or_replace && self.functions.contains_key(&def.name) {
3183 return Err(StorageError::Corrupt(format!(
3184 "function {:?} already exists (drop or use CREATE OR REPLACE)",
3185 def.name
3186 )));
3187 }
3188 self.functions.insert(def.name.clone(), def);
3189 Ok(())
3190 }
3191
3192 pub fn drop_function(&mut self, name: &str) -> bool {
3196 self.functions.remove(name).is_some()
3197 }
3198
/// Read-only view of all registered triggers, in registration order
/// (replaced triggers keep their original position).
pub fn triggers(&self) -> &[TriggerDef] {
    &self.triggers
}
3205
3206 pub fn create_trigger(
3212 &mut self,
3213 def: TriggerDef,
3214 or_replace: bool,
3215 ) -> Result<(), StorageError> {
3216 if !self.by_name.contains_key(&def.table) {
3217 return Err(StorageError::TableNotFound {
3218 name: def.table.clone(),
3219 });
3220 }
3221 if !self.functions.contains_key(&def.function) {
3222 return Err(StorageError::Corrupt(format!(
3223 "trigger {:?} references unknown function {:?}",
3224 def.name, def.function
3225 )));
3226 }
3227 let dup = self
3228 .triggers
3229 .iter()
3230 .position(|t| t.name == def.name && t.table == def.table);
3231 match (dup, or_replace) {
3232 (Some(_), false) => Err(StorageError::Corrupt(format!(
3233 "trigger {:?} already exists on table {:?}",
3234 def.name, def.table
3235 ))),
3236 (Some(i), true) => {
3237 self.triggers[i] = def;
3238 Ok(())
3239 }
3240 (None, _) => {
3241 self.triggers.push(def);
3242 Ok(())
3243 }
3244 }
3245 }
3246
3247 pub fn drop_trigger(&mut self, name: &str, table: &str) -> bool {
3250 let before = self.triggers.len();
3251 self.triggers
3252 .retain(|t| !(t.name == name && t.table == table));
3253 before != self.triggers.len()
3254 }
3255
3256 pub fn create_table(&mut self, schema: TableSchema) -> Result<(), StorageError> {
3257 if self.by_name.contains_key(&schema.name) {
3258 return Err(StorageError::DuplicateTable {
3259 name: schema.name.clone(),
3260 });
3261 }
3262 let idx = self.tables.len();
3263 let name = schema.name.clone();
3264 self.tables.push(Table::new(schema));
3265 self.by_name.insert(name, idx);
3266 Ok(())
3267 }
3268
3269 pub fn get(&self, name: &str) -> Option<&Table> {
3270 let idx = *self.by_name.get(name)?;
3271 self.tables.get(idx)
3272 }
3273
3274 pub fn get_mut(&mut self, name: &str) -> Option<&mut Table> {
3275 let idx = *self.by_name.get(name)?;
3276 self.tables.get_mut(idx)
3277 }
3278
/// Number of tables currently held by the catalog.
pub fn table_count(&self) -> usize {
    self.tables.len()
}
3282
3283 pub fn table_names(&self) -> Vec<String> {
3286 self.tables.iter().map(|t| t.schema.name.clone()).collect()
3287 }
3288
3289 pub fn load_segment_bytes(&mut self, bytes: Vec<u8>) -> Result<u32, StorageError> {
3300 let id = u32::try_from(self.cold_segments.len()).map_err(|_| {
3301 StorageError::Corrupt("cold segment count would exceed u32::MAX".into())
3302 })?;
3303 let seg = OwnedSegment::from_bytes(bytes)
3304 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
3305 self.cold_segments.push(Some(Arc::new(seg)));
3306 Ok(id)
3307 }
3308
3309 pub fn load_segment_bytes_at(
3322 &mut self,
3323 target_id: u32,
3324 bytes: Vec<u8>,
3325 ) -> Result<(), StorageError> {
3326 let seg = OwnedSegment::from_bytes(bytes)
3327 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
3328 let idx = target_id as usize;
3329 while self.cold_segments.len() <= idx {
3330 self.cold_segments.push(None);
3331 }
3332 if self.cold_segments[idx].is_some() {
3333 return Err(StorageError::Corrupt(format!(
3334 "load_segment_bytes_at: segment_id {target_id} already occupied"
3335 )));
3336 }
3337 self.cold_segments[idx] = Some(Arc::new(seg));
3338 Ok(())
3339 }
3340
3341 pub fn tombstone_segment(&mut self, segment_id: u32) -> Result<(), StorageError> {
3351 let idx = segment_id as usize;
3352 if idx >= self.cold_segments.len() {
3353 return Err(StorageError::Corrupt(format!(
3354 "tombstone_segment: segment_id {segment_id} out of bounds (len={})",
3355 self.cold_segments.len()
3356 )));
3357 }
3358 self.cold_segments[idx] = None;
3359 Ok(())
3360 }
3361
3362 #[must_use]
3364 pub fn cold_segment_count(&self) -> usize {
3365 self.cold_segments.iter().filter(|s| s.is_some()).count()
3366 }
3367
3368 #[must_use]
3371 pub fn cold_segment_slot_count(&self) -> usize {
3372 self.cold_segments.len()
3373 }
3374
3375 #[must_use]
3380 pub fn cold_segment_ids_global(&self) -> Vec<u32> {
3381 self.cold_segments
3382 .iter()
3383 .enumerate()
3384 .filter_map(|(i, s)| s.as_ref().map(|_| i as u32))
3385 .collect()
3386 }
3387
3388 #[must_use]
3395 pub fn hot_tier_bytes(&self) -> u64 {
3396 self.tables
3397 .iter()
3398 .map(Table::hot_bytes)
3399 .fold(0u64, u64::saturating_add)
3400 }
3401
3402 pub fn freeze_oldest_to_cold(
3447 &mut self,
3448 table_name: &str,
3449 index_name: &str,
3450 max_rows: usize,
3451 ) -> Result<FreezeReport, StorageError> {
3452 if max_rows == 0 {
3454 return Err(StorageError::Corrupt(
3455 "freeze_oldest_to_cold: max_rows must be > 0".into(),
3456 ));
3457 }
3458 let table = self.get(table_name).ok_or_else(|| {
3459 StorageError::Corrupt(format!(
3460 "freeze_oldest_to_cold: table {table_name:?} not found"
3461 ))
3462 })?;
3463 if max_rows > table.rows.len() {
3464 return Err(StorageError::Corrupt(format!(
3465 "freeze_oldest_to_cold: max_rows {max_rows} > row_count {}",
3466 table.rows.len()
3467 )));
3468 }
3469 let idx = table
3470 .indices
3471 .iter()
3472 .find(|i| i.name == index_name)
3473 .ok_or_else(|| {
3474 StorageError::Corrupt(format!(
3475 "freeze_oldest_to_cold: index {index_name:?} not found on {table_name:?}"
3476 ))
3477 })?;
3478 if !matches!(idx.kind, IndexKind::BTree(_)) {
3479 return Err(StorageError::Corrupt(format!(
3480 "freeze_oldest_to_cold: index {index_name:?} is NSW; only BTree indices may freeze"
3481 )));
3482 }
3483 let column_position = idx.column_position;
3484
3485 let schema = table.schema.clone();
3487 let mut to_freeze: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(max_rows);
3488 for row_idx in 0..max_rows {
3489 let row = table.rows.get(row_idx).expect("bounds-checked above");
3490 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
3491 StorageError::Corrupt(format!(
3492 "freeze_oldest_to_cold: row {row_idx} has NULL / non-key value in index column"
3493 ))
3494 })?;
3495 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
3496 StorageError::Corrupt(format!(
3497 "freeze_oldest_to_cold: index {index_name:?} column type is non-integer; \
3498 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
3499 ))
3500 })?;
3501 to_freeze.push((pk_u64, encode_row_body_dense(row, &schema), key));
3502 }
3503 to_freeze.sort_by_key(|(k, _, _)| *k);
3508 for w in to_freeze.windows(2) {
3512 if w[0].0 == w[1].0 {
3513 return Err(StorageError::Corrupt(format!(
3514 "freeze_oldest_to_cold: duplicate PK {} in freeze batch",
3515 w[0].0
3516 )));
3517 }
3518 }
3519 let post_swap_keys: Vec<IndexKey> = to_freeze.iter().map(|(_, _, k)| k.clone()).collect();
3523 let seg_rows: Vec<(u64, Vec<u8>)> = to_freeze
3527 .into_iter()
3528 .map(|(k, body, _)| (k, body))
3529 .collect();
3530 let frozen_rows = seg_rows.len();
3531 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
3532 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: encode: {e}")))?;
3533
3534 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
3543 let positions: Vec<usize> = (0..max_rows).collect();
3544 let t_mut = self
3545 .get_mut(table_name)
3546 .expect("just validated; still present");
3547 let removed = t_mut.delete_rows(&positions);
3548 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
3549 let bytes_after = t_mut.hot_bytes();
3550 let bytes_freed = bytes_before.saturating_sub(bytes_after);
3551
3552 let segment_id = self
3553 .load_segment_bytes(seg_bytes.clone())
3554 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: load: {e}")))?;
3555 let new_cold = post_swap_keys.into_iter().map(|k| {
3556 (
3557 k,
3558 RowLocator::Cold {
3559 segment_id,
3560 page_offset: 0,
3561 },
3562 )
3563 });
3564 let t_mut = self.get_mut(table_name).expect("still present");
3565 t_mut.register_cold_locators(index_name, new_cold)?;
3566
3567 Ok(FreezeReport {
3568 segment_id,
3569 frozen_rows,
3570 bytes_freed,
3571 segment_bytes: seg_bytes,
3572 })
3573 }
3574
3575 #[must_use]
3581 pub fn cold_segment(&self, segment_id: u32) -> Option<&OwnedSegment> {
3582 self.cold_segments
3583 .get(segment_id as usize)
3584 .and_then(|s| s.as_deref())
3585 }
3586
3587 pub fn resolve_cold_locator(
3596 &self,
3597 table_name: &str,
3598 segment_id: u32,
3599 key: &IndexKey,
3600 ) -> Option<Row> {
3601 let t = self.get(table_name)?;
3602 let u64_key = index_key_as_u64(key)?;
3603 let seg = self.cold_segments.get(segment_id as usize)?.as_ref()?;
3604 let payload = seg.lookup(u64_key)?;
3605 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
3606 Some(row)
3607 }
3608
3609 pub fn lookup_by_pk(&self, table: &str, index_name: &str, key: &IndexKey) -> Option<Row> {
3627 let t = self.get(table)?;
3628 let idx = t.indices.iter().find(|i| i.name == index_name)?;
3629 let locators = idx.lookup_eq(key);
3630 let cold_u64_key = index_key_as_u64(key);
3631 for loc in locators {
3632 match *loc {
3633 RowLocator::Hot(i) => {
3634 if let Some(row) = t.rows.get(i) {
3635 return Some(row.clone());
3636 }
3637 }
3638 RowLocator::Cold {
3639 segment_id,
3640 page_offset: _,
3641 } => {
3642 let Some(u64_key) = cold_u64_key else {
3643 continue;
3646 };
3647 let Some(seg) = self
3648 .cold_segments
3649 .get(segment_id as usize)
3650 .and_then(|s| s.as_deref())
3651 else {
3652 continue;
3663 };
3664 let Some(payload) = seg.lookup(u64_key) else {
3665 continue;
3666 };
3667 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
3668 return Some(row);
3669 }
3670 }
3671 }
3672 None
3673 }
3674
3675 pub fn promote_cold_row(
3697 &mut self,
3698 table_name: &str,
3699 index_name: &str,
3700 key: &IndexKey,
3701 ) -> Result<Option<usize>, StorageError> {
3702 let cold_loc = self.find_cold_locator(table_name, index_name, key)?;
3703 let Some((segment_id, _page_offset)) = cold_loc else {
3704 return Ok(None);
3705 };
3706 let u64_key = index_key_as_u64(key).ok_or_else(|| {
3707 StorageError::Corrupt(
3708 "promote_cold_row: key type not coercible to u64 (cold tier requires integer PK)"
3709 .into(),
3710 )
3711 })?;
3712 let schema = self
3716 .get(table_name)
3717 .ok_or_else(|| {
3718 StorageError::Corrupt(format!("promote_cold_row: table {table_name:?} not found"))
3719 })?
3720 .schema
3721 .clone();
3722 let seg = self
3723 .cold_segments
3724 .get(segment_id as usize)
3725 .and_then(|s| s.as_ref())
3726 .ok_or_else(|| {
3727 StorageError::Corrupt(format!(
3728 "promote_cold_row: segment {segment_id} not registered on catalog"
3729 ))
3730 })?;
3731 let payload = seg.lookup(u64_key).ok_or_else(|| {
3732 StorageError::Corrupt(format!(
3733 "promote_cold_row: key {u64_key} resolves to segment {segment_id} \
3734 but the segment's bloom/page lookup didn't return a row"
3735 ))
3736 })?;
3737 let (row, _consumed) = decode_row_body_dense(&payload, &schema)?;
3738 let t = self
3743 .get_mut(table_name)
3744 .expect("table existed at lookup time");
3745 t.insert(row)?;
3746 let new_hot_idx =
3747 t.rows.len().checked_sub(1).ok_or_else(|| {
3748 StorageError::Corrupt("promote_cold_row: empty after insert".into())
3749 })?;
3750 t.remove_cold_locators_for_key(index_name, key)?;
3754 Ok(Some(new_hot_idx))
3755 }
3756
3757 pub fn shadow_cold_row(
3775 &mut self,
3776 table_name: &str,
3777 index_name: &str,
3778 key: &IndexKey,
3779 ) -> Result<usize, StorageError> {
3780 let t = self.get_mut(table_name).ok_or_else(|| {
3781 StorageError::Corrupt(format!("shadow_cold_row: table {table_name:?} not found"))
3782 })?;
3783 t.remove_cold_locators_for_key(index_name, key)
3784 }
3785
3786 pub fn prepare_freeze_slice(
3804 &self,
3805 table_name: &str,
3806 index_name: &str,
3807 row_range: core::ops::Range<usize>,
3808 ) -> Result<FreezeSlice, StorageError> {
3809 let table = self.get(table_name).ok_or_else(|| {
3810 StorageError::Corrupt(format!(
3811 "prepare_freeze_slice: table {table_name:?} not found"
3812 ))
3813 })?;
3814 let idx = table
3815 .indices
3816 .iter()
3817 .find(|i| i.name == index_name)
3818 .ok_or_else(|| {
3819 StorageError::Corrupt(format!(
3820 "prepare_freeze_slice: index {index_name:?} not found on {table_name:?}"
3821 ))
3822 })?;
3823 if !matches!(idx.kind, IndexKind::BTree(_)) {
3824 return Err(StorageError::Corrupt(format!(
3825 "prepare_freeze_slice: index {index_name:?} is NSW; only BTree indices may freeze"
3826 )));
3827 }
3828 if row_range.end > table.rows.len() {
3829 return Err(StorageError::Corrupt(format!(
3830 "prepare_freeze_slice: row_range end {} > row_count {}",
3831 row_range.end,
3832 table.rows.len()
3833 )));
3834 }
3835 let column_position = idx.column_position;
3836 let schema = table.schema.clone();
3837 let mut rows: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(row_range.len());
3838 for row_idx in row_range.clone() {
3839 let row = table.rows.get(row_idx).expect("bounds-checked above");
3840 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
3841 StorageError::Corrupt(format!(
3842 "prepare_freeze_slice: row {row_idx} has NULL / non-key value in index column"
3843 ))
3844 })?;
3845 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
3846 StorageError::Corrupt(format!(
3847 "prepare_freeze_slice: index {index_name:?} column type is non-integer; \
3848 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
3849 ))
3850 })?;
3851 rows.push((pk_u64, encode_row_body_dense(row, &schema), key));
3852 }
3853 rows.sort_by_key(|(k, _, _)| *k);
3854 Ok(FreezeSlice { row_range, rows })
3855 }
3856
3857 pub fn commit_freeze_slices(
3871 &mut self,
3872 table_name: &str,
3873 index_name: &str,
3874 slices: Vec<FreezeSlice>,
3875 ) -> Result<FreezeReport, StorageError> {
3876 let table = self.get(table_name).ok_or_else(|| {
3878 StorageError::Corrupt(format!(
3879 "commit_freeze_slices: table {table_name:?} not found"
3880 ))
3881 })?;
3882 let idx = table
3883 .indices
3884 .iter()
3885 .find(|i| i.name == index_name)
3886 .ok_or_else(|| {
3887 StorageError::Corrupt(format!(
3888 "commit_freeze_slices: index {index_name:?} not found on {table_name:?}"
3889 ))
3890 })?;
3891 if !matches!(idx.kind, IndexKind::BTree(_)) {
3892 return Err(StorageError::Corrupt(format!(
3893 "commit_freeze_slices: index {index_name:?} is NSW; only BTree indices may freeze"
3894 )));
3895 }
3896 let mut ordered = slices;
3900 ordered.sort_by_key(|s| s.row_range.start);
3901 let mut expected_start = 0usize;
3905 for s in &ordered {
3906 if s.row_range.start != expected_start {
3907 return Err(StorageError::Corrupt(format!(
3908 "commit_freeze_slices: gap/overlap at row {}; expected start {}",
3909 s.row_range.start, expected_start
3910 )));
3911 }
3912 expected_start = s.row_range.end;
3913 }
3914 let max_rows = expected_start;
3915 if max_rows > table.rows.len() {
3916 return Err(StorageError::Corrupt(format!(
3917 "commit_freeze_slices: total row range {} exceeds row_count {}",
3918 max_rows,
3919 table.rows.len()
3920 )));
3921 }
3922 if max_rows == 0 {
3923 return Ok(FreezeReport {
3924 segment_id: u32::MAX,
3925 frozen_rows: 0,
3926 bytes_freed: 0,
3927 segment_bytes: Vec::new(),
3928 });
3929 }
3930
3931 let total_rows: usize = ordered.iter().map(|s| s.rows.len()).sum();
3936 if total_rows != max_rows {
3937 return Err(StorageError::Corrupt(format!(
3938 "commit_freeze_slices: total slice rows {total_rows} ≠ row_range coverage {max_rows}"
3939 )));
3940 }
3941 let mut cursors: Vec<usize> = alloc::vec![0; ordered.len()];
3942 let mut merged: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(total_rows);
3943 loop {
3944 let mut pick: Option<usize> = None;
3947 for (i, c) in cursors.iter().enumerate() {
3948 let slice = &ordered[i];
3949 if *c >= slice.rows.len() {
3950 continue;
3951 }
3952 match pick {
3953 None => pick = Some(i),
3954 Some(j) => {
3955 if slice.rows[*c].0 < ordered[j].rows[cursors[j]].0 {
3956 pick = Some(i);
3957 }
3958 }
3959 }
3960 }
3961 let Some(i) = pick else { break };
3962 let row = ordered[i].rows[cursors[i]].clone();
3963 cursors[i] += 1;
3964 merged.push(row);
3965 }
3966 for w in merged.windows(2) {
3969 if w[0].0 == w[1].0 {
3970 return Err(StorageError::Corrupt(format!(
3971 "commit_freeze_slices: duplicate PK {} across slices",
3972 w[0].0
3973 )));
3974 }
3975 }
3976 let post_swap_keys: Vec<IndexKey> = merged.iter().map(|(_, _, k)| k.clone()).collect();
3977 let seg_rows: Vec<(u64, Vec<u8>)> =
3978 merged.into_iter().map(|(k, body, _)| (k, body)).collect();
3979 let frozen_rows = seg_rows.len();
3980 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
3981 .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: encode: {e}")))?;
3982
3983 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
3985 let positions: Vec<usize> = (0..max_rows).collect();
3986 let t_mut = self
3987 .get_mut(table_name)
3988 .expect("just validated; still present");
3989 let removed = t_mut.delete_rows(&positions);
3990 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
3991 let bytes_after = t_mut.hot_bytes();
3992 let bytes_freed = bytes_before.saturating_sub(bytes_after);
3993
3994 let segment_id = self
3995 .load_segment_bytes(seg_bytes.clone())
3996 .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: load: {e}")))?;
3997 let new_cold = post_swap_keys.into_iter().map(|k| {
3998 (
3999 k,
4000 RowLocator::Cold {
4001 segment_id,
4002 page_offset: 0,
4003 },
4004 )
4005 });
4006 let t_mut = self.get_mut(table_name).expect("still present");
4007 t_mut.register_cold_locators(index_name, new_cold)?;
4008
4009 Ok(FreezeReport {
4010 segment_id,
4011 frozen_rows,
4012 bytes_freed,
4013 segment_bytes: seg_bytes,
4014 })
4015 }
4016
4017 pub fn compact_cold_segments(
4060 &mut self,
4061 table_name: &str,
4062 index_name: &str,
4063 target_segment_bytes: u64,
4064 ) -> Result<CompactReport, StorageError> {
4065 let t = self.get(table_name).ok_or_else(|| {
4067 StorageError::Corrupt(format!(
4068 "compact_cold_segments: table {table_name:?} not found"
4069 ))
4070 })?;
4071 let idx = t
4072 .indices
4073 .iter()
4074 .find(|i| i.name == index_name)
4075 .ok_or_else(|| {
4076 StorageError::Corrupt(format!(
4077 "compact_cold_segments: index {index_name:?} not found on {table_name:?}"
4078 ))
4079 })?;
4080 let map = match &idx.kind {
4081 IndexKind::BTree(m) => m,
4082 IndexKind::Nsw(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
4083 return Err(StorageError::Corrupt(format!(
4084 "compact_cold_segments: index {index_name:?} is not BTree; \
4085 compaction applies only to BTree cold-tier indices"
4086 )));
4087 }
4088 };
4089
4090 let mut referenced_ids: BTreeSet<u32> = BTreeSet::new();
4093 for (_key, locators) in map.iter() {
4094 for loc in locators {
4095 if let RowLocator::Cold { segment_id, .. } = loc {
4096 referenced_ids.insert(*segment_id);
4097 }
4098 }
4099 }
4100 let candidate_set: BTreeSet<u32> = referenced_ids
4102 .into_iter()
4103 .filter(|id| {
4104 self.cold_segments
4105 .get(*id as usize)
4106 .and_then(|s| s.as_deref())
4107 .is_some_and(|s| (s.bytes().len() as u64) < target_segment_bytes)
4108 })
4109 .collect();
4110 if candidate_set.len() < 2 {
4111 return Ok(CompactReport {
4112 sources: Vec::new(),
4113 merged_segment_id: None,
4114 merged_segment_bytes: Vec::new(),
4115 merged_rows: 0,
4116 deleted_rows_pruned: 0,
4117 bytes_reclaimed_estimate: 0,
4118 });
4119 }
4120 let mut source_row_count: usize = 0;
4122 let mut source_byte_total: u64 = 0;
4123 for &id in &candidate_set {
4124 let seg = self.cold_segments[id as usize]
4125 .as_ref()
4126 .expect("candidate selected only when slot is Some");
4127 source_row_count = source_row_count.saturating_add(seg.meta().num_rows as usize);
4128 source_byte_total = source_byte_total.saturating_add(seg.bytes().len() as u64);
4129 }
4130 let mut collected: BTreeMap<u64, (Vec<u8>, IndexKey)> = BTreeMap::new();
4136 for (key, locators) in map.iter() {
4137 for loc in locators {
4138 let RowLocator::Cold { segment_id, .. } = loc else {
4139 continue;
4140 };
4141 if !candidate_set.contains(segment_id) {
4142 continue;
4143 }
4144 let u64_key = index_key_as_u64(key).ok_or_else(|| {
4145 StorageError::Corrupt(format!(
4146 "compact_cold_segments: index {index_name:?} has non-integer Cold key; \
4147 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
4148 ))
4149 })?;
4150 let seg = self.cold_segments[*segment_id as usize]
4151 .as_ref()
4152 .expect("candidate slot guaranteed Some above");
4153 let payload = seg.lookup(u64_key).ok_or_else(|| {
4154 StorageError::Corrupt(format!(
4155 "compact_cold_segments: BTree {index_name:?} points key={u64_key} \
4156 at segment {segment_id} but the segment lookup missed"
4157 ))
4158 })?;
4159 collected.insert(u64_key, (payload, key.clone()));
4160 break;
4161 }
4162 }
4163 let merged_rows = collected.len();
4164 let deleted_rows_pruned = source_row_count.saturating_sub(merged_rows);
4165
4166 let seg_rows: Vec<(u64, Vec<u8>)> = collected
4170 .iter()
4171 .map(|(k, (body, _))| (*k, body.clone()))
4172 .collect();
4173 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
4174 .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: encode: {e}")))?;
4175 let merged_bytes_len = seg_bytes.len() as u64;
4176
4177 let merged_segment_id = self
4179 .load_segment_bytes(seg_bytes.clone())
4180 .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: load: {e}")))?;
4181
4182 let entries: Vec<(IndexKey, Vec<RowLocator>)> = {
4188 let t = self
4189 .get(table_name)
4190 .expect("table existed at the start of this fn");
4191 let idx = t
4192 .indices
4193 .iter()
4194 .find(|i| i.name == index_name)
4195 .expect("index existed at the start of this fn");
4196 let IndexKind::BTree(map) = &idx.kind else {
4197 unreachable!("validated above");
4198 };
4199 map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
4200 };
4201 let t_mut = self
4202 .get_mut(table_name)
4203 .expect("table existed at the start of this fn");
4204 let idx_mut = t_mut
4205 .indices
4206 .iter_mut()
4207 .find(|i| i.name == index_name)
4208 .expect("index existed at the start of this fn");
4209 let IndexKind::BTree(map_mut) = &mut idx_mut.kind else {
4210 unreachable!("validated above");
4211 };
4212 for (key, locators) in entries {
4213 let mut new_locs: Vec<RowLocator> = Vec::with_capacity(locators.len());
4214 let mut changed = false;
4215 for loc in &locators {
4216 match *loc {
4217 RowLocator::Cold {
4218 segment_id,
4219 page_offset: _,
4220 } if candidate_set.contains(&segment_id) => {
4221 let replacement = RowLocator::Cold {
4222 segment_id: merged_segment_id,
4223 page_offset: 0,
4224 };
4225 if !new_locs.contains(&replacement) {
4226 new_locs.push(replacement);
4227 }
4228 changed = true;
4229 }
4230 other => new_locs.push(other),
4231 }
4232 }
4233 if changed {
4234 map_mut.insert_mut(key, new_locs);
4235 }
4236 }
4237
4238 for &id in &candidate_set {
4243 self.tombstone_segment(id)?;
4244 }
4245
4246 let bytes_reclaimed_estimate = source_byte_total.saturating_sub(merged_bytes_len);
4247 Ok(CompactReport {
4248 sources: candidate_set.into_iter().collect(),
4249 merged_segment_id: Some(merged_segment_id),
4250 merged_segment_bytes: seg_bytes,
4251 merged_rows,
4252 deleted_rows_pruned,
4253 bytes_reclaimed_estimate,
4254 })
4255 }
4256
4257 fn find_cold_locator(
4263 &self,
4264 table_name: &str,
4265 index_name: &str,
4266 key: &IndexKey,
4267 ) -> Result<Option<(u32, u32)>, StorageError> {
4268 let t = self.get(table_name).ok_or_else(|| {
4269 StorageError::Corrupt(format!("find_cold_locator: table {table_name:?} not found"))
4270 })?;
4271 let idx = t
4272 .indices
4273 .iter()
4274 .find(|i| i.name == index_name)
4275 .ok_or_else(|| {
4276 StorageError::Corrupt(format!(
4277 "find_cold_locator: index {index_name:?} not found on {table_name:?}"
4278 ))
4279 })?;
4280 if !matches!(idx.kind, IndexKind::BTree(_)) {
4281 return Err(StorageError::Corrupt(format!(
4282 "find_cold_locator: index {index_name:?} is NSW; promote-on-write only applies to BTree indices"
4283 )));
4284 }
4285 for loc in idx.lookup_eq(key) {
4286 if let RowLocator::Cold {
4287 segment_id,
4288 page_offset,
4289 } = *loc
4290 {
4291 return Ok(Some((segment_id, page_offset)));
4292 }
4293 }
4294 Ok(None)
4295 }
4296}
4297
4298fn index_key_as_u64(key: &IndexKey) -> Option<u64> {
4304 match key {
4305 IndexKey::Int(n) => Some(n.cast_unsigned()),
4311 IndexKey::Text(_) | IndexKey::Bool(_) => None,
4312 }
4313}
4314
/// Errors reported by the storage layer.
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum StorageError {
    /// A table with this name already exists.
    DuplicateTable {
        name: String,
    },
    /// No table with this name exists.
    TableNotFound {
        name: String,
    },
    /// A row had `actual` values where the schema expects `expected` columns.
    ArityMismatch {
        expected: usize,
        actual: usize,
    },
    /// The value at `position` (column `column`) has type `actual` but the
    /// column is declared as `expected`.
    TypeMismatch {
        column: String,
        expected: DataType,
        actual: DataType,
        position: usize,
    },
    /// A NULL value was supplied for a NOT NULL column.
    NullInNotNull {
        column: String,
    },
    /// An index with this name already exists.
    DuplicateIndex {
        name: String,
    },
    /// No column with this name exists.
    ColumnNotFound {
        column: String,
    },
    /// Displayed as "corrupt on-disk format"; the catalog methods in this
    /// file also use it for invalid arguments and broken invariants. The
    /// payload is a human-readable detail message.
    Corrupt(String),
    /// No index with this name exists.
    IndexNotFound {
        name: String,
    },
    /// The requested operation is not supported; the payload says what.
    Unsupported(String),
}
4358
4359impl fmt::Display for StorageError {
4360 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4361 match self {
4362 Self::DuplicateTable { name } => write!(f, "table already exists: {name}"),
4363 Self::TableNotFound { name } => write!(f, "table not found: {name}"),
4364 Self::ArityMismatch { expected, actual } => write!(
4365 f,
4366 "row arity mismatch: expected {expected} columns, got {actual}"
4367 ),
4368 Self::TypeMismatch {
4369 column,
4370 expected,
4371 actual,
4372 position,
4373 } => write!(
4374 f,
4375 "type mismatch in column {column:?} (position {position}): expected {expected}, got {actual}"
4376 ),
4377 Self::NullInNotNull { column } => {
4378 write!(f, "NULL value in NOT NULL column {column:?}")
4379 }
4380 Self::DuplicateIndex { name } => write!(f, "index already exists: {name}"),
4381 Self::ColumnNotFound { column } => write!(f, "column not found: {column}"),
4382 Self::Corrupt(detail) => write!(f, "corrupt on-disk format: {detail}"),
4383 Self::IndexNotFound { name } => write!(f, "index not found: {name}"),
4384 Self::Unsupported(detail) => write!(f, "unsupported: {detail}"),
4385 }
4386 }
4387}
4388
4389impl ColumnSchema {
4390 pub fn new(name: impl Into<String>, ty: DataType, nullable: bool) -> Self {
4391 Self {
4392 name: name.into(),
4393 ty,
4394 nullable,
4395 default: None,
4396 runtime_default: None,
4397 auto_increment: false,
4398 }
4399 }
4400
4401 #[must_use]
4405 pub fn with_default(mut self, default: Value) -> Self {
4406 self.default = Some(default);
4407 self
4408 }
4409
4410 #[must_use]
4415 pub fn with_runtime_default(mut self, expr: impl Into<String>) -> Self {
4416 self.runtime_default = Some(expr.into());
4417 self
4418 }
4419
4420 #[must_use]
4422 pub const fn with_auto_increment(mut self) -> Self {
4423 self.auto_increment = true;
4424 self
4425 }
4426}
4427
4428impl TableSchema {
4429 pub fn new(name: impl Into<String>, columns: Vec<ColumnSchema>) -> Self {
4430 Self {
4431 name: name.into(),
4432 columns,
4433 hot_tier_bytes: None,
4434 foreign_keys: Vec::new(),
4435 uniqueness_constraints: Vec::new(),
4436 checks: Vec::new(),
4437 }
4438 }
4439}
4440
/// Eight-byte magic that `Catalog::serialize` writes first and
/// `Catalog::deserialize` checks before reading anything else.
const FILE_MAGIC: &[u8; 8] = b"SPGDB001";
/// Format version byte that `Catalog::serialize` writes right after the magic.
const FILE_VERSION: u8 = 23;
/// Oldest format version `Catalog::deserialize` accepts; versions outside
/// `MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION` are rejected.
const MIN_SUPPORTED_FILE_VERSION: u8 = 8;

// One-byte tags for serialized index keys. NOTE(review): presumably one per
// `IndexKey` variant and consumed by `write_index_key` — confirm there.
const INDEX_KEY_TAG_INT: u8 = 0;
const INDEX_KEY_TAG_TEXT: u8 = 1;
const INDEX_KEY_TAG_BOOL: u8 = 2;
4537
4538impl Catalog {
4539 pub fn serialize(&self) -> Vec<u8> {
4542 let mut out = Vec::with_capacity(64);
4543 out.extend_from_slice(FILE_MAGIC);
4544 out.push(FILE_VERSION);
4545 write_u32(
4546 &mut out,
4547 u32::try_from(self.tables.len()).expect("≤ 4G tables"),
4548 );
4549 for t in &self.tables {
4550 write_str(&mut out, &t.schema.name);
4551 write_u16(
4552 &mut out,
4553 u16::try_from(t.schema.columns.len()).expect("≤ 65k columns/table"),
4554 );
4555 for c in &t.schema.columns {
4556 write_str(&mut out, &c.name);
4557 write_data_type(&mut out, c.ty);
4558 out.push(u8::from(c.nullable));
4559 match &c.default {
4560 None => out.push(0),
4561 Some(v) => {
4562 out.push(1);
4563 write_value(&mut out, v);
4564 }
4565 }
4566 out.push(u8::from(c.auto_increment));
4567 }
4568 write_u32(
4569 &mut out,
4570 u32::try_from(t.rows.len()).expect("≤ 4G rows/table"),
4571 );
4572 for row in &t.rows {
4577 out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
4578 }
4579 write_u16(
4586 &mut out,
4587 u16::try_from(t.indices.len()).expect("≤ 65k indices/table"),
4588 );
4589 for idx in &t.indices {
4590 write_str(&mut out, &idx.name);
4591 write_u16(
4592 &mut out,
4593 u16::try_from(idx.column_position).expect("≤ 65k columns/table"),
4594 );
4595 match &idx.kind {
4596 IndexKind::BTree(map) => {
4597 out.push(0);
4598 write_u32(
4606 &mut out,
4607 u32::try_from(map.len()).expect("≤ 4G index entries/index"),
4608 );
4609 for (key, locators) in map {
4610 write_index_key(&mut out, key);
4611 write_u32(
4612 &mut out,
4613 u32::try_from(locators.len()).expect("≤ 4G locators/key"),
4614 );
4615 for loc in locators {
4616 loc.write_le(&mut out);
4617 }
4618 }
4619 }
4620 IndexKind::Nsw(g) => {
4621 out.push(1);
4622 write_u16(&mut out, u16::try_from(g.m).expect("≤ 65k NSW neighbours"));
4623 write_nsw_graph(&mut out, g);
4624 }
4625 IndexKind::Brin { column_type } => {
4626 out.push(2);
4632 write_data_type(&mut out, *column_type);
4633 }
4634 IndexKind::Gin(map) => {
4635 out.push(3);
4644 write_u32(
4645 &mut out,
4646 u32::try_from(map.len()).expect("≤ 4G GIN posting lists"),
4647 );
4648 for (word, locators) in map {
4649 write_str(&mut out, word);
4650 write_u32(
4651 &mut out,
4652 u32::try_from(locators.len()).expect("≤ 4G locators/posting list"),
4653 );
4654 for loc in locators {
4655 loc.write_le(&mut out);
4656 }
4657 }
4658 }
4659 }
4660 write_u16(
4666 &mut out,
4667 u16::try_from(idx.included_columns.len()).expect("≤ 65k INCLUDE columns/index"),
4668 );
4669 for col_pos in &idx.included_columns {
4670 write_u16(
4671 &mut out,
4672 u16::try_from(*col_pos).expect("≤ 65k columns/table"),
4673 );
4674 }
4675 match &idx.partial_predicate {
4679 None => out.push(0),
4680 Some(pred) => {
4681 out.push(1);
4682 write_str(&mut out, pred);
4683 }
4684 }
4685 match &idx.expression {
4688 None => out.push(0),
4689 Some(expr) => {
4690 out.push(1);
4691 write_str(&mut out, expr);
4692 }
4693 }
4694 out.push(u8::from(idx.is_unique));
4698 write_u16(
4701 &mut out,
4702 u16::try_from(idx.extra_column_positions.len())
4703 .expect("≤ 65k extra cols / index"),
4704 );
4705 for cp in &idx.extra_column_positions {
4706 write_u16(&mut out, u16::try_from(*cp).expect("≤ 65k columns/table"));
4707 }
4708 }
4709 match t.schema.hot_tier_bytes {
4715 None => out.push(0),
4716 Some(n) => {
4717 out.push(1);
4718 out.extend_from_slice(&n.to_le_bytes());
4719 }
4720 }
4721 write_u16(
4732 &mut out,
4733 u16::try_from(t.schema.foreign_keys.len()).expect("≤ 65k FKs/table"),
4734 );
4735 for fk in &t.schema.foreign_keys {
4736 match &fk.name {
4737 None => out.push(0),
4738 Some(n) => {
4739 out.push(1);
4740 write_str(&mut out, n);
4741 }
4742 }
4743 write_u16(
4744 &mut out,
4745 u16::try_from(fk.local_columns.len()).expect("≤ 65k FK columns"),
4746 );
4747 for &p in &fk.local_columns {
4748 write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
4749 }
4750 write_str(&mut out, &fk.parent_table);
4751 write_u16(
4752 &mut out,
4753 u16::try_from(fk.parent_columns.len()).expect("≤ 65k FK parent columns"),
4754 );
4755 for &p in &fk.parent_columns {
4756 write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
4757 }
4758 out.push(fk.on_delete.tag());
4759 out.push(fk.on_update.tag());
4760 }
4761 write_u16(
4770 &mut out,
4771 u16::try_from(t.schema.uniqueness_constraints.len())
4772 .expect("≤ 65k uniqueness constraints/table"),
4773 );
4774 for uc in &t.schema.uniqueness_constraints {
4775 out.push(u8::from(uc.is_primary_key));
4776 write_u16(
4777 &mut out,
4778 u16::try_from(uc.columns.len()).expect("≤ 65k cols in uniqueness constraint"),
4779 );
4780 for &p in &uc.columns {
4781 write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
4782 }
4783 out.push(u8::from(uc.nulls_not_distinct));
4788 }
4789 let mut rt_defaults: Vec<(usize, &str)> = Vec::new();
4796 for (i, c) in t.schema.columns.iter().enumerate() {
4797 if let Some(e) = &c.runtime_default {
4798 rt_defaults.push((i, e.as_str()));
4799 }
4800 }
4801 write_u16(
4802 &mut out,
4803 u16::try_from(rt_defaults.len()).expect("≤ 65k runtime defaults/table"),
4804 );
4805 for (pos, expr) in rt_defaults {
4806 write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
4807 write_str(&mut out, expr);
4808 }
4809 write_u16(
4816 &mut out,
4817 u16::try_from(t.schema.checks.len()).expect("≤ 65k CHECK constraints/table"),
4818 );
4819 for c in &t.schema.checks {
4820 write_str(&mut out, c.as_str());
4821 }
4822 }
4823 write_u32(
4836 &mut out,
4837 u32::try_from(self.functions.len()).expect("≤ 4G functions"),
4838 );
4839 for fd in self.functions.values() {
4840 write_str(&mut out, &fd.name);
4841 write_str(&mut out, &fd.args_repr);
4842 write_str(&mut out, &fd.returns);
4843 write_str(&mut out, &fd.language);
4844 write_str_long(&mut out, &fd.body);
4845 }
4846 write_u32(
4847 &mut out,
4848 u32::try_from(self.triggers.len()).expect("≤ 4G triggers"),
4849 );
4850 for td in &self.triggers {
4851 write_str(&mut out, &td.name);
4852 write_str(&mut out, &td.table);
4853 write_str(&mut out, &td.timing);
4854 write_u16(
4855 &mut out,
4856 u16::try_from(td.events.len()).expect("≤ 65k events / trigger"),
4857 );
4858 for ev in &td.events {
4859 write_str(&mut out, ev);
4860 }
4861 write_str(&mut out, &td.for_each);
4862 write_str(&mut out, &td.function);
4863 write_u16(
4867 &mut out,
4868 u16::try_from(td.update_columns.len()).expect("≤ 65k cols / trigger"),
4869 );
4870 for c in &td.update_columns {
4871 write_str(&mut out, c);
4872 }
4873 }
4874 out
4875 }
4876
4877 pub fn deserialize(buf: &[u8]) -> Result<Self, StorageError> {
4880 let mut cur = Cursor::new(buf);
4881 let magic = cur.take(8)?;
4882 if magic != FILE_MAGIC {
4883 return Err(StorageError::Corrupt(format!(
4884 "bad magic: expected SPGDB001, got {magic:?}"
4885 )));
4886 }
4887 let version = cur.read_u8()?;
4888 if !(MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION).contains(&version) {
4889 return Err(StorageError::Corrupt(format!(
4890 "unsupported file version: {version} (supported: {MIN_SUPPORTED_FILE_VERSION}..={FILE_VERSION})"
4891 )));
4892 }
4893 let table_count = cur.read_u32()? as usize;
4894 let mut cat = Self::new();
4895 for _ in 0..table_count {
4896 deserialize_table(&mut cur, &mut cat, version)?;
4897 }
4898 if version >= 22 {
4902 let fn_count = cur.read_u32()? as usize;
4903 for _ in 0..fn_count {
4904 let name = cur.read_str()?;
4905 let args_repr = cur.read_str()?;
4906 let returns = cur.read_str()?;
4907 let language = cur.read_str()?;
4908 let body = cur.read_str_long()?;
4909 cat.functions.insert(
4910 name.clone(),
4911 FunctionDef {
4912 name,
4913 args_repr,
4914 returns,
4915 language,
4916 body,
4917 },
4918 );
4919 }
4920 let trg_count = cur.read_u32()? as usize;
4921 for _ in 0..trg_count {
4922 let name = cur.read_str()?;
4923 let table = cur.read_str()?;
4924 let timing = cur.read_str()?;
4925 let ev_count = cur.read_u16()? as usize;
4926 let mut events = Vec::with_capacity(ev_count);
4927 for _ in 0..ev_count {
4928 events.push(cur.read_str()?);
4929 }
4930 let for_each = cur.read_str()?;
4931 let function = cur.read_str()?;
4932 let update_columns = if version >= 23 {
4936 let n = cur.read_u16()? as usize;
4937 let mut cols = Vec::with_capacity(n);
4938 for _ in 0..n {
4939 cols.push(cur.read_str()?);
4940 }
4941 cols
4942 } else {
4943 Vec::new()
4944 };
4945 cat.triggers.push(TriggerDef {
4946 name,
4947 table,
4948 timing,
4949 events,
4950 for_each,
4951 function,
4952 update_columns,
4953 });
4954 }
4955 }
4956 if cur.pos < buf.len() {
4957 return Err(StorageError::Corrupt(format!(
4958 "trailing bytes: {} unread",
4959 buf.len() - cur.pos
4960 )));
4961 }
4962 Ok(cat)
4963 }
4964}
4965
4966fn deserialize_table(
4971 cur: &mut Cursor<'_>,
4972 cat: &mut Catalog,
4973 version: u8,
4974) -> Result<(), StorageError> {
4975 let table_name = cur.read_str()?;
4976 let name = table_name.clone();
4977 let col_count = cur.read_u16()? as usize;
4978 let mut cols = Vec::with_capacity(col_count);
4979 for _ in 0..col_count {
4980 let c_name = cur.read_str()?;
4981 let ty = cur.read_data_type()?;
4982 let nullable = cur.read_u8()? != 0;
4983 let default = match cur.read_u8()? {
4984 0 => None,
4985 1 => Some(cur.read_value()?),
4986 other => {
4987 return Err(StorageError::Corrupt(format!(
4988 "unknown default tag: {other}"
4989 )));
4990 }
4991 };
4992 let auto_increment = cur.read_u8()? != 0;
4993 cols.push(ColumnSchema {
4997 name: c_name,
4998 ty,
4999 nullable,
5000 default,
5001 runtime_default: None,
5002 auto_increment,
5003 });
5004 }
5005 let n_cols = cols.len();
5006 cat.create_table(TableSchema::new(name, cols))?;
5007 let t = cat.tables.last_mut().expect("create_table just pushed");
5011 deserialize_rows(cur, t, n_cols)?;
5012 deserialize_indices(cur, t, version)?;
5013 if version >= 11 {
5019 let has = cur.read_u8()?;
5020 let hot_tier_bytes = match has {
5021 0 => None,
5022 1 => Some(cur.read_u64()?),
5023 other => {
5024 return Err(StorageError::Corrupt(format!(
5025 "hot_tier_bytes appendix: unknown has-value byte {other}"
5026 )));
5027 }
5028 };
5029 t.schema_mut().hot_tier_bytes = hot_tier_bytes;
5030 }
5031 if version >= 13 {
5034 let fk_count = cur.read_u16()? as usize;
5035 let mut fks = Vec::with_capacity(fk_count);
5036 for _ in 0..fk_count {
5037 let name = match cur.read_u8()? {
5038 0 => None,
5039 1 => Some(cur.read_str()?),
5040 other => {
5041 return Err(StorageError::Corrupt(format!(
5042 "FK appendix: unknown has-name byte {other}"
5043 )));
5044 }
5045 };
5046 let local_arity = cur.read_u16()? as usize;
5047 let mut local_columns = Vec::with_capacity(local_arity);
5048 for _ in 0..local_arity {
5049 local_columns.push(cur.read_u16()? as usize);
5050 }
5051 let parent_table = cur.read_str()?;
5052 let parent_arity = cur.read_u16()? as usize;
5053 if parent_arity != local_arity {
5054 return Err(StorageError::Corrupt(format!(
5055 "FK arity mismatch in catalog: local {local_arity} vs parent {parent_arity}"
5056 )));
5057 }
5058 let mut parent_columns = Vec::with_capacity(parent_arity);
5059 for _ in 0..parent_arity {
5060 parent_columns.push(cur.read_u16()? as usize);
5061 }
5062 let on_delete = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
5063 StorageError::Corrupt("FK appendix: unknown on_delete tag".into())
5064 })?;
5065 let on_update = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
5066 StorageError::Corrupt("FK appendix: unknown on_update tag".into())
5067 })?;
5068 fks.push(ForeignKeyConstraint {
5069 name,
5070 local_columns,
5071 parent_table,
5072 parent_columns,
5073 on_delete,
5074 on_update,
5075 });
5076 }
5077 t.schema_mut().foreign_keys = fks;
5078 }
5079 if version >= 15 {
5082 let uc_count = cur.read_u16()? as usize;
5083 let mut ucs = Vec::with_capacity(uc_count);
5084 for _ in 0..uc_count {
5085 let is_pk = cur.read_u8()? != 0;
5086 let arity = cur.read_u16()? as usize;
5087 let mut cols = Vec::with_capacity(arity);
5088 for _ in 0..arity {
5089 cols.push(cur.read_u16()? as usize);
5090 }
5091 let nulls_not_distinct = if version >= 23 {
5095 cur.read_u8()? != 0
5096 } else {
5097 false
5098 };
5099 ucs.push(UniquenessConstraint {
5100 is_primary_key: is_pk,
5101 columns: cols,
5102 nulls_not_distinct,
5103 });
5104 }
5105 t.schema_mut().uniqueness_constraints = ucs;
5106 let rt_count = cur.read_u16()? as usize;
5108 for _ in 0..rt_count {
5109 let pos = cur.read_u16()? as usize;
5110 let expr = cur.read_str()?;
5111 if let Some(col) = t.schema_mut().columns.get_mut(pos) {
5112 col.runtime_default = Some(expr);
5113 }
5114 }
5115 }
5116 if version >= 23 {
5119 let check_count = cur.read_u16()? as usize;
5120 let mut checks = Vec::with_capacity(check_count);
5121 for _ in 0..check_count {
5122 checks.push(cur.read_str()?);
5123 }
5124 t.schema_mut().checks = checks;
5125 }
5126 let _ = table_name;
5127 Ok(())
5128}
5129
5130fn deserialize_rows(
5131 cur: &mut Cursor<'_>,
5132 t: &mut Table,
5133 _n_cols: usize,
5134) -> Result<(), StorageError> {
5135 let row_count = cur.read_u32()? as usize;
5136 let mut hot_bytes: u64 = 0;
5141 for _ in 0..row_count {
5142 let tail = &cur.buf[cur.pos..];
5143 let (row, consumed) = decode_row_body_dense(tail, &t.schema)?;
5144 cur.pos += consumed;
5145 hot_bytes = hot_bytes.saturating_add(row_body_encoded_len(&row, &t.schema) as u64);
5151 t.rows.push_mut(row);
5152 }
5153 t.hot_bytes = hot_bytes;
5154 Ok(())
5155}
5156
/// Reads the index section of a table record and re-attaches each index to `t`.
///
/// The section starts with a `u16` index count. Every entry carries the index
/// name, the `u16` position of the indexed column, and a one-byte kind tag
/// (0–3) that selects how the payload is restored. From format version 12 on,
/// each entry is followed by INCLUDE column positions, an optional partial
/// predicate and an optional expression; from version 16 on, also by a
/// uniqueness flag and a list of extra column positions.
///
/// # Errors
///
/// Returns [`StorageError::Corrupt`] for a column position outside the
/// schema, an unknown kind tag, or an unknown presence / flag byte. Errors
/// from the cursor and from the `restore_*` / `add_index` calls are
/// propagated.
fn deserialize_indices(
    cur: &mut Cursor<'_>,
    t: &mut Table,
    version: u8,
) -> Result<(), StorageError> {
    let index_count = cur.read_u16()? as usize;
    for _ in 0..index_count {
        let idx_name = cur.read_str()?;
        let col_pos = cur.read_u16()? as usize;
        // The file stores the column by position; the restore calls below
        // take it by name.
        let column_name = t
            .schema
            .columns
            .get(col_pos)
            .ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "index {idx_name:?} points at non-existent column position {col_pos}"
                ))
            })?
            .name
            .clone();
        let kind_tag = cur.read_u8()?;
        match kind_tag {
            // Tag 0: from version 9 the serialized map is read back and
            // handed to `restore_btree_index`; older images carry no payload
            // here, so the index is created through `add_index`.
            0 => {
                if version >= 9 {
                    let map = read_btree_map(cur)?;
                    t.restore_btree_index(idx_name, &column_name, map)?;
                } else {
                    t.add_index(idx_name, &column_name)?;
                }
            }
            // Tag 1: a `u16` `m` followed by the serialized graph.
            1 => {
                let m = cur.read_u16()? as usize;
                let graph = cur.read_nsw_graph(m)?;
                t.restore_nsw_index(idx_name, &column_name, graph)?;
            }
            // Tag 2: only the column's data type is stored.
            2 => {
                let column_type = cur.read_data_type()?;
                t.restore_brin_index(idx_name, &column_name, column_type)?;
            }
            // Tag 3: a word -> locators map.
            3 => {
                let map = read_gin_map(cur)?;
                t.restore_gin_index(idx_name, &column_name, map)?;
            }
            other => {
                return Err(StorageError::Corrupt(format!(
                    "unknown index kind tag: {other}"
                )));
            }
        }
        if version >= 12 {
            // NOTE(review): the appendices below are applied to
            // `t.indices.last_mut()`, presumably the index restored just
            // above — confirm every restore/add call appends to `t.indices`.
            let num_included = cur.read_u16()? as usize;
            if num_included > 0 {
                let mut included: Vec<usize> = Vec::with_capacity(num_included);
                for _ in 0..num_included {
                    let cp = cur.read_u16()? as usize;
                    if cp >= t.schema.columns.len() {
                        return Err(StorageError::Corrupt(format!(
                            "INCLUDE column position {cp} out of range \
                             ({} schema columns)",
                            t.schema.columns.len()
                        )));
                    }
                    included.push(cp);
                }
                if let Some(last) = t.indices.last_mut() {
                    last.included_columns = included;
                }
            }
            // Presence byte for the partial predicate: 0 = absent, 1 = a
            // string follows.
            match cur.read_u8()? {
                0 => {}
                1 => {
                    let pred = cur.read_str()?;
                    if let Some(last) = t.indices.last_mut() {
                        last.partial_predicate = Some(pred);
                    }
                }
                other => {
                    return Err(StorageError::Corrupt(format!(
                        "partial_predicate tag: unknown byte {other}"
                    )));
                }
            }
            // Presence byte for the index expression, same convention.
            match cur.read_u8()? {
                0 => {}
                1 => {
                    let expr = cur.read_str()?;
                    if let Some(last) = t.indices.last_mut() {
                        last.expression = Some(expr);
                    }
                }
                other => {
                    return Err(StorageError::Corrupt(format!(
                        "expression tag: unknown byte {other}"
                    )));
                }
            }
            if version >= 16 {
                // Uniqueness flag: only the value 1 sets `is_unique`; 0
                // leaves the index as restored.
                match cur.read_u8()? {
                    0 => {}
                    1 => {
                        if let Some(last) = t.indices.last_mut() {
                            last.is_unique = true;
                        }
                    }
                    other => {
                        return Err(StorageError::Corrupt(format!(
                            "is_unique tag: unknown byte {other}"
                        )));
                    }
                }
                // Extra column positions, validated against the schema like
                // the INCLUDE list above.
                let n = cur.read_u16()? as usize;
                if n > 0 {
                    let mut extras: Vec<usize> = Vec::with_capacity(n);
                    for _ in 0..n {
                        let cp = cur.read_u16()? as usize;
                        if cp >= t.schema.columns.len() {
                            return Err(StorageError::Corrupt(format!(
                                "extra column position {cp} out of range \
                                 ({} schema columns)",
                                t.schema.columns.len()
                            )));
                        }
                        extras.push(cp);
                    }
                    if let Some(last) = t.indices.last_mut() {
                        last.extra_column_positions = extras;
                    }
                }
            }
        }
    }
    Ok(())
}
5312
5313fn read_btree_map(
5317 cur: &mut Cursor<'_>,
5318) -> Result<PersistentBTreeMap<IndexKey, Vec<RowLocator>>, StorageError> {
5319 let entry_count = cur.read_u32()? as usize;
5320 let mut map = PersistentBTreeMap::new();
5321 for _ in 0..entry_count {
5322 let key = cur.read_index_key()?;
5323 let locator_count = cur.read_u32()? as usize;
5324 let mut locators = Vec::with_capacity(locator_count);
5325 for _ in 0..locator_count {
5326 let tail = &cur.buf[cur.pos..];
5327 let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
5328 StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
5329 })?;
5330 cur.pos += consumed;
5331 locators.push(loc);
5332 }
5333 map.insert_mut(key, locators);
5334 }
5335 Ok(map)
5336}
5337
5338fn read_gin_map(
5342 cur: &mut Cursor<'_>,
5343) -> Result<PersistentBTreeMap<String, Vec<RowLocator>>, StorageError> {
5344 let entry_count = cur.read_u32()? as usize;
5345 let mut map = PersistentBTreeMap::new();
5346 for _ in 0..entry_count {
5347 let word = cur.read_str()?;
5348 let locator_count = cur.read_u32()? as usize;
5349 let mut locators = Vec::with_capacity(locator_count);
5350 for _ in 0..locator_count {
5351 let tail = &cur.buf[cur.pos..];
5352 let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
5353 StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
5354 })?;
5355 cur.pos += consumed;
5356 locators.push(loc);
5357 }
5358 map.insert_mut(word, locators);
5359 }
5360 Ok(map)
5361}
5362
5363fn write_nsw_graph(out: &mut Vec<u8>, g: &NswGraph) {
5379 let entry = g.entry.map_or(u32::MAX, |e| {
5380 u32::try_from(e).expect("NSW entry fits in u32")
5381 });
5382 write_u16(
5383 out,
5384 u16::try_from(g.m_max_0).expect("HNSW m_max_0 fits in u16"),
5385 );
5386 out.extend_from_slice(&entry.to_le_bytes());
5387 out.push(g.entry_level);
5388 let node_count = g.levels.len();
5389 write_u32(
5390 out,
5391 u32::try_from(node_count).expect("HNSW node count fits in u32"),
5392 );
5393 for &lvl in &g.levels {
5394 out.push(lvl);
5395 }
5396 let layer_count = u8::try_from(g.layers.len()).expect("HNSW layer count ≤ 255");
5397 out.push(layer_count);
5398 for layer in &g.layers {
5399 write_u32(
5400 out,
5401 u32::try_from(layer.len()).expect("HNSW per-layer node count fits in u32"),
5402 );
5403 for neighbors in layer {
5404 write_u16(
5405 out,
5406 u16::try_from(neighbors.len()).expect("HNSW neighbour list fits in u16"),
5407 );
5408 for &peer in neighbors {
5412 write_u32(out, peer);
5413 }
5414 }
5415 }
5416}
5417
5418fn write_data_type(out: &mut Vec<u8>, t: DataType) {
5419 match t {
5420 DataType::Int => out.push(1),
5421 DataType::BigInt => out.push(2),
5422 DataType::Float => out.push(3),
5423 DataType::Text => out.push(4),
5424 DataType::Bool => out.push(5),
5425 DataType::Vector { dim, encoding } => match encoding {
5426 VecEncoding::F32 => {
5430 out.push(6);
5431 out.extend_from_slice(&dim.to_le_bytes());
5432 }
5433 VecEncoding::F16 => {
5436 out.push(15);
5437 out.extend_from_slice(&dim.to_le_bytes());
5438 }
5439 VecEncoding::Sq8 => {
5445 out.push(14);
5446 out.extend_from_slice(&dim.to_le_bytes());
5447 }
5448 },
5449 DataType::SmallInt => out.push(7),
5450 DataType::Varchar(max) => {
5451 out.push(8);
5452 out.extend_from_slice(&max.to_le_bytes());
5453 }
5454 DataType::Char(size) => {
5455 out.push(9);
5456 out.extend_from_slice(&size.to_le_bytes());
5457 }
5458 DataType::Numeric { precision, scale } => {
5459 out.push(10);
5460 out.push(precision);
5461 out.push(scale);
5462 }
5463 DataType::Date => out.push(11),
5464 DataType::Timestamp => out.push(12),
5465 DataType::Timestamptz => out.push(17),
5469 DataType::Interval => {
5474 unreachable!("DataType::Interval has no on-disk encoding in v2.11")
5475 }
5476 DataType::Json => out.push(13),
5477 DataType::Jsonb => out.push(16),
5480 DataType::Bytes => out.push(18),
5482 DataType::TextArray => out.push(19),
5485 DataType::IntArray => out.push(20),
5488 DataType::BigIntArray => out.push(21),
5491 DataType::TsVector => out.push(22),
5494 DataType::TsQuery => out.push(23),
5497 }
5498}
5499
5500impl Cursor<'_> {
5501 fn read_data_type(&mut self) -> Result<DataType, StorageError> {
5502 let tag = self.read_u8()?;
5503 match tag {
5504 1 => Ok(DataType::Int),
5505 2 => Ok(DataType::BigInt),
5506 3 => Ok(DataType::Float),
5507 4 => Ok(DataType::Text),
5508 5 => Ok(DataType::Bool),
5509 6 => Ok(DataType::Vector {
5510 dim: self.read_u32()?,
5511 encoding: VecEncoding::F32,
5512 }),
5513 7 => Ok(DataType::SmallInt),
5514 8 => Ok(DataType::Varchar(self.read_u32()?)),
5515 9 => Ok(DataType::Char(self.read_u32()?)),
5516 10 => {
5517 let precision = self.read_u8()?;
5518 let scale = self.read_u8()?;
5519 Ok(DataType::Numeric { precision, scale })
5520 }
5521 11 => Ok(DataType::Date),
5522 12 => Ok(DataType::Timestamp),
5523 13 => Ok(DataType::Json),
5524 14 => Ok(DataType::Vector {
5525 dim: self.read_u32()?,
5526 encoding: VecEncoding::Sq8,
5527 }),
5528 15 => Ok(DataType::Vector {
5532 dim: self.read_u32()?,
5533 encoding: VecEncoding::F16,
5534 }),
5535 16 => Ok(DataType::Jsonb),
5539 17 => Ok(DataType::Timestamptz),
5543 18 => Ok(DataType::Bytes),
5545 19 => Ok(DataType::TextArray),
5547 20 => Ok(DataType::IntArray),
5549 21 => Ok(DataType::BigIntArray),
5550 22 => Ok(DataType::TsVector),
5553 23 => Ok(DataType::TsQuery),
5554 other => Err(StorageError::Corrupt(format!(
5555 "unknown data type tag: {other}"
5556 ))),
5557 }
5558 }
5559}
5560
5561pub fn row_body_encoded_len(row: &Row, schema: &TableSchema) -> usize {
5567 debug_assert_eq!(
5568 row.values.len(),
5569 schema.columns.len(),
5570 "row_body_encoded_len: row arity must match schema"
5571 );
5572 let bitmap_bytes = schema.columns.len().div_ceil(8);
5573 let mut n = bitmap_bytes;
5574 for (col_idx, v) in row.values.iter().enumerate() {
5575 if matches!(v, Value::Null) {
5576 continue;
5577 }
5578 n += value_body_encoded_len(v, schema.columns[col_idx].ty);
5579 }
5580 n
5581}
5582
5583fn value_body_encoded_len(v: &Value, _ty: DataType) -> usize {
5589 match v {
5590 Value::SmallInt(_) => 2,
5591 Value::Int(_) | Value::Date(_) => 4,
5593 Value::BigInt(_) | Value::Float(_) | Value::Timestamp(_) => 8,
5595 Value::Bool(_) => 1,
5596 Value::Text(s) | Value::Json(s) => 2 + s.len(),
5598 Value::Vector(vec) => 4 + 4 * vec.len(),
5600 Value::Sq8Vector(q) => 4 + 4 + 4 + q.bytes.len(),
5607 Value::HalfVector(h) => 4 + h.bytes.len(),
5610 Value::Numeric { .. } => 16 + 1,
5612 Value::Bytes(b) => 2 + b.len(),
5618 Value::TextArray(items) => {
5621 let mut n = 2; for item in items {
5623 n += 1; if let Some(s) = item {
5625 n += 2 + s.len();
5626 }
5627 }
5628 n
5629 }
5630 Value::IntArray(items) => {
5633 2 + items
5634 .iter()
5635 .map(|x| if x.is_some() { 5 } else { 1 })
5636 .sum::<usize>()
5637 }
5638 Value::BigIntArray(items) => {
5639 2 + items
5640 .iter()
5641 .map(|x| if x.is_some() { 9 } else { 1 })
5642 .sum::<usize>()
5643 }
5644 Value::TsVector(lexs) => {
5648 let mut n = 2;
5649 for l in lexs {
5650 n += 2 + l.word.len() + 2 + 2 * l.positions.len() + 1;
5651 }
5652 n
5653 }
5654 Value::TsQuery(ast) => tsquery_encoded_len(ast),
5657 Value::Null => 0,
5659 Value::Interval { .. } => {
5661 unreachable!("Value::Interval has no on-disk encoding")
5662 }
5663 }
5664}
5665
5666pub fn encode_row_body_dense(row: &Row, schema: &TableSchema) -> Vec<u8> {
5677 debug_assert_eq!(
5678 row.values.len(),
5679 schema.columns.len(),
5680 "dense encode: row arity must match schema"
5681 );
5682 let bitmap_bytes = schema.columns.len().div_ceil(8);
5683 let mut out = Vec::with_capacity(bitmap_bytes + schema.columns.len() * 8);
5686 let bitmap_offset = out.len();
5687 out.resize(bitmap_offset + bitmap_bytes, 0);
5688 for (i, v) in row.values.iter().enumerate() {
5689 if matches!(v, Value::Null) {
5690 out[bitmap_offset + i / 8] |= 1 << (i % 8);
5691 }
5692 }
5693 for (col_idx, v) in row.values.iter().enumerate() {
5694 if matches!(v, Value::Null) {
5695 continue;
5696 }
5697 write_value_body(&mut out, v, schema.columns[col_idx].ty);
5698 }
5699 out
5700}
5701
5702pub fn decode_row_body_dense(
5708 bytes: &[u8],
5709 schema: &TableSchema,
5710) -> Result<(Row, usize), StorageError> {
5711 let mut cur = Cursor::new(bytes);
5712 let bitmap_bytes = schema.columns.len().div_ceil(8);
5713 let mut bitmap_buf = [0u8; 32];
5714 if bitmap_bytes > bitmap_buf.len() {
5715 return Err(StorageError::Corrupt(format!(
5716 "row NULL bitmap {bitmap_bytes} B exceeds 32 B cap"
5717 )));
5718 }
5719 let slice = cur.take(bitmap_bytes)?;
5720 bitmap_buf[..bitmap_bytes].copy_from_slice(slice);
5721 let mut values = Vec::with_capacity(schema.columns.len());
5722 for (col_idx, col) in schema.columns.iter().enumerate() {
5723 if (bitmap_buf[col_idx / 8] >> (col_idx % 8)) & 1 == 1 {
5724 values.push(Value::Null);
5725 } else {
5726 values.push(cur.read_value_body(col.ty)?);
5727 }
5728 }
5729 Ok((Row { values }, cur.pos))
5730}
5731
5732fn write_value_body(out: &mut Vec<u8>, v: &Value, ty: DataType) {
5741 match (v, ty) {
5742 (Value::SmallInt(n), DataType::SmallInt) => out.extend_from_slice(&n.to_le_bytes()),
5743 (Value::Int(n), DataType::Int) => out.extend_from_slice(&n.to_le_bytes()),
5744 (Value::BigInt(n), DataType::BigInt) => out.extend_from_slice(&n.to_le_bytes()),
5745 (Value::Float(x), DataType::Float) => out.extend_from_slice(&x.to_le_bytes()),
5746 (Value::Bool(b), DataType::Bool) => out.push(u8::from(*b)),
5747 (Value::Text(s), DataType::Text | DataType::Varchar(_) | DataType::Char(_)) => {
5748 write_str(out, s);
5749 }
5750 (
5751 Value::Vector(v),
5752 DataType::Vector {
5753 encoding: VecEncoding::F32,
5754 ..
5755 },
5756 ) => {
5757 let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
5758 out.extend_from_slice(&dim.to_le_bytes());
5759 for x in v {
5760 out.extend_from_slice(&x.to_le_bytes());
5761 }
5762 }
5763 (
5769 Value::Sq8Vector(q),
5770 DataType::Vector {
5771 encoding: VecEncoding::Sq8,
5772 ..
5773 },
5774 ) => {
5775 let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
5776 out.extend_from_slice(&dim.to_le_bytes());
5777 out.extend_from_slice(&q.min.to_le_bytes());
5778 out.extend_from_slice(&q.max.to_le_bytes());
5779 out.extend_from_slice(&q.bytes);
5780 }
5781 (
5785 Value::HalfVector(h),
5786 DataType::Vector {
5787 encoding: VecEncoding::F16,
5788 ..
5789 },
5790 ) => {
5791 let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
5792 out.extend_from_slice(&dim.to_le_bytes());
5793 out.extend_from_slice(&h.bytes);
5794 }
5795 (Value::Numeric { scaled, .. }, DataType::Numeric { scale, .. }) => {
5796 out.extend_from_slice(&scaled.to_le_bytes());
5797 out.push(scale);
5798 }
5799 (Value::Date(d), DataType::Date) => out.extend_from_slice(&d.to_le_bytes()),
5800 (Value::Timestamp(t), DataType::Timestamp | DataType::Timestamptz) => {
5801 out.extend_from_slice(&t.to_le_bytes())
5802 }
5803 (Value::Json(s), DataType::Json | DataType::Jsonb) => write_str(out, s),
5807 (Value::Bytes(b), DataType::Bytes) => {
5810 let len = u16::try_from(b.len()).expect("BYTEA cell ≤ 64 KiB");
5811 out.extend_from_slice(&len.to_le_bytes());
5812 out.extend_from_slice(b);
5813 }
5814 (Value::TextArray(items), DataType::TextArray) => {
5817 let count = u16::try_from(items.len()).expect("TEXT[] ≤ 65k elements");
5818 out.extend_from_slice(&count.to_le_bytes());
5819 for item in items {
5820 match item {
5821 None => out.push(1),
5822 Some(s) => {
5823 out.push(0);
5824 let len = u16::try_from(s.len()).expect("TEXT[] element ≤ 64 KiB");
5825 out.extend_from_slice(&len.to_le_bytes());
5826 out.extend_from_slice(s.as_bytes());
5827 }
5828 }
5829 }
5830 }
5831 (Value::IntArray(items), DataType::IntArray) => {
5834 let count = u16::try_from(items.len()).expect("INT[] ≤ 65k elements");
5835 out.extend_from_slice(&count.to_le_bytes());
5836 for item in items {
5837 match item {
5838 None => out.push(1),
5839 Some(n) => {
5840 out.push(0);
5841 out.extend_from_slice(&n.to_le_bytes());
5842 }
5843 }
5844 }
5845 }
5846 (Value::BigIntArray(items), DataType::BigIntArray) => {
5849 let count = u16::try_from(items.len()).expect("BIGINT[] ≤ 65k elements");
5850 out.extend_from_slice(&count.to_le_bytes());
5851 for item in items {
5852 match item {
5853 None => out.push(1),
5854 Some(n) => {
5855 out.push(0);
5856 out.extend_from_slice(&n.to_le_bytes());
5857 }
5858 }
5859 }
5860 }
5861 (Value::TsVector(lexs), DataType::TsVector) => write_tsvector_body(out, lexs),
5864 (Value::TsQuery(ast), DataType::TsQuery) => write_tsquery_body(out, ast),
5866 (other, ty) => unreachable!(
5870 "schema-driven encode received mismatched value/type pair: \
5871 value tag={:?}, column type={:?}",
5872 other.data_type(),
5873 ty
5874 ),
5875 }
5876}
5877
5878fn write_value(out: &mut Vec<u8>, v: &Value) {
5879 match v {
5880 Value::Null => out.push(0),
5881 Value::SmallInt(n) => {
5882 out.push(7);
5883 out.extend_from_slice(&n.to_le_bytes());
5884 }
5885 Value::Int(n) => {
5886 out.push(1);
5887 out.extend_from_slice(&n.to_le_bytes());
5888 }
5889 Value::BigInt(n) => {
5890 out.push(2);
5891 out.extend_from_slice(&n.to_le_bytes());
5892 }
5893 Value::Float(x) => {
5894 out.push(3);
5895 out.extend_from_slice(&x.to_le_bytes());
5896 }
5897 Value::Text(s) | Value::Json(s) => {
5902 out.push(4);
5903 write_str(out, s);
5904 }
5905 Value::Bool(b) => {
5906 out.push(5);
5907 out.push(u8::from(*b));
5908 }
5909 Value::Vector(v) => {
5910 out.push(6);
5911 let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
5912 out.extend_from_slice(&dim.to_le_bytes());
5913 for x in v {
5914 out.extend_from_slice(&x.to_le_bytes());
5915 }
5916 }
5917 Value::Sq8Vector(q) => {
5922 out.push(11);
5923 let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
5924 out.extend_from_slice(&dim.to_le_bytes());
5925 out.extend_from_slice(&q.min.to_le_bytes());
5926 out.extend_from_slice(&q.max.to_le_bytes());
5927 out.extend_from_slice(&q.bytes);
5928 }
5929 Value::HalfVector(h) => {
5934 out.push(12);
5935 let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
5936 out.extend_from_slice(&dim.to_le_bytes());
5937 out.extend_from_slice(&h.bytes);
5938 }
5939 Value::Numeric { scaled, scale } => {
5940 out.push(8);
5941 out.extend_from_slice(&scaled.to_le_bytes());
5942 out.push(*scale);
5943 }
5944 Value::Date(d) => {
5945 out.push(9);
5946 out.extend_from_slice(&d.to_le_bytes());
5947 }
5948 Value::Timestamp(t) => {
5949 out.push(10);
5950 out.extend_from_slice(&t.to_le_bytes());
5951 }
5952 Value::Interval { .. } => {
5956 unreachable!(
5957 "Value::Interval has no on-disk encoding; engine must reject it before write"
5958 )
5959 }
5960 Value::Bytes(b) => {
5965 out.push(14);
5966 let len = u16::try_from(b.len()).expect("BYTEA value ≤ 64 KiB");
5967 out.extend_from_slice(&len.to_le_bytes());
5968 out.extend_from_slice(b);
5969 }
5970 Value::TextArray(items) => {
5973 out.push(15);
5974 let count = u16::try_from(items.len()).expect("TEXT[] ≤ 65k elements");
5975 out.extend_from_slice(&count.to_le_bytes());
5976 for item in items {
5977 match item {
5978 None => out.push(1),
5979 Some(s) => {
5980 out.push(0);
5981 let len = u16::try_from(s.len()).expect("TEXT[] element ≤ 64 KiB");
5982 out.extend_from_slice(&len.to_le_bytes());
5983 out.extend_from_slice(s.as_bytes());
5984 }
5985 }
5986 }
5987 }
5988 Value::IntArray(items) => {
5991 out.push(16);
5992 let count = u16::try_from(items.len()).expect("INT[] ≤ 65k elements");
5993 out.extend_from_slice(&count.to_le_bytes());
5994 for item in items {
5995 match item {
5996 None => out.push(1),
5997 Some(n) => {
5998 out.push(0);
5999 out.extend_from_slice(&n.to_le_bytes());
6000 }
6001 }
6002 }
6003 }
6004 Value::BigIntArray(items) => {
6007 out.push(17);
6008 let count = u16::try_from(items.len()).expect("BIGINT[] ≤ 65k elements");
6009 out.extend_from_slice(&count.to_le_bytes());
6010 for item in items {
6011 match item {
6012 None => out.push(1),
6013 Some(n) => {
6014 out.push(0);
6015 out.extend_from_slice(&n.to_le_bytes());
6016 }
6017 }
6018 }
6019 }
6020 Value::TsVector(lexs) => {
6023 out.push(18);
6024 write_tsvector_body(out, lexs);
6025 }
6026 Value::TsQuery(ast) => {
6029 out.push(19);
6030 write_tsquery_body(out, ast);
6031 }
6032 }
6033}
6034
6035fn write_tsvector_body(out: &mut Vec<u8>, lexs: &[TsLexeme]) {
6038 let count = u16::try_from(lexs.len()).expect("tsvector ≤ 65k lexemes");
6039 out.extend_from_slice(&count.to_le_bytes());
6040 for l in lexs {
6041 let wlen = u16::try_from(l.word.len()).expect("tsvector word ≤ 64 KiB");
6042 out.extend_from_slice(&wlen.to_le_bytes());
6043 out.extend_from_slice(l.word.as_bytes());
6044 let plen = u16::try_from(l.positions.len()).expect("tsvector pos count ≤ 65k");
6045 out.extend_from_slice(&plen.to_le_bytes());
6046 for p in &l.positions {
6047 out.extend_from_slice(&p.to_le_bytes());
6048 }
6049 out.push(l.weight);
6050 }
6051}
6052
6053fn write_tsquery_body(out: &mut Vec<u8>, ast: &TsQueryAst) {
6057 match ast {
6058 TsQueryAst::Term { word, weight_mask } => {
6059 out.push(0);
6060 let len = u16::try_from(word.len()).expect("tsquery term ≤ 64 KiB");
6061 out.extend_from_slice(&len.to_le_bytes());
6062 out.extend_from_slice(word.as_bytes());
6063 out.push(*weight_mask);
6064 }
6065 TsQueryAst::And(a, b) => {
6066 out.push(1);
6067 write_tsquery_body(out, a);
6068 write_tsquery_body(out, b);
6069 }
6070 TsQueryAst::Or(a, b) => {
6071 out.push(2);
6072 write_tsquery_body(out, a);
6073 write_tsquery_body(out, b);
6074 }
6075 TsQueryAst::Not(x) => {
6076 out.push(3);
6077 write_tsquery_body(out, x);
6078 }
6079 TsQueryAst::Phrase {
6080 left,
6081 right,
6082 distance,
6083 } => {
6084 out.push(4);
6085 out.extend_from_slice(&distance.to_le_bytes());
6086 write_tsquery_body(out, left);
6087 write_tsquery_body(out, right);
6088 }
6089 }
6090}
6091
6092fn tsquery_encoded_len(ast: &TsQueryAst) -> usize {
6094 match ast {
6095 TsQueryAst::Term { word, .. } => 1 + 2 + word.len() + 1,
6096 TsQueryAst::And(a, b) | TsQueryAst::Or(a, b) => {
6097 1 + tsquery_encoded_len(a) + tsquery_encoded_len(b)
6098 }
6099 TsQueryAst::Not(x) => 1 + tsquery_encoded_len(x),
6100 TsQueryAst::Phrase { left, right, .. } => {
6101 1 + 2 + tsquery_encoded_len(left) + tsquery_encoded_len(right)
6102 }
6103 }
6104}
6105
/// Appends `n` to `out` as two little-endian bytes.
fn write_u16(out: &mut Vec<u8>, n: u16) {
    let [lo, hi] = n.to_le_bytes();
    out.push(lo);
    out.push(hi);
}
/// Appends `n` to `out` as four little-endian bytes.
fn write_u32(out: &mut Vec<u8>, n: u32) {
    out.extend(n.to_le_bytes());
}
6112fn write_str(out: &mut Vec<u8>, s: &str) {
6113 let len = u16::try_from(s.len()).expect("identifier / text fits in u16");
6114 write_u16(out, len);
6115 out.extend_from_slice(s.as_bytes());
6116}
6117
6118fn write_str_long(out: &mut Vec<u8>, s: &str) {
6123 let len = u32::try_from(s.len()).expect("function body fits in u32");
6124 write_u32(out, len);
6125 out.extend_from_slice(s.as_bytes());
6126}
6127
6128fn write_index_key(out: &mut Vec<u8>, key: &IndexKey) {
6132 match key {
6133 IndexKey::Int(n) => {
6134 out.push(INDEX_KEY_TAG_INT);
6135 out.extend_from_slice(&n.to_le_bytes());
6136 }
6137 IndexKey::Text(s) => {
6138 out.push(INDEX_KEY_TAG_TEXT);
6139 write_str(out, s);
6140 }
6141 IndexKey::Bool(b) => {
6142 out.push(INDEX_KEY_TAG_BOOL);
6143 out.push(u8::from(*b));
6144 }
6145 }
6146}
6147
/// Forward-only reader over a borrowed byte buffer, used by the
/// deserialization routines in this file.
struct Cursor<'a> {
    /// The complete input being decoded.
    buf: &'a [u8],
    /// Offset into `buf` of the next unread byte.
    pos: usize,
}
6152
6153impl<'a> Cursor<'a> {
6154 const fn new(buf: &'a [u8]) -> Self {
6155 Self { buf, pos: 0 }
6156 }
6157
6158 fn take(&mut self, n: usize) -> Result<&'a [u8], StorageError> {
6159 let end = self
6160 .pos
6161 .checked_add(n)
6162 .ok_or_else(|| StorageError::Corrupt(format!("length overflow taking {n} bytes")))?;
6163 if end > self.buf.len() {
6164 return Err(StorageError::Corrupt(format!(
6165 "unexpected EOF at offset {} (wanted {n} more bytes)",
6166 self.pos
6167 )));
6168 }
6169 let s = &self.buf[self.pos..end];
6170 self.pos = end;
6171 Ok(s)
6172 }
6173
6174 fn read_u8(&mut self) -> Result<u8, StorageError> {
6175 Ok(self.take(1)?[0])
6176 }
6177 fn read_u16(&mut self) -> Result<u16, StorageError> {
6178 let s = self.take(2)?;
6179 Ok(u16::from_le_bytes([s[0], s[1]]))
6180 }
6181 fn read_u32(&mut self) -> Result<u32, StorageError> {
6182 let s = self.take(4)?;
6183 Ok(u32::from_le_bytes([s[0], s[1], s[2], s[3]]))
6184 }
6185 fn read_i32(&mut self) -> Result<i32, StorageError> {
6186 let s = self.take(4)?;
6187 Ok(i32::from_le_bytes([s[0], s[1], s[2], s[3]]))
6188 }
6189 fn read_u64(&mut self) -> Result<u64, StorageError> {
6192 let s = self.take(8)?;
6193 Ok(u64::from_le_bytes([
6194 s[0], s[1], s[2], s[3], s[4], s[5], s[6], s[7],
6195 ]))
6196 }
6197 fn read_i64(&mut self) -> Result<i64, StorageError> {
6198 let s = self.take(8)?;
6199 let arr: [u8; 8] = s.try_into().expect("checked");
6200 Ok(i64::from_le_bytes(arr))
6201 }
6202 fn read_f64(&mut self) -> Result<f64, StorageError> {
6203 let s = self.take(8)?;
6204 let arr: [u8; 8] = s.try_into().expect("checked");
6205 Ok(f64::from_le_bytes(arr))
6206 }
6207 fn read_f32(&mut self) -> Result<f32, StorageError> {
6208 let s = self.take(4)?;
6209 Ok(f32::from_le_bytes([s[0], s[1], s[2], s[3]]))
6210 }
6211 fn read_str(&mut self) -> Result<String, StorageError> {
6212 let len = self.read_u16()? as usize;
6213 let bytes = self.take(len)?;
6214 core::str::from_utf8(bytes)
6215 .map(String::from)
6216 .map_err(|_| StorageError::Corrupt("invalid UTF-8 in identifier or text".into()))
6217 }
6218
6219 fn read_str_long(&mut self) -> Result<String, StorageError> {
6223 let len = self.read_u32()? as usize;
6224 let bytes = self.take(len)?;
6225 core::str::from_utf8(bytes)
6226 .map(String::from)
6227 .map_err(|_| StorageError::Corrupt("invalid UTF-8 in long-string payload".into()))
6228 }
6229
6230 fn read_index_key(&mut self) -> Result<IndexKey, StorageError> {
6234 let tag = self.read_u8()?;
6235 match tag {
6236 INDEX_KEY_TAG_INT => Ok(IndexKey::Int(self.read_i64()?)),
6237 INDEX_KEY_TAG_TEXT => Ok(IndexKey::Text(self.read_str()?)),
6238 INDEX_KEY_TAG_BOOL => Ok(IndexKey::Bool(self.read_u8()? != 0)),
6239 other => Err(StorageError::Corrupt(format!(
6240 "unknown index key tag: {other}"
6241 ))),
6242 }
6243 }
6244 fn read_value_body(&mut self, ty: DataType) -> Result<Value, StorageError> {
6250 match ty {
6251 DataType::SmallInt => {
6252 let s = self.take(2)?;
6253 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
6254 }
6255 DataType::Int => Ok(Value::Int(self.read_i32()?)),
6256 DataType::BigInt => Ok(Value::BigInt(self.read_i64()?)),
6257 DataType::Float => Ok(Value::Float(self.read_f64()?)),
6258 DataType::Bool => Ok(Value::Bool(self.read_u8()? != 0)),
6259 DataType::Text | DataType::Varchar(_) | DataType::Char(_) => {
6260 Ok(Value::Text(self.read_str()?))
6261 }
6262 DataType::Vector {
6263 encoding: VecEncoding::F32,
6264 ..
6265 } => {
6266 let dim = self.read_u32()? as usize;
6267 let mut v = Vec::with_capacity(dim);
6268 for _ in 0..dim {
6269 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
6270 v.push(f32::from_le_bytes(bytes));
6271 }
6272 Ok(Value::Vector(v))
6273 }
6274 DataType::Vector {
6275 encoding: VecEncoding::Sq8,
6276 ..
6277 } => {
6278 let dim = self.read_u32()? as usize;
6279 let min = self.read_f32()?;
6280 let max = self.read_f32()?;
6281 let bytes = self.take(dim)?.to_vec();
6282 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
6283 }
6284 DataType::Vector {
6285 encoding: VecEncoding::F16,
6286 ..
6287 } => {
6288 let dim = self.read_u32()? as usize;
6289 let bytes = self.take(dim * 2)?.to_vec();
6290 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
6291 }
6292 DataType::Numeric { .. } => {
6293 let s = self.take(16)?;
6294 let arr: [u8; 16] = s.try_into().expect("checked");
6295 let scaled = i128::from_le_bytes(arr);
6296 let scale = self.read_u8()?;
6297 Ok(Value::Numeric { scaled, scale })
6298 }
6299 DataType::Date => Ok(Value::Date(self.read_i32()?)),
6300 DataType::Timestamp => Ok(Value::Timestamp(self.read_i64()?)),
6301 DataType::Timestamptz => Ok(Value::Timestamp(self.read_i64()?)),
6302 DataType::Jsonb => Ok(Value::Json(self.read_str()?)),
6303 DataType::Interval => {
6304 Err(StorageError::Corrupt(
6309 "INTERVAL column found on disk — runtime-only type, v3.0.2 rejects it".into(),
6310 ))
6311 }
6312 DataType::Json => Ok(Value::Json(self.read_str()?)),
6313 DataType::Bytes => {
6316 let len = self.read_u16()? as usize;
6317 let bytes = self.take(len)?.to_vec();
6318 Ok(Value::Bytes(bytes))
6319 }
6320 DataType::TextArray => {
6322 let count = self.read_u16()? as usize;
6323 let mut items: Vec<Option<String>> = Vec::with_capacity(count);
6324 for _ in 0..count {
6325 match self.read_u8()? {
6326 0 => items.push(Some(self.read_str()?)),
6327 1 => items.push(None),
6328 other => {
6329 return Err(StorageError::Corrupt(format!(
6330 "TEXT[] null flag: unknown byte {other}"
6331 )));
6332 }
6333 }
6334 }
6335 Ok(Value::TextArray(items))
6336 }
6337 DataType::IntArray => {
6339 let count = self.read_u16()? as usize;
6340 let mut items: Vec<Option<i32>> = Vec::with_capacity(count);
6341 for _ in 0..count {
6342 match self.read_u8()? {
6343 0 => items.push(Some(self.read_i32()?)),
6344 1 => items.push(None),
6345 other => {
6346 return Err(StorageError::Corrupt(format!(
6347 "INT[] null flag: unknown byte {other}"
6348 )));
6349 }
6350 }
6351 }
6352 Ok(Value::IntArray(items))
6353 }
6354 DataType::BigIntArray => {
6356 let count = self.read_u16()? as usize;
6357 let mut items: Vec<Option<i64>> = Vec::with_capacity(count);
6358 for _ in 0..count {
6359 match self.read_u8()? {
6360 0 => items.push(Some(self.read_i64()?)),
6361 1 => items.push(None),
6362 other => {
6363 return Err(StorageError::Corrupt(format!(
6364 "BIGINT[] null flag: unknown byte {other}"
6365 )));
6366 }
6367 }
6368 }
6369 Ok(Value::BigIntArray(items))
6370 }
6371 DataType::TsVector => Ok(Value::TsVector(self.read_tsvector_body()?)),
6375 DataType::TsQuery => Ok(Value::TsQuery(self.read_tsquery_body()?)),
6376 }
6377 }
6378
6379 fn read_tsvector_body(&mut self) -> Result<Vec<TsLexeme>, StorageError> {
6381 let count = self.read_u16()? as usize;
6382 let mut out = Vec::with_capacity(count);
6383 for _ in 0..count {
6384 let word = self.read_str()?;
6385 let pos_count = self.read_u16()? as usize;
6386 let mut positions = Vec::with_capacity(pos_count);
6387 for _ in 0..pos_count {
6388 positions.push(self.read_u16()?);
6389 }
6390 let weight = self.read_u8()?;
6391 out.push(TsLexeme {
6392 word,
6393 positions,
6394 weight,
6395 });
6396 }
6397 Ok(out)
6398 }
6399
6400 fn read_tsquery_body(&mut self) -> Result<TsQueryAst, StorageError> {
6402 let tag = self.read_u8()?;
6403 match tag {
6404 0 => {
6405 let word = self.read_str()?;
6406 let weight_mask = self.read_u8()?;
6407 Ok(TsQueryAst::Term { word, weight_mask })
6408 }
6409 1 => {
6410 let a = self.read_tsquery_body()?;
6411 let b = self.read_tsquery_body()?;
6412 Ok(TsQueryAst::And(Box::new(a), Box::new(b)))
6413 }
6414 2 => {
6415 let a = self.read_tsquery_body()?;
6416 let b = self.read_tsquery_body()?;
6417 Ok(TsQueryAst::Or(Box::new(a), Box::new(b)))
6418 }
6419 3 => {
6420 let x = self.read_tsquery_body()?;
6421 Ok(TsQueryAst::Not(Box::new(x)))
6422 }
6423 4 => {
6424 let distance = self.read_u16()?;
6425 let left = self.read_tsquery_body()?;
6426 let right = self.read_tsquery_body()?;
6427 Ok(TsQueryAst::Phrase {
6428 left: Box::new(left),
6429 right: Box::new(right),
6430 distance,
6431 })
6432 }
6433 other => Err(StorageError::Corrupt(format!(
6434 "tsquery: unknown node tag {other}"
6435 ))),
6436 }
6437 }
6438
6439 fn read_value(&mut self) -> Result<Value, StorageError> {
6440 let tag = self.read_u8()?;
6441 match tag {
6442 0 => Ok(Value::Null),
6443 1 => Ok(Value::Int(self.read_i32()?)),
6444 2 => Ok(Value::BigInt(self.read_i64()?)),
6445 3 => Ok(Value::Float(self.read_f64()?)),
6446 4 => Ok(Value::Text(self.read_str()?)),
6447 5 => Ok(Value::Bool(self.read_u8()? != 0)),
6448 6 => {
6449 let dim = self.read_u32()? as usize;
6450 let mut v = Vec::with_capacity(dim);
6451 for _ in 0..dim {
6452 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
6453 v.push(f32::from_le_bytes(bytes));
6454 }
6455 Ok(Value::Vector(v))
6456 }
6457 7 => {
6458 let s = self.take(2)?;
6459 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
6460 }
6461 8 => {
6462 let s = self.take(16)?;
6463 let arr: [u8; 16] = s.try_into().expect("checked");
6464 let scaled = i128::from_le_bytes(arr);
6465 let scale = self.read_u8()?;
6466 Ok(Value::Numeric { scaled, scale })
6467 }
6468 9 => Ok(Value::Date(self.read_i32()?)),
6469 10 => Ok(Value::Timestamp(self.read_i64()?)),
6470 11 => {
6475 let dim = self.read_u32()? as usize;
6476 let min = self.read_f32()?;
6477 let max = self.read_f32()?;
6478 let bytes = self.take(dim)?.to_vec();
6479 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
6480 }
6481 12 => {
6484 let dim = self.read_u32()? as usize;
6485 let bytes = self.take(dim * 2)?.to_vec();
6486 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
6487 }
6488 14 => {
6490 let len = self.read_u16()? as usize;
6491 let bytes = self.take(len)?.to_vec();
6492 Ok(Value::Bytes(bytes))
6493 }
6494 15 => {
6497 let count = self.read_u16()? as usize;
6498 let mut items: Vec<Option<String>> = Vec::with_capacity(count);
6499 for _ in 0..count {
6500 match self.read_u8()? {
6501 0 => items.push(Some(self.read_str()?)),
6502 1 => items.push(None),
6503 other => {
6504 return Err(StorageError::Corrupt(format!(
6505 "TEXT[] null flag in value tag: unknown byte {other}"
6506 )));
6507 }
6508 }
6509 }
6510 Ok(Value::TextArray(items))
6511 }
6512 16 => {
6514 let count = self.read_u16()? as usize;
6515 let mut items: Vec<Option<i32>> = Vec::with_capacity(count);
6516 for _ in 0..count {
6517 match self.read_u8()? {
6518 0 => items.push(Some(self.read_i32()?)),
6519 1 => items.push(None),
6520 other => {
6521 return Err(StorageError::Corrupt(format!(
6522 "INT[] null flag in value tag: unknown byte {other}"
6523 )));
6524 }
6525 }
6526 }
6527 Ok(Value::IntArray(items))
6528 }
6529 17 => {
6530 let count = self.read_u16()? as usize;
6531 let mut items: Vec<Option<i64>> = Vec::with_capacity(count);
6532 for _ in 0..count {
6533 match self.read_u8()? {
6534 0 => items.push(Some(self.read_i64()?)),
6535 1 => items.push(None),
6536 other => {
6537 return Err(StorageError::Corrupt(format!(
6538 "BIGINT[] null flag in value tag: unknown byte {other}"
6539 )));
6540 }
6541 }
6542 }
6543 Ok(Value::BigIntArray(items))
6544 }
6545 18 => Ok(Value::TsVector(self.read_tsvector_body()?)),
6548 19 => Ok(Value::TsQuery(self.read_tsquery_body()?)),
6550 other => Err(StorageError::Corrupt(format!("unknown value tag: {other}"))),
6551 }
6552 }
6553
6554 fn read_nsw_graph(&mut self, m: usize) -> Result<NswGraph, StorageError> {
6558 let m_max_0 = self.read_u16()? as usize;
6559 let entry_raw = self.read_u32()?;
6560 let entry = if entry_raw == u32::MAX {
6561 None
6562 } else {
6563 Some(entry_raw as usize)
6564 };
6565 let entry_level = self.read_u8()?;
6566 let node_count = self.read_u32()? as usize;
6567 let mut levels: PersistentVec<u8> = PersistentVec::new();
6572 for _ in 0..node_count {
6573 levels.push_mut(self.read_u8()?);
6574 }
6575 let layer_count = self.read_u8()? as usize;
6576 let mut layers: Vec<PersistentVec<Vec<u32>>> = Vec::with_capacity(layer_count);
6577 for _ in 0..layer_count {
6578 let n = self.read_u32()? as usize;
6579 let mut per_layer: PersistentVec<Vec<u32>> = PersistentVec::new();
6580 for _ in 0..n {
6581 let cnt = self.read_u16()? as usize;
6582 let mut row: Vec<u32> = Vec::with_capacity(cnt);
6583 for _ in 0..cnt {
6584 row.push(self.read_u32()?);
6585 }
6586 per_layer.push_mut(row);
6587 }
6588 layers.push(per_layer);
6589 }
6590 Ok(NswGraph {
6591 m,
6592 m_max_0,
6593 entry,
6594 entry_level,
6595 levels,
6596 layers,
6597 })
6598 }
6599}
6600
6601#[cfg(test)]
6602mod tests {
6603 use super::*;
6604 use alloc::string::ToString;
6605 use alloc::vec;
6606
#[cfg(target_arch = "aarch64")]
#[test]
fn neon_l2_matches_scalar() {
    // Steps the LCG in `state` and maps 24 of its bits into [-1.0, 1.0).
    #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
    fn next_unit(state: &mut u64) -> f32 {
        *state = state
            .wrapping_mul(6_364_136_223_846_793_005)
            .wrapping_add(1);
        (((*state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0
    }

    const DIMS: [usize; 12] = [4, 8, 12, 16, 64, 128, 256, 384, 512, 768, 1024, 1536];
    for &d in &DIMS {
        // Seed depends only on the dimension, so every run sees the same data.
        let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
        let mut a = Vec::with_capacity(d);
        let mut b = Vec::with_capacity(d);
        for _ in 0..d {
            // `a` draws before `b`: both vectors interleave one shared stream.
            a.push(next_unit(&mut state));
            b.push(next_unit(&mut state));
        }
        let scalar = l2_distance_sq_scalar(&a, &b);
        // SAFETY (review): the kernel's contract is declared outside this
        // chunk; this test only builds on aarch64 (cfg above), where NEON is
        // presumably always available. Confirm against the kernel's docs.
        let neon = unsafe { l2_distance_sq_neon(&a, &b) };
        // Relative tolerance with a floor for near-zero distances.
        let tol = (scalar.abs().max(1e-6)) * 1e-4;
        let diff = (scalar - neon).abs();
        assert!(
            diff <= tol,
            "dim={d}: scalar={scalar} neon={neon} diff={}",
            diff
        );
    }
}
6643
#[cfg(target_arch = "aarch64")]
#[test]
fn neon_inner_product_matches_scalar() {
    // Steps the LCG in `state` and maps 24 of its bits into [-1.0, 1.0).
    #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
    fn next_unit(state: &mut u64) -> f32 {
        *state = state
            .wrapping_mul(6_364_136_223_846_793_005)
            .wrapping_add(1);
        (((*state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0
    }

    const DIMS: [usize; 9] = [4, 8, 12, 16, 64, 128, 256, 512, 1024];
    for &d in &DIMS {
        // Seed depends only on the dimension, so every run sees the same data.
        let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
        let mut a = Vec::with_capacity(d);
        let mut b = Vec::with_capacity(d);
        for _ in 0..d {
            // `a` draws before `b`: both vectors interleave one shared stream.
            let x = next_unit(&mut state);
            let y = next_unit(&mut state);
            a.push(x);
            b.push(y);
        }
        let scalar = inner_product_scalar(&a, &b);
        // SAFETY (review): the kernel's contract is declared outside this
        // chunk; this test only builds on aarch64 (cfg above), where NEON is
        // presumably always available. Confirm against the kernel's docs.
        let neon = unsafe { inner_product_neon(&a, &b) };
        // Relative term plus an absolute term that grows with the dimension,
        // since a dot product can cancel to near zero.
        #[allow(clippy::cast_precision_loss)]
        let tol = (scalar.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
        let diff = (scalar - neon).abs();
        assert!(
            diff <= tol,
            "IP dim={d}: scalar={scalar} neon={neon} diff={}",
            diff
        );
    }
}
6680
#[cfg(target_arch = "aarch64")]
#[allow(clippy::similar_names)]
#[test]
fn neon_cosine_dot_norms_matches_scalar() {
    // Steps the LCG in `state` and maps 24 of its bits into [-1.0, 1.0).
    #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
    fn next_unit(state: &mut u64) -> f32 {
        *state = state
            .wrapping_mul(6_364_136_223_846_793_005)
            .wrapping_add(1);
        (((*state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0
    }

    let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
    for &d in &dims {
        // Seed depends only on the dimension, so every run sees the same data.
        let mut state: u64 = (d as u64).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        let mut a = Vec::with_capacity(d);
        let mut b = Vec::with_capacity(d);
        for _ in 0..d {
            // `a` draws before `b`: both vectors interleave one shared stream.
            let x = next_unit(&mut state);
            let y = next_unit(&mut state);
            a.push(x);
            b.push(y);
        }
        let (dot_s, na_s, nb_s) = cosine_dot_norms_scalar(&a, &b);
        // SAFETY (review): the kernel's contract is declared outside this
        // chunk; this test only builds on aarch64 (cfg above), where NEON is
        // presumably always available. Confirm against the kernel's docs.
        let (dot_n, na_n, nb_n) = unsafe { cosine_dot_norms_neon(&a, &b) };
        // Each quantity gets a tolerance scaled by its *own* scalar
        // magnitude; `nb` used to borrow the one derived from `na`.
        #[allow(clippy::cast_precision_loss)]
        let abs_slack = (d as f32) * 1e-6;
        let tol_d = (dot_s.abs().max(1e-6)) * 1e-4 + abs_slack;
        let tol_na = (na_s.abs().max(1e-6)) * 1e-4 + abs_slack;
        let tol_nb = (nb_s.abs().max(1e-6)) * 1e-4 + abs_slack;
        assert!(
            (dot_s - dot_n).abs() <= tol_d,
            "cosine dot dim={d}: scalar={dot_s} neon={dot_n}"
        );
        assert!(
            (na_s - na_n).abs() <= tol_na,
            "cosine na dim={d}: scalar={na_s} neon={na_n}"
        );
        assert!(
            (nb_s - nb_n).abs() <= tol_nb,
            "cosine nb dim={d}: scalar={nb_s} neon={nb_n}"
        );
    }
}
6724
6725 fn make_users_schema() -> TableSchema {
6726 TableSchema::new(
6727 "users",
6728 vec![
6729 ColumnSchema::new("id", DataType::Int, false),
6730 ColumnSchema::new("name", DataType::Text, false),
6731 ColumnSchema::new("score", DataType::Float, true),
6732 ],
6733 )
6734 }
6735
#[test]
fn value_type_tag_matches_variant() {
    // Every non-NULL variant reports its matching `DataType`.
    let typed = [
        (Value::Int(1), DataType::Int),
        (Value::BigInt(1), DataType::BigInt),
        (Value::Float(1.0), DataType::Float),
        (Value::Text("x".into()), DataType::Text),
        (Value::Bool(true), DataType::Bool),
    ];
    for (value, expected) in &typed {
        assert_eq!(value.data_type(), Some(*expected));
    }

    // NULL carries no type, and only NULL answers `is_null`.
    let null = Value::Null;
    assert_eq!(null.data_type(), None);
    assert!(null.is_null());
    assert!(!Value::Int(0).is_null());
}
6747
#[test]
fn sq8_value_reports_sq8_data_type() {
    // Five input components must surface as `dim: 5` with the SQ8 encoding.
    let quantised = crate::quantize::quantize(&[0.0, 0.25, 0.5, 0.75, 1.0]);
    let expected = DataType::Vector {
        dim: 5,
        encoding: VecEncoding::Sq8,
    };
    let cell = Value::Sq8Vector(quantised);
    assert_eq!(cell.data_type(), Some(expected));
}
6764
#[test]
fn datatype_display_matches_pg_keyword() {
    // `Display` output paired with the keyword it must render as.
    let expectations = [
        (DataType::Int, "INT"),
        (DataType::BigInt, "BIGINT"),
        (DataType::Float, "FLOAT"),
        (DataType::Text, "TEXT"),
        (DataType::Bool, "BOOL"),
    ];
    for (ty, keyword) in &expectations {
        assert_eq!(ty.to_string(), *keyword);
    }
}
6773
#[test]
fn row_len_and_emptiness() {
    // A two-cell row (a NULL still counts as a cell).
    let populated = Row::new(vec![Value::Int(1), Value::Null]);
    assert_eq!(populated.len(), 2);
    assert!(!populated.is_empty());

    // A row built from no values is empty.
    let blank = Row::new(Vec::new());
    assert!(blank.is_empty());
}
6781
#[test]
fn table_schema_column_position() {
    let schema = make_users_schema();
    // Known columns resolve to their zero-based slot; unknown names to `None`.
    let lookups = [("id", Some(0)), ("score", Some(2)), ("missing", None)];
    for (column, expected) in lookups {
        assert_eq!(schema.column_position(column), expected);
    }
}
6789
#[test]
fn catalog_create_table_then_lookup() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    assert_eq!(catalog.table_count(), 1);

    // Only the table that was created can be looked up.
    for (table, present) in [("users", true), ("nope", false)] {
        assert_eq!(catalog.get(table).is_some(), present);
    }
}
6798
#[test]
fn catalog_duplicate_table_is_rejected() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();

    // Creating the same table again must fail and name the offender.
    let second_attempt = catalog.create_table(make_users_schema());
    let err = second_attempt.unwrap_err();
    let names_users = match &err {
        StorageError::DuplicateTable { name } => name == "users",
        _ => false,
    };
    assert!(names_users);
}
6806
#[test]
fn table_insert_happy_path_appends_row() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    let alice = Row::new(vec![
        Value::Int(1),
        Value::Text("alice".into()),
        Value::Float(99.5),
    ]);
    users.insert(alice).unwrap();

    // The row is stored, and its `name` cell round-trips unchanged.
    assert_eq!(users.row_count(), 1);
    let stored_name = &users.rows()[0].values[1];
    assert_eq!(*stored_name, Value::Text("alice".into()));
}
6821
#[test]
fn table_insert_arity_mismatch() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    // One value offered to a three-column table.
    let too_short = Row::new(vec![Value::Int(1)]);
    let err = users.insert(too_short).unwrap_err();
    let reports_counts = matches!(
        err,
        StorageError::ArityMismatch {
            expected: 3,
            actual: 1
        }
    );
    assert!(reports_counts);

    // The rejected row must not have been stored.
    assert_eq!(users.row_count(), 0);
}
6837
#[test]
fn table_insert_type_mismatch_reports_column() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    // Slot 1 (`name`, declared TEXT) receives an INT.
    let wrong_type = Row::new(vec![Value::Int(1), Value::Int(42), Value::Float(0.0)]);
    let err = users.insert(wrong_type).unwrap_err();

    if let StorageError::TypeMismatch {
        ref column,
        expected,
        actual,
        position,
    } = err
    {
        assert_eq!(column, "name");
        assert_eq!(expected, DataType::Text);
        assert_eq!(actual, DataType::Int);
        assert_eq!(position, 1);
    } else {
        panic!("unexpected: {err:?}");
    }

    // The rejected row must not have been stored.
    assert_eq!(users.row_count(), 0);
}
6866
#[test]
fn table_insert_null_into_not_null_rejected() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    // `name` is declared non-nullable in the fixture schema.
    let null_name = Row::new(vec![Value::Int(1), Value::Null, Value::Float(1.0)]);
    let err = users.insert(null_name).unwrap_err();
    let blames_name = match &err {
        StorageError::NullInNotNull { column } => column == "name",
        _ => false,
    };
    assert!(blames_name);
}
6881
#[test]
fn table_insert_null_into_nullable_ok() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    // `score` is the nullable column of the fixture schema.
    let null_score = Row::new(vec![
        Value::Int(1),
        Value::Text("bob".into()),
        Value::Null,
    ]);
    users.insert(null_score).unwrap();
    assert_eq!(users.row_count(), 1);
}
6895
#[test]
fn catalog_get_mut_independent_per_table() {
    let mut catalog = Catalog::new();
    // Two tables with the same single-column shape.
    for table in ["a", "b"] {
        let columns = vec![ColumnSchema::new("v", DataType::Int, false)];
        catalog
            .create_table(TableSchema::new(table, columns))
            .unwrap();
    }

    // Insert into `a` only.
    let table_a = catalog.get_mut("a").unwrap();
    table_a.insert(Row::new(vec![Value::Int(1)])).unwrap();

    // `b` must be unaffected.
    assert_eq!(catalog.get("a").unwrap().row_count(), 1);
    assert_eq!(catalog.get("b").unwrap().row_count(), 0);
}
6916
6917 fn assert_round_trip(cat: &Catalog) {
6920 let bytes = cat.serialize();
6921 let restored = Catalog::deserialize(&bytes).expect("deserialize");
6922 assert_eq!(restored.table_count(), cat.table_count());
6925 for (a, b) in cat.tables.iter().zip(restored.tables.iter()) {
6926 assert_eq!(a.schema, b.schema);
6927 assert_eq!(a.rows, b.rows);
6928 }
6929 }
6930
#[test]
fn serialize_empty_catalog_round_trips() {
    // A freshly created catalog must survive serialise → deserialise.
    let empty = Catalog::new();
    assert_round_trip(&empty);
}
6935
#[test]
fn serialize_single_empty_table_round_trips() {
    // One table with a schema but no rows.
    let mut catalog = Catalog::new();
    let schema = make_users_schema();
    catalog.create_table(schema).unwrap();
    assert_round_trip(&catalog);
}
6942
#[test]
fn nsw_clone_is_o1() {
    // Build a 1500-row table with a 3-d F32 vector column and index it.
    let schema = TableSchema::new(
        "docs",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 3,
                    encoding: VecEncoding::F32,
                },
                true,
            ),
        ],
    );
    let mut catalog = Catalog::new();
    catalog.create_table(schema).unwrap();

    let docs = catalog.get_mut("docs").unwrap();
    for i in 0..1500_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.01;
        let embedding = vec![base, base + 0.05, base + 0.1];
        let row = Row::new(vec![Value::Int(i), Value::Vector(embedding)]);
        docs.insert(row).unwrap();
    }
    docs.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let IndexKind::Nsw(g) = &catalog.get("docs").unwrap().indices()[0].kind else {
        panic!("expected NSW")
    };
    assert_eq!(g.levels.len(), 1500, "one level slot per inserted row");
    assert!(
        g.layers.len() >= 2,
        "1500 nodes should populate at least two HNSW layers, got {}",
        g.layers.len()
    );

    let cloned = g.clone();

    // A clone must share the persistent storage rather than copy it.
    assert!(
        g.levels.shares_storage_with(&cloned.levels),
        "levels PV not shared after clone — clone copied elements (O(N))"
    );
    assert_eq!(g.layers.len(), cloned.layers.len());
    let layer_pairs = g.layers.iter().zip(cloned.layers.iter());
    for (l, (orig, cl)) in layer_pairs.enumerate() {
        assert!(
            orig.shares_storage_with(cl),
            "layer {l} PV not shared after clone — clone copied elements (O(N))"
        );
    }
}
7010
#[test]
fn sq8_catalog_serialise_roundtrip_preserves_cells_and_index() {
    // 32 rows of 8-d SQ8 vectors, indexed, then pushed through
    // serialise → deserialise.
    let schema = TableSchema::new(
        "vecs",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 8,
                    encoding: VecEncoding::Sq8,
                },
                false,
            ),
        ],
    );
    let mut catalog = Catalog::new();
    catalog.create_table(schema).unwrap();

    let vecs = catalog.get_mut("vecs").unwrap();
    for i in 0..32_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.03;
        let mut components: Vec<f32> = Vec::with_capacity(8);
        for j in 0..8_i32 {
            #[allow(clippy::cast_precision_loss)]
            let off = (j as f32) * 0.01;
            components.push(base + off);
        }
        let row = Row::new(vec![
            Value::Int(i),
            Value::Sq8Vector(quantize::quantize(&components)),
        ]);
        vecs.insert(row).unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let query = vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];

    // Snapshot one cell, the column type and a query result up front.
    let original = catalog.get("vecs").unwrap();
    let before_cell = original.rows()[5].values[1].clone();
    let before_ty = original.schema().columns[1].ty;
    let before_hits = nsw_query(original, "v_idx", &query, 5, NswMetric::L2);

    let bytes = catalog.serialize();
    let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
    let reloaded = restored.get("vecs").unwrap();

    // All three snapshots must match after the round trip.
    assert_eq!(reloaded.schema().columns[1].ty, before_ty);
    assert_eq!(reloaded.rows()[5].values[1], before_cell);
    let after_hits = nsw_query(reloaded, "v_idx", &query, 5, NswMetric::L2);
    assert_eq!(before_hits, after_hits);
}
7073
#[test]
fn half_catalog_serialise_roundtrip_preserves_cells_and_index() {
    use crate::halfvec;

    // 32 rows of 8-d half-precision vectors, indexed, then pushed through
    // serialise → deserialise.
    let schema = TableSchema::new(
        "vecs",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 8,
                    encoding: VecEncoding::F16,
                },
                false,
            ),
        ],
    );
    let mut catalog = Catalog::new();
    catalog.create_table(schema).unwrap();

    let vecs = catalog.get_mut("vecs").unwrap();
    for i in 0..32_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.03;
        let mut components: Vec<f32> = Vec::with_capacity(8);
        for j in 0..8_i32 {
            #[allow(clippy::cast_precision_loss)]
            let off = (j as f32) * 0.01;
            components.push(base + off);
        }
        let cell = halfvec::HalfVector::from_f32_slice(&components);
        let row = Row::new(vec![Value::Int(i), Value::HalfVector(cell)]);
        vecs.insert(row).unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let query = vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];

    // Snapshot one cell, the column type and a query result up front.
    let original = catalog.get("vecs").unwrap();
    let before_cell = original.rows()[5].values[1].clone();
    let before_ty = original.schema().columns[1].ty;
    let before_hits = nsw_query(original, "v_idx", &query, 5, NswMetric::L2);

    let bytes = catalog.serialize();
    let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
    let reloaded = restored.get("vecs").unwrap();

    // All three snapshots must match after the round trip.
    assert_eq!(reloaded.schema().columns[1].ty, before_ty);
    assert_eq!(reloaded.rows()[5].values[1], before_cell);
    let after_hits = nsw_query(reloaded, "v_idx", &query, 5, NswMetric::L2);
    assert_eq!(before_hits, after_hits);
}
7134
#[test]
#[allow(clippy::similar_names)]
fn hnsw_half_recall_at_10_matches_f32_groundtruth() {
    use crate::halfvec;

    // Deterministic generator: steps `state`, then maps its top 32 bits
    // onto the range -1.0 ..= 1.0.
    fn next(state: &mut u64) -> f32 {
        *state = state
            .wrapping_add(0x9E37_79B9_7F4A_7C15)
            .wrapping_mul(0xBF58_476D_1CE4_E5B9);
        #[allow(clippy::cast_precision_loss)]
        let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
        2.0 * u - 1.0
    }

    // Draws `count` vectors of `dim` components each, in order, from `state`.
    fn random_vectors(count: usize, dim: usize, state: &mut u64) -> Vec<Vec<f32>> {
        let mut out = Vec::with_capacity(count);
        for _ in 0..count {
            let mut v = Vec::with_capacity(dim);
            for _ in 0..dim {
                v.push(next(state));
            }
            out.push(v);
        }
        out
    }

    let dim: u32 = 32;
    let n: usize = 512;
    let dim_us = dim as usize;
    let mut seed: u64 = 0xF16_F16_F16_F16_u64;
    // The corpus is drawn first, then the queries, from the same stream.
    let corpus = random_vectors(n, dim_us, &mut seed);
    let queries = random_vectors(32, dim_us, &mut seed);

    // Ground truth: brute-force top-10 by squared L2 over the f32 originals.
    let mut exact_top10: Vec<Vec<usize>> = Vec::with_capacity(queries.len());
    for q in &queries {
        let mut scored: Vec<(f32, usize)> = Vec::with_capacity(corpus.len());
        for (i, v) in corpus.iter().enumerate() {
            scored.push((l2_distance_sq(v, q), i));
        }
        scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
        scored.truncate(10);
        exact_top10.push(scored.into_iter().map(|(_, i)| i).collect());
    }

    let schema = TableSchema::new(
        "vecs",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim,
                    encoding: VecEncoding::F16,
                },
                false,
            ),
        ],
    );
    let mut catalog = Catalog::new();
    catalog.create_table(schema).unwrap();
    let vecs = catalog.get_mut("vecs").unwrap();
    for (i, v) in corpus.iter().enumerate() {
        let id = i32::try_from(i).unwrap();
        let cell = halfvec::HalfVector::from_f32_slice(v);
        vecs.insert(Row::new(vec![Value::Int(id), Value::HalfVector(cell)]))
            .unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    // Count index hits that also appear in the exact top-10.
    let table = catalog.get("vecs").unwrap();
    let mut total_overlap = 0_usize;
    for (q, exact) in queries.iter().zip(exact_top10.iter()) {
        let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
        total_overlap += hits.iter().filter(|hit| exact.contains(*hit)).count();
    }
    #[allow(clippy::cast_precision_loss)]
    let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
    assert!(
        recall >= 0.95,
        "HALF HNSW recall@10 = {recall:.3}, below floor 0.95 — \
         check halfvec dispatch in `cell_to_query_metric_distance`"
    );
}
7218
#[test]
fn hnsw_sq8_recall_at_10_above_0_95_vs_f32_groundtruth() {
    use crate::quantize;

    // Deterministic generator: steps `state`, then maps its top 32 bits
    // onto the range -1.0 ..= 1.0.
    fn next(state: &mut u64) -> f32 {
        *state = state
            .wrapping_add(0x9E37_79B9_7F4A_7C15)
            .wrapping_mul(0xBF58_476D_1CE4_E5B9);
        #[allow(clippy::cast_precision_loss)]
        let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
        2.0 * u - 1.0
    }

    // Draws `count` vectors of `dim` components each, in order, from `state`.
    fn random_vectors(count: usize, dim: usize, state: &mut u64) -> Vec<Vec<f32>> {
        let mut out = Vec::with_capacity(count);
        for _ in 0..count {
            let mut v = Vec::with_capacity(dim);
            for _ in 0..dim {
                v.push(next(state));
            }
            out.push(v);
        }
        out
    }

    let dim: u32 = 32;
    let n: usize = 512;
    let dim_us = dim as usize;
    let mut seed: u64 = 0xCAFE_BABE_DEAD_BEEFu64;
    // The corpus is drawn first, then the queries, from the same stream.
    let corpus = random_vectors(n, dim_us, &mut seed);
    let queries = random_vectors(32, dim_us, &mut seed);

    // Ground truth: brute-force top-10 by squared L2 over the f32 originals.
    let mut exact_top10: Vec<Vec<usize>> = Vec::with_capacity(queries.len());
    for q in &queries {
        let mut scored: Vec<(f32, usize)> = Vec::with_capacity(corpus.len());
        for (i, v) in corpus.iter().enumerate() {
            scored.push((l2_distance_sq(v, q), i));
        }
        scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
        scored.truncate(10);
        exact_top10.push(scored.into_iter().map(|(_, i)| i).collect());
    }

    let schema = TableSchema::new(
        "vecs",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim,
                    encoding: VecEncoding::Sq8,
                },
                false,
            ),
        ],
    );
    let mut catalog = Catalog::new();
    catalog.create_table(schema).unwrap();
    let vecs = catalog.get_mut("vecs").unwrap();
    for (i, v) in corpus.iter().enumerate() {
        let id = i32::try_from(i).unwrap();
        let cell = quantize::quantize(v);
        vecs.insert(Row::new(vec![Value::Int(id), Value::Sq8Vector(cell)]))
            .unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    // Count index hits that also appear in the exact top-10.
    let table = catalog.get("vecs").unwrap();
    let mut total_overlap = 0_usize;
    for (q, exact) in queries.iter().zip(exact_top10.iter()) {
        let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
        total_overlap += hits.iter().filter(|hit| exact.contains(*hit)).count();
    }
    #[allow(clippy::cast_precision_loss)]
    let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
    assert!(
        recall >= 0.95,
        "SQ8 HNSW recall@10 = {recall:.3}, below floor 0.95 — \
         check `sq8_rerank` is wired in `nsw_search` for SQ8 columns"
    );
}
7307
#[test]
fn nsw_index_topology_persists_through_round_trip() {
    // Six rows with a 3-d F32 vector column, indexed before serialising.
    let schema = TableSchema::new(
        "docs",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 3,
                    encoding: VecEncoding::F32,
                },
                true,
            ),
        ],
    );
    let mut catalog = Catalog::new();
    catalog.create_table(schema).unwrap();

    let docs = catalog.get_mut("docs").unwrap();
    for i in 0..6_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.1;
        let embedding = vec![base, base + 0.05, base + 0.1];
        docs.insert(Row::new(vec![Value::Int(i), Value::Vector(embedding)]))
            .unwrap();
    }
    docs.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let IndexKind::Nsw(before) = &catalog.get("docs").unwrap().indices()[0].kind else {
        panic!("expected NSW")
    };
    let original = before.clone();

    let bytes = catalog.serialize();
    let restored = Catalog::deserialize(&bytes).expect("deserialize");
    let IndexKind::Nsw(after) = &restored.get("docs").unwrap().indices()[0].kind else {
        panic!("expected NSW")
    };
    let restored_graph = after.clone();

    // Every field of the graph must survive the round trip.
    assert_eq!(restored_graph.m, original.m);
    assert_eq!(restored_graph.m_max_0, original.m_max_0);
    assert_eq!(restored_graph.entry, original.entry);
    assert_eq!(restored_graph.entry_level, original.entry_level);
    assert_eq!(restored_graph.levels, original.levels);
    assert_eq!(restored_graph.layers, original.layers);
}
7364
#[test]
fn hnsw_level_assignment_is_deterministic() {
    // Asking twice for the same node index must give the same level.
    (0..32usize).for_each(|node| {
        let first = nsw_assign_level(node);
        let second = nsw_assign_level(node);
        assert_eq!(first, second);
    });
}
7373
#[test]
/// More than 150 of the first 200 node indices must be assigned level 0.
fn hnsw_layer_0_dominates_population() {
    let mut on_zero = 0usize;
    for node in 0..200usize {
        if nsw_assign_level(node) == 0 {
            on_zero += 1;
        }
    }
    assert!(on_zero > 150, "level-0 nodes too few: {on_zero}");
}
7383
#[test]
/// For each probe vector, the top-1 hit from `nsw_search` must be the same
/// row a full L2 scan over the table ranks first.
fn hnsw_search_matches_brute_force_for_l2_top1() {
    let points: alloc::vec::Vec<(i32, [f32; 3])> = alloc::vec![
        (1, [0.0, 0.0, 0.0]),
        (2, [1.0, 0.0, 0.0]),
        (3, [0.0, 1.0, 0.0]),
        (4, [0.0, 0.0, 1.0]),
        (5, [1.0, 1.0, 0.0]),
        (6, [1.0, 0.0, 1.0]),
        (7, [0.0, 1.0, 1.0]),
        (8, [1.0, 1.0, 1.0]),
        (9, [0.5, 0.5, 0.5]),
        (10, [0.2, 0.8, 0.5]),
    ];

    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vector_ty, true),
        ],
    ))
    .unwrap();

    let vecs = cat.get_mut("vecs").unwrap();
    for &(id, [x, y, z]) in &points {
        let row = Row::new(alloc::vec![
            Value::Int(id),
            Value::Vector(alloc::vec![x, y, z]),
        ]);
        vecs.insert(row).unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    let idx_pos = cat
        .get("vecs")
        .unwrap()
        .indices()
        .iter()
        .position(|index| index.name == "v_idx")
        .unwrap();

    for query in [[0.4, 0.4, 0.4], [0.9, 0.1, 0.0], [0.0, 0.9, 0.9]] {
        let table = cat.get("vecs").unwrap();
        let hnsw_top = nsw_search(table, idx_pos, &query, 1, 16, NswMetric::L2);

        // Exhaustive scan: rows whose column 1 is not a vector rank last.
        let mut brute: alloc::vec::Vec<(f32, usize)> = alloc::vec::Vec::new();
        for i in 0..table.rows.len() {
            let distance = match &table.rows[i].values[1] {
                Value::Vector(v) => l2_distance_sq(v, &query),
                _ => f32::INFINITY,
            };
            brute.push((distance, i));
        }
        brute.sort_by(|lhs, rhs| {
            lhs.0
                .partial_cmp(&rhs.0)
                .unwrap_or(core::cmp::Ordering::Equal)
        });

        assert!(!hnsw_top.is_empty(), "HNSW returned no results");
        assert_eq!(
            hnsw_top[0].1, brute[0].1,
            "HNSW top-1 != brute-force top-1 for {query:?}"
        );
    }
}
7452
#[test]
/// A `users` table holding two rows (one with a NULL score) passes `assert_round_trip`.
fn serialize_table_with_rows_round_trips() {
    let mut cat = Catalog::new();
    cat.create_table(make_users_schema()).unwrap();

    let rows = [
        Row::new(vec![
            Value::Int(1),
            Value::Text("alice".into()),
            Value::Float(95.5),
        ]),
        Row::new(vec![
            Value::Int(2),
            Value::Text("bob".into()),
            Value::Null,
        ]),
    ];
    let users = cat.get_mut("users").unwrap();
    for row in rows {
        users.insert(row).unwrap();
    }

    assert_round_trip(&cat);
}
7472
#[test]
/// A catalog with two tables (`users` and `flags`) passes `assert_round_trip`.
fn serialize_multiple_tables_round_trips() {
    let flags_schema = TableSchema::new(
        "flags",
        vec![
            ColumnSchema::new("id", DataType::BigInt, false),
            ColumnSchema::new("active", DataType::Bool, false),
        ],
    );

    let mut cat = Catalog::new();
    cat.create_table(make_users_schema()).unwrap();
    cat.create_table(flags_schema).unwrap();

    let flags = cat.get_mut("flags").unwrap();
    let row = Row::new(vec![Value::BigInt(7), Value::Bool(true)]);
    flags.insert(row).unwrap();

    assert_round_trip(&cat);
}
7491
#[test]
/// A stream that starts with `BADMAGIC` instead of the file magic is reported as `Corrupt`.
fn deserialize_rejects_bad_magic() {
    // Layout: bogus magic, current version byte, then a little-endian zero u32.
    let buf = [
        &b"BADMAGIC"[..],
        &[FILE_VERSION][..],
        &0u32.to_le_bytes()[..],
    ]
    .concat();
    let err = Catalog::deserialize(&buf).unwrap_err();
    let is_corrupt = matches!(err, StorageError::Corrupt(_));
    assert!(is_corrupt);
}
7500
#[test]
/// A valid magic followed by version byte 99 yields a `Corrupt` error mentioning "version".
fn deserialize_rejects_unsupported_version() {
    // Layout: real magic, version byte 99, then a little-endian zero u32.
    let buf = [&FILE_MAGIC[..], &[99u8][..], &0u32.to_le_bytes()[..]].concat();
    let err = Catalog::deserialize(&buf).unwrap_err();
    let names_version = matches!(err, StorageError::Corrupt(ref s) if s.contains("version"));
    assert!(names_version);
}
7509
#[test]
/// Dropping the final byte of a serialised catalog makes `deserialize` return `Corrupt`.
fn deserialize_rejects_truncated_file() {
    let mut cat = Catalog::new();
    cat.create_table(make_users_schema()).unwrap();

    let mut bytes = cat.serialize();
    bytes.truncate(bytes.len() - 1);

    let outcome = Catalog::deserialize(&bytes);
    assert!(matches!(outcome, Err(StorageError::Corrupt(_))));
}
7522
#[test]
/// One extra byte after a serialised empty catalog yields a `Corrupt` error mentioning "trailing".
fn deserialize_rejects_trailing_garbage() {
    let mut bytes = Catalog::new().serialize();
    bytes.extend_from_slice(&[0xFF]);

    let outcome = Catalog::deserialize(&bytes);
    let rejected = matches!(
        outcome,
        Err(StorageError::Corrupt(ref s)) if s.contains("trailing")
    );
    assert!(rejected);
}
7533
7534 fn populated_users() -> Catalog {
7537 let mut cat = Catalog::new();
7538 cat.create_table(make_users_schema()).unwrap();
7539 let t = cat.get_mut("users").unwrap();
7540 for (id, name, score) in [
7541 (1, "alice", Some(90.0)),
7542 (2, "bob", None),
7543 (3, "alice", Some(70.0)), ] {
7545 t.insert(Row::new(vec![
7546 Value::Int(id),
7547 Value::Text(name.into()),
7548 score.map_or(Value::Null, Value::Float),
7549 ]))
7550 .unwrap();
7551 }
7552 cat
7553 }
7554
#[test]
/// An index added after rows exist resolves id 2 to hot slot 1 and id 99 to nothing.
fn add_index_builds_from_existing_rows() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    let idx = cat.get("users").unwrap().index_on(0).expect("index_on(0)");

    let present = idx.lookup_eq(&IndexKey::Int(2));
    assert_eq!(present, &[RowLocator::Hot(1)]);

    let absent = idx.lookup_eq(&IndexKey::Int(99));
    assert_eq!(absent, &[] as &[RowLocator]);
}
7567
#[test]
/// Re-using the index name `ix` on the same table fails with `DuplicateIndex { name: "ix" }`.
fn add_index_dup_name_rejected() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();

    users.add_index("ix".into(), "id").unwrap();
    let err = users.add_index("ix".into(), "name").unwrap_err();

    let is_duplicate = matches!(err, StorageError::DuplicateIndex { ref name } if name == "ix");
    assert!(is_duplicate);
}
7576
#[test]
/// Indexing the non-existent column `ghost` fails with `ColumnNotFound { column: "ghost" }`.
fn add_index_unknown_column_rejected() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();

    let err = users.add_index("ix".into(), "ghost").unwrap_err();

    let is_missing_column =
        matches!(err, StorageError::ColumnNotFound { ref column } if column == "ghost");
    assert!(is_missing_column);
}
7587
#[test]
/// A row inserted after `by_name` exists is visible through the index,
/// and the earlier "alice" entries are still there.
fn insert_after_create_index_updates_it() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let dave = Row::new(vec![
        Value::Int(4),
        Value::Text("dave".into()),
        Value::Null,
    ]);
    users.insert(dave).unwrap();

    let by_name = users.index_on(1).unwrap();

    let dave_hits = by_name.lookup_eq(&IndexKey::Text("dave".into()));
    assert_eq!(dave_hits, &[RowLocator::Hot(3)]);

    let alice_hits = by_name.lookup_eq(&IndexKey::Text("alice".into()));
    assert_eq!(alice_hits, &[RowLocator::Hot(0), RowLocator::Hot(2)]);
}
7610
#[test]
/// With an index on `score`, probing with the integer key 90 returns no locators.
fn null_or_float_values_are_not_indexed() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_score".into(), "score").unwrap();

    let by_score = users.index_on(2).unwrap();
    let hits = by_score.lookup_eq(&IndexKey::Int(90));
    assert_eq!(hits, &[] as &[RowLocator]);
}
7623
#[test]
/// A three-element `Value::Vector` reports `DataType::Vector` with `dim: 3` and F32 encoding.
fn vector_value_data_type_carries_dim() {
    let value = Value::Vector(vec![1.0, 2.0, 3.0]);
    let expected = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    assert_eq!(value.data_type(), Some(expected));
}
7637
#[test]
/// Inserting a three-element vector into a `dim: 3` vector column succeeds.
fn vector_column_insert_matching_dim_ok() {
    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let schema = TableSchema::new("emb", vec![ColumnSchema::new("v", vector_ty, false)]);

    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let emb = cat.get_mut("emb").unwrap();
    let row = Row::new(vec![Value::Vector(vec![1.0, 2.0, 3.0])]);
    emb.insert(row).unwrap();
}
7658
#[test]
/// Inserting a two-element vector into a `dim: 3` vector column fails with `TypeMismatch`.
fn vector_column_insert_dim_mismatch_rejected() {
    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let schema = TableSchema::new("emb", vec![ColumnSchema::new("v", vector_ty, false)]);

    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let emb = cat.get_mut("emb").unwrap();
    let too_short = Row::new(vec![Value::Vector(vec![1.0, 2.0])]);
    let err = emb.insert(too_short).unwrap_err();

    let is_mismatch = matches!(err, StorageError::TypeMismatch { .. });
    assert!(is_mismatch);
}
7681
#[test]
/// Both the vector column's declared type and a stored vector value are
/// unchanged after a serialise/deserialise cycle.
fn vector_value_survives_catalog_round_trip() {
    let vector_ty = DataType::Vector {
        dim: 4,
        encoding: VecEncoding::F32,
    };
    let schema = TableSchema::new(
        "emb",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vector_ty, false),
        ],
    );

    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();
    let emb = cat.get_mut("emb").unwrap();
    emb.insert(Row::new(vec![
        Value::Int(1),
        Value::Vector(vec![0.5, -1.25, 3.0, 7.0]),
    ]))
    .unwrap();

    let encoded = cat.serialize();
    let restored = Catalog::deserialize(&encoded).expect("round-trip");
    let table = restored.get("emb").unwrap();

    let expected_ty = DataType::Vector {
        dim: 4,
        encoding: VecEncoding::F32,
    };
    assert_eq!(table.schema().columns[1].ty, expected_ty);

    let expected_value = Value::Vector(vec![0.5, -1.25, 3.0, 7.0]);
    assert_eq!(table.rows()[0].values[1], expected_value);
}
7721
#[test]
/// The `by_name` index keeps its name and its "alice" entries across a round trip.
fn index_survives_serialize_deserialize_round_trip() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let encoded = cat.serialize();
    let restored = Catalog::deserialize(&encoded).unwrap();
    let restored_users = restored.get("users").unwrap();
    let idx = restored_users
        .index_on(1)
        .expect("index_on(1) after restore");

    assert_eq!(idx.name, "by_name");
    let alice_hits = idx.lookup_eq(&IndexKey::Text("alice".into()));
    assert_eq!(alice_hits, &[RowLocator::Hot(0), RowLocator::Hot(2)]);
}
7742
7743 fn bigint_pk_users_schema() -> TableSchema {
7748 TableSchema::new(
7749 "users",
7750 vec![
7751 ColumnSchema::new("id", DataType::BigInt, false),
7752 ColumnSchema::new("name", DataType::Text, false),
7753 ],
7754 )
7755 }
7756
7757 fn make_user_row(id: i64, name: &str) -> Row {
7758 Row::new(vec![Value::BigInt(id), Value::Text(name.into())])
7759 }
7760
#[test]
/// With no segments loaded, `lookup_by_pk` returns the matching row for key 2.
fn lookup_by_pk_finds_row_via_hot_index() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();

    let users = cat.get_mut("users").unwrap();
    let seed = [(1i64, "alice"), (2, "bob"), (3, "carol")];
    for (id, name) in seed {
        let row = make_user_row(id, name);
        users.insert(row).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let key = IndexKey::Int(2);
    let got = cat.lookup_by_pk("users", "by_id", &key).unwrap();
    assert_eq!(got, make_user_row(2, "bob"));
    assert_eq!(cat.cold_segment_count(), 0);
}
7777
#[test]
/// `lookup_by_pk` yields `None` for an absent key, an unknown table and an unknown index.
fn lookup_by_pk_returns_none_when_key_missing() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    // (table, index, key) triples, each of which must miss.
    let misses = [
        ("users", "by_id", 999),
        ("other_table", "by_id", 1),
        ("users", "no_such_index", 1),
    ];
    for (table, index, key) in misses {
        let found = cat.lookup_by_pk(table, index, &IndexKey::Int(key));
        assert!(found.is_none());
    }
}
7799
#[test]
/// Keys registered with `Cold` locators resolve to rows decoded from the
/// loaded segment; an unregistered key still misses.
fn lookup_by_pk_resolves_cold_locator_via_loaded_segment() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let cold_rows: Vec<(i64, &str)> =
        vec![(100, "ivy"), (200, "joe"), (300, "kim"), (400, "lin")];

    // Encode each cold row body and pair it with its id as the segment key.
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::new();
    for &(id, name) in &cold_rows {
        let body = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), body));
    }
    let (seg_bytes, _meta) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();
    assert_eq!(seg_id, 0);
    assert_eq!(cat.cold_segment_count(), 1);

    let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
        .iter()
        .map(|&(id, _)| {
            let locator = RowLocator::Cold {
                segment_id: seg_id,
                page_offset: 0,
            };
            (IndexKey::Int(id), locator)
        })
        .collect();
    let users = cat.get_mut("users").unwrap();
    let registered = users.register_cold_locators("by_id", pairs).unwrap();
    assert_eq!(registered, 4);

    for &(id, name) in &cold_rows {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("cold key {id} not found"));
        assert_eq!(got, make_user_row(id, name));
    }

    let missing = cat.lookup_by_pk("users", "by_id", &IndexKey::Int(999));
    assert!(missing.is_none());
}
7857
#[test]
/// One index serves both inserted (hot) rows and segment-backed (cold) rows;
/// a key in neither tier misses.
fn lookup_by_pk_mixes_hot_and_cold_tiers() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    let hot_rows = [(1i64, "alice"), (2, "bob")];
    for (id, name) in hot_rows {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe")];
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::new();
    for &(id, name) in &cold_rows {
        let body = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), body));
    }
    let (seg_bytes, _) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();

    let mut pairs: Vec<(IndexKey, RowLocator)> = Vec::new();
    for &(id, _) in &cold_rows {
        let locator = RowLocator::Cold {
            segment_id: seg_id,
            page_offset: 0,
        };
        pairs.push((IndexKey::Int(id), locator));
    }
    cat.get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();

    // Hot keys first, then cold keys, in the same order as they were set up.
    let expectations = [(1, "alice"), (2, "bob"), (100, "ivy"), (200, "joe")];
    for (id, name) in expectations {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap();
        assert_eq!(got, make_user_row(id, name));
    }

    let missing = cat.lookup_by_pk("users", "by_id", &IndexKey::Int(50));
    assert!(missing.is_none());
}
7928
#[test]
/// Registering cold locators against an NSW index fails with a `Corrupt`
/// error whose message contains "not BTree".
fn register_cold_locators_rejects_nsw_index() {
    let vector_ty = DataType::Vector {
        dim: 4,
        encoding: VecEncoding::F32,
    };
    let schema = TableSchema::new(
        "vecs",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vector_ty, false),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let vecs = cat.get_mut("vecs").unwrap();
    let row = Row::new(vec![
        Value::Int(1),
        Value::Vector(vec![1.0, 0.0, 0.0, 0.0]),
    ]);
    vecs.insert(row).unwrap();
    vecs.add_nsw_index("by_v".into(), "v", NSW_DEFAULT_M).unwrap();

    let locator = RowLocator::Cold {
        segment_id: 0,
        page_offset: 0,
    };
    let pairs = vec![(IndexKey::Int(1), locator)];
    let err = vecs.register_cold_locators("by_v", pairs).unwrap_err();

    let names_btree = matches!(err, StorageError::Corrupt(ref s) if s.contains("not BTree"));
    assert!(names_btree);
}
7970
#[test]
/// Ten zero bytes are rejected with a `Corrupt` error mentioning "segment",
/// and no segment is counted afterwards.
fn load_segment_bytes_rejects_garbage() {
    let mut cat = Catalog::new();
    let junk = vec![0u8; 10];

    let err = cat.load_segment_bytes(junk).unwrap_err();
    let names_segment = matches!(err, StorageError::Corrupt(ref s) if s.contains("segment"));
    assert!(names_segment);

    assert_eq!(cat.cold_segment_count(), 0);
}
7979
#[test]
/// Three successive `load_segment_bytes` calls return ids 0, 1 and 2.
fn load_segment_bytes_returns_sequential_ids() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let schema = cat.get("users").unwrap().schema.clone();

    for batch in 0u32..3 {
        // Each batch holds four rows with ids batch*100 .. batch*100 + 3.
        let base = u64::from(batch) * 100;
        let mut rows: Vec<(u64, Vec<u8>)> = Vec::new();
        for offset in 0u64..4 {
            let id = base + offset;
            let row = make_user_row(id.cast_signed(), "x");
            rows.push((id, encode_row_body_dense(&row, &schema)));
        }
        let (bytes, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
        let assigned = cat.load_segment_bytes(bytes).unwrap();
        assert_eq!(assigned, batch);
    }

    assert_eq!(cat.cold_segment_count(), 3);
}
7998
#[test]
/// A stream produced by `encode_as_v8` (version byte 8) is accepted by
/// `Catalog::deserialize`, and every restored index entry is a `Hot` locator.
fn v8_catalog_decodes_as_all_hot_under_v9_reader() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let v8_bytes = encode_as_v8(&cat);
    // The version byte sits immediately after the magic.
    let version_byte = v8_bytes[FILE_MAGIC.len()];
    assert_eq!(version_byte, 8, "version byte must be 8");

    let restored = Catalog::deserialize(&v8_bytes).expect("v9 reader accepts v8 stream");
    let restored_users = restored.get("users").unwrap();
    let idx = restored_users
        .index_on(1)
        .expect("index_on(1) after restore");

    let alice_key = IndexKey::Text("alice".into());
    assert_eq!(
        idx.lookup_eq(&alice_key),
        &[RowLocator::Hot(0), RowLocator::Hot(2)]
    );

    let all_hot = idx
        .lookup_eq(&IndexKey::Text("alice".into()))
        .iter()
        .all(|entry| entry.is_hot());
    assert!(all_hot, "v8 → v9 read must yield Hot only");
}
8043
8044 fn encode_as_v8(cat: &Catalog) -> Vec<u8> {
8049 let mut out = Vec::with_capacity(64);
8050 out.extend_from_slice(FILE_MAGIC);
8051 out.push(8u8);
8052 write_u32(&mut out, u32::try_from(cat.tables.len()).unwrap());
8053 for t in &cat.tables {
8054 write_str(&mut out, &t.schema.name);
8055 write_u16(&mut out, u16::try_from(t.schema.columns.len()).unwrap());
8056 for c in &t.schema.columns {
8057 write_str(&mut out, &c.name);
8058 write_data_type(&mut out, c.ty);
8059 out.push(u8::from(c.nullable));
8060 match &c.default {
8061 None => out.push(0),
8062 Some(v) => {
8063 out.push(1);
8064 write_value(&mut out, v);
8065 }
8066 }
8067 out.push(u8::from(c.auto_increment));
8068 }
8069 write_u32(&mut out, u32::try_from(t.rows.len()).unwrap());
8070 for row in &t.rows {
8071 out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
8072 }
8073 write_u16(&mut out, u16::try_from(t.indices.len()).unwrap());
8074 for idx in &t.indices {
8075 write_str(&mut out, &idx.name);
8076 write_u16(&mut out, u16::try_from(idx.column_position).unwrap());
8077 match &idx.kind {
8078 IndexKind::BTree(_) => out.push(0),
8081 IndexKind::Nsw(g) => {
8082 out.push(1);
8083 write_u16(&mut out, u16::try_from(g.m).unwrap());
8084 write_nsw_graph(&mut out, g);
8085 }
8086 IndexKind::Brin { .. } => panic!(
8089 "v8 catalog writer cannot serialise BRIN — \
8090 tests with BRIN indices must use the current writer"
8091 ),
8092 IndexKind::Gin(_) => panic!(
8093 "v8 catalog writer cannot serialise GIN — \
8094 tests with GIN indices must use the current writer"
8095 ),
8096 }
8097 }
8098 }
8099 out
8100 }
8101
#[test]
/// After a serialise/deserialise cycle the index still holds both `Hot` and
/// `Cold` locators, and once the segment bytes are loaded again every key
/// resolves through `lookup_by_pk`.
fn v9_catalog_round_trip_preserves_cold_locators() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    let hot_rows = [(1i64, "alice"), (2, "bob")];
    for (id, name) in hot_rows {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe"), (300, "kim")];
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::new();
    for &(id, name) in &cold_rows {
        let body = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), body));
    }
    let (seg_bytes, _) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = cat.load_segment_bytes(seg_bytes.clone()).unwrap();

    let mut pairs: Vec<(IndexKey, RowLocator)> = Vec::new();
    for &(id, _) in &cold_rows {
        let locator = RowLocator::Cold {
            segment_id: seg_id,
            page_offset: 0,
        };
        pairs.push((IndexKey::Int(id), locator));
    }
    cat.get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();

    let bytes = cat.serialize();
    let version_byte = bytes[FILE_MAGIC.len()];
    assert_eq!(version_byte, FILE_VERSION);
    let mut restored = Catalog::deserialize(&bytes).expect("v9 round-trip parses");

    // The test loads the same segment bytes into the restored catalog and
    // expects the same id back.
    let restored_seg_id = restored.load_segment_bytes(seg_bytes).unwrap();
    assert_eq!(restored_seg_id, seg_id);

    let idx = restored.get("users").unwrap().index_on(0).unwrap();
    for (key, slot) in [(1, 0), (2, 1)] {
        assert_eq!(idx.lookup_eq(&IndexKey::Int(key)), &[RowLocator::Hot(slot)]);
    }
    let expected_cold = [RowLocator::Cold {
        segment_id: seg_id,
        page_offset: 0,
    }];
    for &(id, _) in &cold_rows {
        assert_eq!(idx.lookup_eq(&IndexKey::Int(id)), &expected_cold);
    }

    let bob = restored
        .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
        .unwrap();
    assert_eq!(bob, make_user_row(2, "bob"));
    for &(id, name) in &cold_rows {
        let got = restored
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap();
        assert_eq!(got, make_user_row(id, name));
    }
}
8192
#[test]
/// `row_body_encoded_len` must equal the length of the buffer that
/// `encode_row_body_dense` produces, for three rows over a ten-column schema.
fn row_body_encoded_len_matches_actual_encode_for_all_types() {
    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let numeric_ty = DataType::Numeric {
        precision: 18,
        scale: 2,
    };
    let columns = vec![
        ColumnSchema::new("a", DataType::SmallInt, true),
        ColumnSchema::new("b", DataType::Int, false),
        ColumnSchema::new("c", DataType::BigInt, false),
        ColumnSchema::new("d", DataType::Float, false),
        ColumnSchema::new("e", DataType::Bool, false),
        ColumnSchema::new("f", DataType::Text, false),
        ColumnSchema::new("g", vector_ty, false),
        ColumnSchema::new("h", numeric_ty, false),
        ColumnSchema::new("i", DataType::Date, false),
        ColumnSchema::new("j", DataType::Timestamp, false),
    ];
    let schema = TableSchema::new("wide", columns);

    // Ordinary positive values in every column.
    let typical = Row::new(vec![
        Value::SmallInt(7),
        Value::Int(42),
        Value::BigInt(1_000_000),
        Value::Float(1.5),
        Value::Bool(true),
        Value::Text("hello".into()),
        Value::Vector(vec![1.0, 2.0, 3.0]),
        Value::Numeric {
            scaled: 12345,
            scale: 2,
        },
        Value::Date(20_000),
        Value::Timestamp(1_700_000_000_000_000),
    ]);
    // NULL in the nullable column, zero or empty everywhere else.
    let zeroed = Row::new(vec![
        Value::Null,
        Value::Int(0),
        Value::BigInt(0),
        Value::Float(0.0),
        Value::Bool(false),
        Value::Text(String::new()),
        Value::Vector(vec![]),
        Value::Numeric {
            scaled: 0,
            scale: 2,
        },
        Value::Date(0),
        Value::Timestamp(0),
    ]);
    // Negative numbers and a longer text payload.
    let negative = Row::new(vec![
        Value::SmallInt(-1),
        Value::Int(-1),
        Value::BigInt(-1),
        Value::Float(-0.5),
        Value::Bool(true),
        Value::Text("a much longer payload here".into()),
        Value::Vector(vec![0.1, 0.2, 0.3]),
        Value::Numeric {
            scaled: -999_999_999,
            scale: 2,
        },
        Value::Date(-1),
        Value::Timestamp(-1),
    ]);

    let cases = [typical, zeroed, negative];
    for row in &cases {
        let encoded_len = encode_row_body_dense(row, &schema).len();
        let computed_len = row_body_encoded_len(row, &schema);
        assert_eq!(encoded_len, computed_len, "row {row:?}");
    }
}
8284
#[test]
/// `hot_bytes` starts at zero and, after three inserts, equals the summed
/// dense-encoded length of those rows; `hot_tier_bytes` reports the same total.
fn hot_bytes_grows_on_insert_and_matches_encoded_sum() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    assert_eq!(users.hot_bytes(), 0);

    let mut expected: u64 = 0;
    let seed = [(1i64, "alice"), (2, "bob"), (3, "carol")];
    for (id, name) in seed {
        let row = make_user_row(id, name);
        let encoded_len = encode_row_body_dense(&row, &users.schema).len() as u64;
        expected += encoded_len;
        users.insert(row).unwrap();
    }

    assert_eq!(users.hot_bytes(), expected);
    assert_eq!(cat.hot_tier_bytes(), expected);
}
8300
#[test]
/// Deleting the row at position 1 lowers `hot_bytes` by that row's encoded length.
fn hot_bytes_shrinks_on_delete() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    let seed = [(1i64, "alice"), (2, "bob"), (3, "carol")];
    for (id, name) in seed {
        users.insert(make_user_row(id, name)).unwrap();
    }

    let before = users.hot_bytes();
    // Position 1 holds the second inserted row, (2, "bob").
    let bob_bytes = encode_row_body_dense(&make_user_row(2, "bob"), &users.schema).len() as u64;

    let removed = users.delete_rows(&[1]);
    assert_eq!(removed, 1);
    assert_eq!(users.hot_bytes(), before - bob_bytes);
}
8317
#[test]
/// Updating a row to a longer name moves `hot_bytes` by exactly the
/// difference between the new and old encoded lengths.
fn hot_bytes_diffs_on_update_for_variable_width_columns() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    let after_insert = users.hot_bytes();

    let original_row = make_user_row(1, "alice");
    let new_row = make_user_row(1, "alice-the-longer-name");
    let old_len = encode_row_body_dense(&original_row, &users.schema).len() as u64;
    let new_len = encode_row_body_dense(&new_row, &users.schema).len() as u64;

    users.update_row(0, new_row.values).unwrap();

    let now = users.hot_bytes();
    assert_eq!(now, after_insert - old_len + new_len);
    assert!(now > after_insert, "longer text grew the counter");
}
8334
#[test]
/// The hot-tier byte counters of a restored catalog match the values
/// recorded before serialisation.
fn hot_bytes_round_trips_through_serialize_deserialize() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for i in 0..10 {
        let name = alloc::format!("name-{i}");
        users.insert(make_user_row(i, &name)).unwrap();
    }

    let pre = cat.hot_tier_bytes();
    let encoded = cat.serialize();
    let restored = Catalog::deserialize(&encoded).unwrap();

    assert_eq!(restored.hot_tier_bytes(), pre);
    let restored_users = restored.get("users").unwrap();
    assert_eq!(restored_users.hot_bytes(), pre);
}
8349
#[test]
/// Freezing 6 of 10 rows leaves 4 hot rows and one cold segment, lowers
/// `hot_bytes` by the reported amount, and keeps all ten keys resolvable.
fn freeze_oldest_to_cold_moves_rows_and_keeps_lookups_working() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..10i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let total_bytes_before = users.hot_bytes();

    let report = cat
        .freeze_oldest_to_cold("users", "by_id", 6)
        .expect("freeze succeeds");
    assert_eq!(report.frozen_rows, 6);
    assert_eq!(report.segment_id, 0);
    assert!(report.bytes_freed > 0);
    assert!(!report.segment_bytes.is_empty());

    let users = cat.get("users").unwrap();
    assert_eq!(users.row_count(), 4, "4 hot rows remain (10 - 6 frozen)");
    assert_eq!(cat.cold_segment_count(), 1);
    let expected_hot_bytes = total_bytes_before - report.bytes_freed;
    assert_eq!(
        users.hot_bytes(),
        expected_hot_bytes,
        "hot_bytes accounting matches FreezeReport"
    );

    for id in 0..10i64 {
        let expected_row = make_user_row(id, &alloc::format!("u-{id}"));
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("PK {id} disappeared after freeze"));
        assert_eq!(got, expected_row);
    }
}
8395
#[test]
/// Two freezes of 4 rows each leave 4 hot rows and two cold segments,
/// with all twelve keys still resolvable.
fn freeze_twice_preserves_prior_cold_locators() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..12i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    cat.freeze_oldest_to_cold("users", "by_id", 4)
        .expect("first freeze ok");
    cat.freeze_oldest_to_cold("users", "by_id", 4)
        .expect("second freeze ok");

    let hot_rows = cat.get("users").unwrap().row_count();
    assert_eq!(hot_rows, 4);
    assert_eq!(cat.cold_segment_count(), 2);

    for id in 0..12i64 {
        let expected_row = make_user_row(id, &alloc::format!("u-{id}"));
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("PK {id} not resolvable after two freezes"));
        assert_eq!(got, expected_row);
    }
}
8427
#[test]
/// A zero count, an unknown table, an unknown index and a count of 999 on a
/// three-row table each return `Corrupt` and leave the catalog unchanged.
fn freeze_oldest_to_cold_rejects_invalid_input() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..3i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    // (table, index, row count) argument triples that must all be refused.
    let bad_calls = [
        ("users", "by_id", 0),
        ("missing", "by_id", 1),
        ("users", "no_such_index", 1),
        ("users", "by_id", 999),
    ];
    for (table, index, count) in bad_calls {
        let outcome = cat.freeze_oldest_to_cold(table, index, count);
        assert!(matches!(outcome, Err(StorageError::Corrupt(_))));
    }

    let hot_rows = cat.get("users").unwrap().row_count();
    assert_eq!(hot_rows, 3);
    assert_eq!(cat.cold_segment_count(), 0);
}
8465
#[test]
/// Freezing through an index on a TEXT column fails with a `Corrupt` error
/// whose message contains "non-integer", and nothing is frozen.
fn freeze_oldest_to_cold_rejects_non_integer_pk() {
    let schema = TableSchema::new(
        "by_name",
        vec![
            ColumnSchema::new("name", DataType::Text, false),
            ColumnSchema::new("payload", DataType::BigInt, false),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let table = cat.get_mut("by_name").unwrap();
    let row = Row::new(vec![Value::Text("a".into()), Value::BigInt(1)]);
    table.insert(row).unwrap();
    table.add_index("by_n".into(), "name").unwrap();

    let err = cat
        .freeze_oldest_to_cold("by_name", "by_n", 1)
        .expect_err("non-integer PK rejected");
    if let StorageError::Corrupt(s) = &err {
        assert!(
            s.contains("non-integer"),
            "error message names the constraint: {s}"
        );
    } else {
        panic!("expected Corrupt, got {other:?}", other = err);
    }

    let hot_rows = cat.get("by_name").unwrap().row_count();
    assert_eq!(hot_rows, 1);
    assert_eq!(cat.cold_segment_count(), 0);
}
8497
#[test]
/// After freezing 3 of 6 rows, the `by_name` index resolves "u-4" to a
/// single `Hot` locator at position 1.
fn freeze_keeps_remaining_hot_rows_addressable_via_secondary_index() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    let by_name = cat.get("users").unwrap().index_on(1).unwrap();
    let got = by_name.lookup_eq(&IndexKey::Text("u-4".into()));
    assert_eq!(got.len(), 1);
    assert!(got[0].is_hot(), "kept-hot rows still surface as Hot");

    // `is_hot` was asserted just above, so the Cold arm cannot be reached.
    let RowLocator::Hot(position) = got[0] else {
        unreachable!()
    };
    assert_eq!(position, 1);
}
8532
#[test]
/// Promoting frozen key 2 appends it to the hot tier at position 2, grows
/// `hot_bytes` by its encoded length, and leaves exactly one `Hot` locator
/// for that key while other frozen keys still resolve.
fn promote_cold_row_pulls_frozen_row_back_to_hot_tier() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    cat.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
    let hot_bytes_before = cat.get("users").unwrap().hot_bytes();

    let promoted_key = IndexKey::Int(2);
    let new_idx = cat
        .promote_cold_row("users", "by_id", &promoted_key)
        .expect("promote ok")
        .expect("PK 2 was cold");
    assert_eq!(
        new_idx, 2,
        "promoted row appended after the 2 surviving hot rows"
    );

    let users = cat.get("users").unwrap();
    assert_eq!(users.row_count(), 3, "hot tier grew from 2 to 3");
    let row = make_user_row(2, "u-2");
    let row_len = encode_row_body_dense(&row, &users.schema).len() as u64;
    assert_eq!(users.hot_bytes(), hot_bytes_before + row_len);

    let by_id = users.index_on(0).unwrap();
    let entries = by_id.lookup_eq(&IndexKey::Int(2));
    assert_eq!(entries.len(), 1, "exactly one locator per key");
    assert!(entries[0].is_hot(), "promote retired the Cold locator");

    let promoted = cat
        .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
        .unwrap();
    assert_eq!(promoted, row);

    let still_cold = cat
        .lookup_by_pk("users", "by_id", &IndexKey::Int(0))
        .unwrap();
    assert_eq!(still_cold, make_user_row(0, "u-0"));
}
8591
#[test]
/// `promote_cold_row` yields `Ok(None)` when the key has nothing cold to
/// promote.
///
/// Checks a key that exists only as a hot row (7) and a key that does not
/// exist at all (99), then confirms the table still has one row and no cold
/// segment was created.
fn promote_cold_row_returns_none_when_key_is_not_cold() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(7, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    let hot_only = catalog
        .promote_cold_row("users", "by_id", &IndexKey::Int(7))
        .unwrap();
    assert!(hot_only.is_none());

    let absent = catalog
        .promote_cold_row("users", "by_id", &IndexKey::Int(99))
        .unwrap();
    assert!(absent.is_none());

    assert_eq!(catalog.get("users").unwrap().row_count(), 1);
    assert_eq!(catalog.cold_segment_count(), 0);
}
8619
#[test]
/// Shadowing a frozen key retires its cold locator and hides the row.
///
/// After freezing three of five rows, PK 1 resolves; `shadow_cold_row` must
/// report one removed locator, after which PK 1 no longer resolves while the
/// other frozen keys 0 and 2 still return their original rows.
fn shadow_cold_row_removes_cold_locators_and_drops_lookup() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..5i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    let shadowed_key = IndexKey::Int(1);
    assert!(
        catalog
            .lookup_by_pk("users", "by_id", &shadowed_key)
            .is_some(),
        "frozen PK resolves before shadow"
    );

    let removed = catalog
        .shadow_cold_row("users", "by_id", &shadowed_key)
        .unwrap();
    assert_eq!(removed, 1, "exactly one cold locator retired");

    assert!(
        catalog
            .lookup_by_pk("users", "by_id", &shadowed_key)
            .is_none(),
        "shadowed key no longer resolves"
    );

    // Neighbouring frozen keys must be unaffected by the shadow.
    for (id, name) in [(0i64, "u-0"), (2, "u-2")] {
        let row = catalog
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap();
        assert_eq!(row, make_user_row(id, name));
    }
}
8666
#[test]
/// `shadow_cold_row` reports zero removed locators when the key has no cold
/// entry.
///
/// Covers a key present only as a hot row (1) and a key that is absent (999),
/// and confirms the table's row count is still 1 afterwards.
fn shadow_cold_row_returns_zero_when_key_is_not_cold() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    let hot_only = catalog
        .shadow_cold_row("users", "by_id", &IndexKey::Int(1))
        .unwrap();
    assert_eq!(hot_only, 0, "hot-only key drops no cold locators");

    let absent = catalog
        .shadow_cold_row("users", "by_id", &IndexKey::Int(999))
        .unwrap();
    assert_eq!(absent, 0, "absent key drops no cold locators");

    assert_eq!(catalog.get("users").unwrap().row_count(), 1);
}
8692
#[test]
/// `promote_cold_row` and `shadow_cold_row` return `StorageError::Corrupt`
/// for an unknown table name and for an unknown index name.
fn promote_and_shadow_reject_invalid_inputs() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    let key = IndexKey::Int(1);
    // First pair: table does not exist. Second pair: index does not exist.
    for (table, index) in [("missing", "by_id"), ("users", "no_such_index")] {
        let promoted = catalog.promote_cold_row(table, index, &key);
        assert!(matches!(promoted, Err(StorageError::Corrupt(_))));

        let shadowed = catalog.shadow_cold_row(table, index, &key);
        assert!(matches!(shadowed, Err(StorageError::Corrupt(_))));
    }
}
8721
#[test]
/// Committing one prepared slice `0..6` is equivalent to
/// `freeze_oldest_to_cold(.., 6)`.
///
/// Two identically seeded catalogs take one path each. The resulting reports
/// must agree on `segment_id`, `frozen_rows`, `bytes_freed` and
/// `segment_bytes`, and every PK 0..10 must resolve to the same value in both.
fn commit_freeze_slices_single_slice_matches_freeze_oldest() {
    // Identical seeding for both catalogs: ten rows plus the `by_id` index.
    let seed = |cat: &mut Catalog| {
        cat.create_table(bigint_pk_users_schema()).unwrap();
        let users = cat.get_mut("users").unwrap();
        for id in 0..10i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    };
    let mut a = Catalog::new();
    let mut b = Catalog::new();
    seed(&mut a);
    seed(&mut b);

    let single = a.freeze_oldest_to_cold("users", "by_id", 6).unwrap();

    let prepared = b
        .prepare_freeze_slice("users", "by_id", 0..6)
        .expect("prepare");
    let parallel = b
        .commit_freeze_slices("users", "by_id", alloc::vec![prepared])
        .expect("commit");

    assert_eq!(single.segment_id, parallel.segment_id);
    assert_eq!(single.frozen_rows, parallel.frozen_rows);
    assert_eq!(single.bytes_freed, parallel.bytes_freed);
    assert_eq!(single.segment_bytes, parallel.segment_bytes);

    for id in 0..10i64 {
        let key = IndexKey::Int(id);
        assert_eq!(
            a.lookup_by_pk("users", "by_id", &key),
            b.lookup_by_pk("users", "by_id", &key),
            "PK {id} differs after single vs slice freeze"
        );
    }
}
8761
#[test]
/// Committing two adjacent slices (`0..4`, `4..8`) matches committing the
/// single slice `0..8`.
///
/// Both catalogs are seeded with the same ten rows, inserted in a
/// deliberately non-sorted id order. The two reports must agree on
/// `segment_bytes` and `frozen_rows`, and every PK 0..10 must resolve to the
/// same value in both catalogs.
fn commit_freeze_slices_two_slices_match_single_slice() {
    let mut a = Catalog::new();
    let mut b = Catalog::new();
    for cat in [&mut a, &mut b] {
        cat.create_table(bigint_pk_users_schema()).unwrap();
        let t = cat.get_mut("users").unwrap();
        // Ids are typed as i64 up front so no per-element cast is needed, and
        // the array is iterated by value.
        for id in [3i64, 7, 1, 9, 5, 0, 8, 4, 2, 6] {
            t.insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        }
        t.add_index("by_id".into(), "id").unwrap();
    }

    // Catalog `a`: one slice covering the whole range.
    let single = a
        .prepare_freeze_slice("users", "by_id", 0..8)
        .expect("prepare");
    let one = a
        .commit_freeze_slices("users", "by_id", alloc::vec![single])
        .expect("commit one");

    // Catalog `b`: the same range split into two contiguous slices.
    let s1 = b
        .prepare_freeze_slice("users", "by_id", 0..4)
        .expect("prepare s1");
    let s2 = b
        .prepare_freeze_slice("users", "by_id", 4..8)
        .expect("prepare s2");
    let two = b
        .commit_freeze_slices("users", "by_id", alloc::vec![s1, s2])
        .expect("commit two");

    assert_eq!(one.segment_bytes, two.segment_bytes);
    assert_eq!(one.frozen_rows, two.frozen_rows);
    for id in 0..10i64 {
        let key = IndexKey::Int(id);
        assert_eq!(
            a.lookup_by_pk("users", "by_id", &key),
            b.lookup_by_pk("users", "by_id", &key),
            "PK {id} differs after one-slice vs two-slice freeze"
        );
    }
}
8808
#[test]
/// `commit_freeze_slices` rejects non-contiguous slices.
///
/// Slices `0..2` and `3..5` leave position 2 uncovered; the commit must fail
/// with `StorageError::Corrupt`, create no cold segment, and leave all six
/// rows in the table.
fn commit_freeze_slices_rejects_gap() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let head = catalog
        .prepare_freeze_slice("users", "by_id", 0..2)
        .unwrap();
    let tail = catalog
        .prepare_freeze_slice("users", "by_id", 3..5)
        .unwrap();

    let outcome = catalog.commit_freeze_slices("users", "by_id", alloc::vec![head, tail]);
    assert!(matches!(outcome, Err(StorageError::Corrupt(_))));

    // The failed commit must not have changed any state.
    assert_eq!(catalog.cold_segment_count(), 0);
    assert_eq!(catalog.get("users").unwrap().row_count(), 6);
}
8830
#[test]
/// Committing an empty slice list succeeds without changing anything.
///
/// The report must show zero frozen rows, with no cold segment created and
/// all three rows still in the table.
fn commit_freeze_slices_empty_is_noop() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..3i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let no_slices = Vec::new();
    let report = catalog
        .commit_freeze_slices("users", "by_id", no_slices)
        .unwrap();

    assert_eq!(report.frozen_rows, 0);
    assert_eq!(catalog.cold_segment_count(), 0);
    assert_eq!(catalog.get("users").unwrap().row_count(), 3);
}
8849
#[test]
/// Compaction merges two small cold segments into one.
///
/// Two freezes of three rows each produce two segments. With the target set
/// one byte above the larger segment, `compact_cold_segments` must report two
/// sources, six merged rows, zero pruned rows and non-empty merged bytes.
/// Afterwards one live segment (the merged id) remains, the slot count is 3,
/// and every PK 0..8 still resolves to its original row.
fn compact_merges_small_segments_storage_unit() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..8i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    assert_eq!(catalog.cold_segment_count(), 2);
    assert_eq!(catalog.cold_segment_slot_count(), 2);

    // Target just above the largest segment; presumably this makes both
    // segments merge candidates. TODO confirm against
    // `compact_cold_segments`.
    let segment_ids = catalog.cold_segment_ids_global();
    let largest = segment_ids
        .iter()
        .map(|id| catalog.cold_segment(*id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let target = largest + 1;

    let report = catalog
        .compact_cold_segments("users", "by_id", target)
        .expect("compact succeeds");
    assert_eq!(report.sources.len(), 2);
    let merged_id = report.merged_segment_id.expect("merge happened");
    assert_eq!(report.merged_rows, 6);
    assert_eq!(report.deleted_rows_pruned, 0);
    assert!(!report.merged_segment_bytes.is_empty());

    // One live segment remains, yet the slot count grows to 3. Presumably the
    // source slots are retired rather than reused; verify against the
    // catalog implementation.
    assert_eq!(catalog.cold_segment_count(), 1);
    assert_eq!(catalog.cold_segment_slot_count(), 3);
    assert_eq!(catalog.cold_segment_ids_global(), alloc::vec![merged_id]);

    for id in 0..8i64 {
        let key = IndexKey::Int(id);
        let got = catalog
            .lookup_by_pk("users", "by_id", &key)
            .unwrap_or_else(|| panic!("PK {id} lost after compaction"));
        let want = make_user_row(id, &alloc::format!("u-{id}"));
        assert_eq!(got, want);
    }
}
8906
#[test]
/// Compaction prunes shadowed cold rows and keeps the live ones intact.
///
/// All six rows are frozen across two segments, then PKs 1 and 4 are
/// shadowed. Compacting with a target one byte above the larger segment must
/// report two sources, four merged rows and two pruned rows. The shadowed
/// keys must stay unresolvable, and each live key (0, 2, 3, 5) must resolve to
/// its original row content, not merely be present.
fn compact_drops_shadowed_cold_rows() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let t = cat.get_mut("users").unwrap();
    for id in 0..6i64 {
        t.insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    t.add_index("by_id".into(), "id").unwrap();
    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    // Shadow one key in each freeze batch (ids 0..3 and 3..6 respectively,
    // assuming freeze order follows insertion order).
    assert_eq!(
        cat.shadow_cold_row("users", "by_id", &IndexKey::Int(1))
            .unwrap(),
        1
    );
    assert_eq!(
        cat.shadow_cold_row("users", "by_id", &IndexKey::Int(4))
            .unwrap(),
        1
    );

    let max_seg_bytes = cat
        .cold_segment_ids_global()
        .iter()
        .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let report = cat
        .compact_cold_segments("users", "by_id", max_seg_bytes + 1)
        .expect("compact succeeds");
    assert_eq!(report.sources.len(), 2);
    assert_eq!(report.merged_rows, 4, "6 frozen − 2 shadowed = 4 live");
    assert_eq!(report.deleted_rows_pruned, 2);

    for shadowed in [1i64, 4i64] {
        assert!(
            cat.lookup_by_pk("users", "by_id", &IndexKey::Int(shadowed))
                .is_none(),
            "shadowed PK {shadowed} must remain invisible after compact"
        );
    }
    for live in [0i64, 2, 3, 5] {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(live))
            .unwrap_or_else(|| panic!("live PK {live} lost after compact"));
        // Compare content as well as presence, so a merge that scrambled
        // surviving rows would fail here.
        assert_eq!(got, make_user_row(live, &alloc::format!("u-{live}")));
    }
}
8961
#[test]
/// Compaction does nothing when fewer than two segments qualify.
///
/// Three situations are checked, each expecting `merged_segment_id` to be
/// `None`: no cold segments at all, exactly one cold segment with a large
/// target, and one cold segment with a target of 1.
fn compact_is_noop_below_two_candidates() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    // Nothing has been frozen yet.
    let no_segments = catalog
        .compact_cold_segments("users", "by_id", 1 << 30)
        .expect("noop ok");
    assert!(no_segments.merged_segment_id.is_none());
    assert!(no_segments.sources.is_empty());

    // A single segment has nothing to merge with.
    catalog.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
    let one_segment = catalog
        .compact_cold_segments("users", "by_id", 1 << 30)
        .expect("noop ok");
    assert!(one_segment.merged_segment_id.is_none());
    assert_eq!(catalog.cold_segment_count(), 1);

    // Same single segment, now with the smallest non-zero target.
    let tiny_target = catalog
        .compact_cold_segments("users", "by_id", 1)
        .expect("noop ok");
    assert!(tiny_target.merged_segment_id.is_none());
    assert_eq!(catalog.cold_segment_count(), 1);
}
8997
#[test]
/// A compacted catalog survives serialize → deserialize once the merged
/// segment is reloaded at its id.
///
/// After merging two segments, the catalog is serialized and deserialized,
/// and the merged bytes are re-attached with `load_segment_bytes_at`. Every
/// PK 0..6 must then resolve to its original row, with one cold segment
/// present.
fn compact_swap_survives_catalog_roundtrip_via_load_at() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    let segment_ids = catalog.cold_segment_ids_global();
    let largest = segment_ids
        .iter()
        .map(|id| catalog.cold_segment(*id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let report = catalog
        .compact_cold_segments("users", "by_id", largest + 1)
        .expect("compact ok");
    let merged_id = report.merged_segment_id.unwrap();
    let merged_bytes = report.merged_segment_bytes.clone();

    // Only the catalog itself is round-tripped; the segment payload is
    // supplied separately below.
    let cat_bytes = catalog.serialize();
    let mut restored = Catalog::deserialize(&cat_bytes).expect("deserialize ok");
    restored
        .load_segment_bytes_at(merged_id, merged_bytes)
        .expect("reload merged ok");

    for id in 0..6i64 {
        let key = IndexKey::Int(id);
        let got = restored
            .lookup_by_pk("users", "by_id", &key)
            .unwrap_or_else(|| panic!("PK {id} lost across roundtrip"));
        let want = make_user_row(id, &alloc::format!("u-{id}"));
        assert_eq!(got, want);
    }
    assert_eq!(restored.cold_segment_count(), 1);
}
9051
#[test]
/// `load_segment_bytes_at` pads the slot table up to the requested id and
/// refuses an id that is already occupied.
///
/// With one frozen segment present, loading at id 5 must leave six slots and
/// two live segments. Loading again at id 5, or at id 0, must fail with
/// `StorageError::Corrupt`.
fn load_segment_bytes_at_pads_and_rejects_collision() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..4i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let freeze_report = catalog.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
    let payload = freeze_report.segment_bytes.clone();

    catalog
        .load_segment_bytes_at(5, payload.clone())
        .expect("pad + load ok");
    assert_eq!(catalog.cold_segment_slot_count(), 6);
    assert_eq!(catalog.cold_segment_count(), 2);

    // Id 5 was filled by the load just above.
    let reload_same_id = catalog.load_segment_bytes_at(5, payload.clone());
    assert!(matches!(reload_same_id, Err(StorageError::Corrupt(_))));

    // Id 0 is presumably held by the segment the freeze created. TODO confirm.
    let reload_first_id = catalog.load_segment_bytes_at(0, payload);
    assert!(matches!(reload_first_id, Err(StorageError::Corrupt(_))));
}
9086
#[test]
/// After promoting a frozen key, its index entry is a single hot locator and
/// the rows that were never frozen still resolve.
///
/// Freezes two of four rows, promotes PK 0, and checks that `lookup_eq` for
/// key 0 on `index_on(0)` returns exactly one hot locator. PKs 2 and 3 must
/// still resolve to their original rows.
///
/// NOTE(review): despite the name, the visible body never performs a second
/// freeze after the promotion. Confirm whether that is intentional.
fn promote_then_refreeze_does_not_leave_orphan_locators() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..4i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    catalog.freeze_oldest_to_cold("users", "by_id", 2).unwrap();

    let pk0 = IndexKey::Int(0);
    let promoted = catalog.promote_cold_row("users", "by_id", &pk0).unwrap();
    assert!(promoted.is_some());

    let table = catalog.get("users").unwrap();
    let id_index = table.index_on(0).unwrap();
    let entries_after_promote = id_index.lookup_eq(&pk0).to_vec();
    assert_eq!(entries_after_promote.len(), 1);
    assert!(entries_after_promote[0].is_hot());

    for id in [2i64, 3] {
        let got = catalog
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap();
        let want = make_user_row(id, &alloc::format!("u-{id}"));
        assert_eq!(got, want);
    }
}
9131}