1#![no_std]
7#![cfg_attr(target_arch = "aarch64", allow(unsafe_code))]
11
12extern crate alloc;
13
14pub mod bloom;
15pub mod halfvec;
16pub mod persistent;
17pub mod persistent_btree;
18pub mod quantize;
19pub mod row_locator;
20pub mod segment;
21
22pub use self::bloom::{BloomError, BloomFilter};
23pub use self::row_locator::{RowLocator, RowLocatorError};
24pub use self::segment::{
25 BRIN_SIDECAR_MAGIC, BrinSummary, OwnedSegment, SEGMENT_COMPRESS_ALGO_LZSS,
26 SEGMENT_COMPRESS_ALGO_NONE, SEGMENT_MAGIC, SEGMENT_MAGIC_V2, SEGMENT_PAGE_BYTES, SegmentError,
27 SegmentMeta, SegmentReader, derive_brin_summaries, encode_segment, wrap_v2_envelope,
28 wrap_v2_envelope_with_brin,
29};
30
31use alloc::boxed::Box;
32use alloc::collections::{BTreeMap, BTreeSet};
33use alloc::format;
34use alloc::string::String;
35use alloc::sync::Arc;
36use alloc::vec::Vec;
37use core::fmt;
38
39use self::persistent::PersistentVec;
40use self::persistent_btree::PersistentBTreeMap;
41
/// Element encoding of a vector column.
///
/// [`VecEncoding::F32`] is the [`Default`]. Each variant is the encoding
/// reported by `Value::data_type` for one of the vector value kinds.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum VecEncoding {
    /// Encoding reported for `Value::Vector` (a `Vec<f32>`).
    #[default]
    F32,
    /// Encoding reported for `Value::Sq8Vector`
    /// (presumably 8-bit scalar quantization — confirm in the `quantize` module).
    Sq8,
    /// Encoding reported for `Value::HalfVector`
    /// (presumably IEEE half precision — confirm in the `halfvec` module).
    F16,
}

impl fmt::Display for VecEncoding {
    /// Writes the encoding keyword: `F32`, `SQ8` or `HALF`.
    ///
    /// Note that `F16` renders as `HALF`, the same spelling `DataType`'s
    /// `Display` uses in `VECTOR(n) USING HALF`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match self {
            Self::F32 => "F32",
            Self::Sq8 => "SQ8",
            Self::F16 => "HALF",
        };
        f.write_str(keyword)
    }
}
70
71#[derive(Debug, Clone, Copy, PartialEq, Eq)]
75pub enum DataType {
76 SmallInt,
79 Int, BigInt, Float, Text,
83 Varchar(u32),
86 Char(u32),
90 Bool,
91 Vector {
97 dim: u32,
98 encoding: VecEncoding,
99 },
100 Numeric {
106 precision: u8,
107 scale: u8,
108 },
109 Date,
112 Timestamp,
115 Timestamptz,
123 Interval,
128 Json,
133 Jsonb,
139 Bytes,
147 TextArray,
156 IntArray,
160 BigIntArray,
163 TsVector,
171 TsQuery,
175}
176
177impl fmt::Display for DataType {
178 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
179 match self {
180 Self::SmallInt => f.write_str("SMALLINT"),
181 Self::Int => f.write_str("INT"),
182 Self::BigInt => f.write_str("BIGINT"),
183 Self::Float => f.write_str("FLOAT"),
184 Self::Text => f.write_str("TEXT"),
185 Self::Varchar(n) => write!(f, "VARCHAR({n})"),
186 Self::Char(n) => write!(f, "CHAR({n})"),
187 Self::Bool => f.write_str("BOOL"),
188 Self::Vector { dim, encoding } => match encoding {
189 VecEncoding::F32 => write!(f, "VECTOR({dim})"),
190 VecEncoding::Sq8 => write!(f, "VECTOR({dim}) USING SQ8"),
191 VecEncoding::F16 => write!(f, "VECTOR({dim}) USING HALF"),
192 },
193 Self::Numeric { precision, scale } => {
194 if *scale == 0 {
195 write!(f, "NUMERIC({precision})")
196 } else {
197 write!(f, "NUMERIC({precision}, {scale})")
198 }
199 }
200 Self::Date => f.write_str("DATE"),
201 Self::Timestamp => f.write_str("TIMESTAMP"),
202 Self::Timestamptz => f.write_str("TIMESTAMPTZ"),
203 Self::Interval => f.write_str("INTERVAL"),
204 Self::Json => f.write_str("JSON"),
205 Self::Jsonb => f.write_str("JSONB"),
206 Self::Bytes => f.write_str("BYTEA"),
207 Self::TextArray => f.write_str("TEXT[]"),
208 Self::IntArray => f.write_str("INT[]"),
209 Self::BigIntArray => f.write_str("BIGINT[]"),
210 Self::TsVector => f.write_str("TSVECTOR"),
211 Self::TsQuery => f.write_str("TSQUERY"),
212 }
213 }
214}
215
/// One entry of a `Value::TsVector`: a word plus its position list and weight.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TsLexeme {
    /// The lexeme text; GIN indexes key their postings on this string.
    pub word: String,
    /// Positions recorded for the word (presumably token offsets in the
    /// source document — confirm against the tsvector builder).
    pub positions: Vec<u16>,
    /// Weight attached to the lexeme (encoding not visible here — confirm).
    pub weight: u8,
}
227
/// Syntax tree of a text-search query, as carried by `Value::TsQuery`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TsQueryAst {
    /// A single word to look for.
    Term {
        /// The word.
        word: String,
        /// Mask over lexeme weights (bit layout not visible here — confirm).
        weight_mask: u8,
    },
    /// Conjunction of two sub-queries.
    And(Box<TsQueryAst>, Box<TsQueryAst>),
    /// Disjunction of two sub-queries.
    Or(Box<TsQueryAst>, Box<TsQueryAst>),
    /// Negation of a sub-query.
    Not(Box<TsQueryAst>),
    /// Two sub-queries related by a positional distance.
    Phrase {
        /// Left operand.
        left: Box<TsQueryAst>,
        /// Right operand.
        right: Box<TsQueryAst>,
        /// Distance between the operands (presumably in token positions — confirm).
        distance: u16,
    },
}
251
252#[derive(Debug, Clone, PartialEq)]
256#[non_exhaustive]
257pub enum Value {
258 SmallInt(i16),
259 Int(i32),
260 BigInt(i64),
261 Float(f64),
262 Text(String),
263 Bool(bool),
264 Vector(Vec<f32>),
265 Sq8Vector(crate::quantize::Sq8Vector),
272 HalfVector(crate::halfvec::HalfVector),
278 Numeric {
282 scaled: i128,
283 scale: u8,
284 },
285 Date(i32),
287 Timestamp(i64),
289 Interval {
292 months: i32,
293 micros: i64,
294 },
295 Json(String),
299 Bytes(Vec<u8>),
305 TextArray(Vec<Option<String>>),
311 IntArray(Vec<Option<i32>>),
315 BigIntArray(Vec<Option<i64>>),
318 TsVector(Vec<TsLexeme>),
323 TsQuery(TsQueryAst),
326 Null,
327}
328
329impl Value {
330 pub fn data_type(&self) -> Option<DataType> {
332 match self {
333 Self::SmallInt(_) => Some(DataType::SmallInt),
334 Self::Int(_) => Some(DataType::Int),
335 Self::BigInt(_) => Some(DataType::BigInt),
336 Self::Float(_) => Some(DataType::Float),
337 Self::Text(_) => Some(DataType::Text),
340 Self::Bool(_) => Some(DataType::Bool),
341 Self::Vector(v) => Some(DataType::Vector {
342 dim: u32::try_from(v.len()).expect("vector dim ≤ u32"),
343 encoding: VecEncoding::F32,
344 }),
345 Self::Sq8Vector(q) => Some(DataType::Vector {
346 dim: u32::try_from(q.bytes.len()).expect("vector dim ≤ u32"),
347 encoding: VecEncoding::Sq8,
348 }),
349 Self::HalfVector(h) => Some(DataType::Vector {
350 dim: u32::try_from(h.dim()).expect("vector dim ≤ u32"),
351 encoding: VecEncoding::F16,
352 }),
353 Self::Numeric { scale, .. } => Some(DataType::Numeric {
358 precision: 0,
359 scale: *scale,
360 }),
361 Self::Date(_) => Some(DataType::Date),
362 Self::Timestamp(_) => Some(DataType::Timestamp),
363 Self::Interval { .. } => Some(DataType::Interval),
364 Self::Json(_) => Some(DataType::Json),
365 Self::Bytes(_) => Some(DataType::Bytes),
366 Self::TextArray(_) => Some(DataType::TextArray),
367 Self::IntArray(_) => Some(DataType::IntArray),
368 Self::BigIntArray(_) => Some(DataType::BigIntArray),
369 Self::TsVector(_) => Some(DataType::TsVector),
370 Self::TsQuery(_) => Some(DataType::TsQuery),
371 Self::Null => None,
372 }
373 }
374
375 pub const fn is_null(&self) -> bool {
376 matches!(self, Self::Null)
377 }
378}
379
380#[derive(Debug, Clone, PartialEq)]
383pub struct Row {
384 pub values: Vec<Value>,
385}
386
387impl Row {
388 pub const fn new(values: Vec<Value>) -> Self {
389 Self { values }
390 }
391
392 pub fn len(&self) -> usize {
393 self.values.len()
394 }
395
396 pub fn is_empty(&self) -> bool {
397 self.values.is_empty()
398 }
399}
400
401#[derive(Debug, Clone, PartialEq)]
402pub struct ColumnSchema {
403 pub name: String,
404 pub ty: DataType,
405 pub nullable: bool,
406 pub default: Option<Value>,
411 pub runtime_default: Option<String>,
419 pub auto_increment: bool,
423}
424
425#[derive(Debug, Clone, PartialEq)]
426pub struct TableSchema {
427 pub name: String,
428 pub columns: Vec<ColumnSchema>,
429 pub hot_tier_bytes: Option<u64>,
435 pub foreign_keys: Vec<ForeignKeyConstraint>,
442 pub uniqueness_constraints: Vec<UniquenessConstraint>,
449 pub checks: Vec<String>,
458}
459
/// A primary-key or unique constraint over a set of columns.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UniquenessConstraint {
    /// `true` for a primary key (presumably `false` means a plain UNIQUE
    /// constraint — confirm).
    pub is_primary_key: bool,
    /// Constrained columns (presumably positions into
    /// `TableSchema::columns` — confirm).
    pub columns: Vec<usize>,
    /// Presumably mirrors SQL `NULLS NOT DISTINCT` — confirm.
    pub nulls_not_distinct: bool,
}
483
484#[derive(Debug, Clone, PartialEq, Eq)]
489pub struct ForeignKeyConstraint {
490 pub name: Option<String>,
494 pub local_columns: Vec<usize>,
497 pub parent_table: String,
499 pub parent_columns: Vec<usize>,
504 pub on_delete: FkAction,
506 pub on_update: FkAction,
509}
510
/// Referential action attached to a foreign key.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FkAction {
    /// Tag `0`.
    Restrict,
    /// Tag `1`.
    Cascade,
    /// Tag `2`.
    SetNull,
    /// Tag `3`.
    SetDefault,
    /// Tag `4`.
    NoAction,
}

impl FkAction {
    /// Returns the one-byte tag of the action; the inverse of [`FkAction::from_tag`].
    pub const fn tag(self) -> u8 {
        match self {
            Self::Restrict => 0,
            Self::Cascade => 1,
            Self::SetNull => 2,
            Self::SetDefault => 3,
            Self::NoAction => 4,
        }
    }

    /// Decodes a tag produced by [`FkAction::tag`]; unknown bytes yield `None`.
    pub const fn from_tag(b: u8) -> Option<Self> {
        match b {
            0 => Some(Self::Restrict),
            1 => Some(Self::Cascade),
            2 => Some(Self::SetNull),
            3 => Some(Self::SetDefault),
            4 => Some(Self::NoAction),
            _ => None,
        }
    }
}
543
544impl TableSchema {
545 pub fn column_position(&self, name: &str) -> Option<usize> {
546 self.columns.iter().position(|c| c.name == name)
547 }
548}
549
550#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
555pub enum IndexKey {
556 Int(i64),
557 Text(String),
558 Bool(bool),
559}
560
561impl IndexKey {
562 pub fn from_value(v: &Value) -> Option<Self> {
563 match v {
564 Value::SmallInt(n) => Some(Self::Int(i64::from(*n))),
565 Value::Int(n) => Some(Self::Int(i64::from(*n))),
566 Value::BigInt(n) => Some(Self::Int(*n)),
567 Value::Text(s) => Some(Self::Text(s.clone())),
568 Value::Bool(b) => Some(Self::Bool(*b)),
569 Value::Date(d) => Some(Self::Int(i64::from(*d))),
572 Value::Timestamp(t) => Some(Self::Int(*t)),
573 Value::Null
578 | Value::Float(_)
579 | Value::Vector(_)
580 | Value::Sq8Vector(_)
581 | Value::HalfVector(_)
582 | Value::Numeric { .. }
583 | Value::Interval { .. }
584 | Value::Json(_)
585 | Value::Bytes(_)
586 | Value::TextArray(_)
587 | Value::IntArray(_)
588 | Value::BigIntArray(_)
589 | Value::TsVector(_)
590 | Value::TsQuery(_) => None,
591 }
592 }
593}
594
595#[derive(Debug, Clone)]
600pub struct Index {
601 pub name: String,
602 pub column_position: usize,
603 pub kind: IndexKind,
604 pub included_columns: Vec<usize>,
614 pub partial_predicate: Option<String>,
621 pub expression: Option<String>,
626 pub is_unique: bool,
633 pub extra_column_positions: Vec<usize>,
642}
643
/// Default `m` for NSW indexes (presumably what callers pass to
/// `Table::add_nsw_index` when no value is given — confirm).
pub const NSW_DEFAULT_M: usize = 16;
647
/// Summary of a freeze operation (produced outside this part of the file).
#[derive(Debug, Clone)]
pub struct FreezeReport {
    /// Identifier of the segment involved.
    pub segment_id: u32,
    /// Number of rows frozen.
    pub frozen_rows: usize,
    /// Bytes freed (presumably from the hot tier — confirm).
    pub bytes_freed: u64,
    /// Encoded segment bytes.
    pub segment_bytes: Vec<u8>,
}
673
674#[derive(Debug, Clone)]
683pub struct FreezeSlice {
684 pub row_range: core::ops::Range<usize>,
689 pub rows: Vec<(u64, Vec<u8>, IndexKey)>,
695}
696
/// Summary of a compaction (produced outside this part of the file).
#[derive(Debug, Clone)]
pub struct CompactReport {
    /// Identifiers of the source segments.
    pub sources: Vec<u32>,
    /// Identifier of the merged segment, if there is one.
    pub merged_segment_id: Option<u32>,
    /// Encoded bytes of the merged segment.
    pub merged_segment_bytes: Vec<u8>,
    /// Number of rows in the merged output.
    pub merged_rows: usize,
    /// Number of deleted rows dropped during the merge.
    pub deleted_rows_pruned: usize,
    /// Estimated number of bytes reclaimed.
    pub bytes_reclaimed_estimate: u64,
}
732
733#[derive(Debug, Clone)]
734pub enum IndexKind {
735 BTree(PersistentBTreeMap<IndexKey, Vec<RowLocator>>),
752 Nsw(NswGraph),
754 Brin {
761 column_type: DataType,
765 },
766 Gin(PersistentBTreeMap<alloc::string::String, Vec<RowLocator>>),
782}
783
784#[derive(Debug, Clone)]
793pub struct NswGraph {
794 pub m: usize,
796 pub m_max_0: usize,
799 pub entry: Option<usize>,
802 pub entry_level: u8,
804 pub levels: PersistentVec<u8>,
811 pub layers: Vec<PersistentVec<Vec<u32>>>,
827}
828
829impl NswGraph {
830 fn new(m: usize) -> Self {
831 Self {
832 m,
833 m_max_0: m.saturating_mul(2),
834 entry: None,
835 entry_level: 0,
836 levels: PersistentVec::new(),
837 layers: alloc::vec![PersistentVec::new()],
838 }
839 }
840
841 pub const fn cap_for_layer(&self, layer: u8) -> usize {
843 if layer == 0 { self.m_max_0 } else { self.m }
844 }
845}
846
/// Deterministically derives a level in `0..=7` from a row index.
///
/// The index is multiplied by the 64-bit golden-ratio constant and passed
/// through the SplitMix64 finalizer. The level is the number of all-zero
/// low-order nibbles of the resulting hash, capped at 7, so each additional
/// level is reached with probability 1/16 for a well-mixed hash. The same
/// `row_idx` always yields the same level.
pub fn nsw_assign_level(row_idx: usize) -> u8 {
    const MAX_LEVEL: u8 = 7;

    let mut hash = (row_idx as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
    hash = (hash ^ (hash >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    hash = (hash ^ (hash >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    hash ^= hash >> 31;

    // Four trailing zero bits make one zero nibble. A zero hash reports 64
    // trailing zeros (16 nibbles), which the cap reduces to `MAX_LEVEL`.
    let zero_nibbles = hash.trailing_zeros() / 4;
    u8::try_from(zero_nibbles).map_or(MAX_LEVEL, |n| n.min(MAX_LEVEL))
}
873
874impl Index {
875 fn new_btree(name: String, column_position: usize) -> Self {
876 Self {
877 name,
878 column_position,
879 kind: IndexKind::BTree(PersistentBTreeMap::new()),
880 included_columns: Vec::new(),
881 partial_predicate: None,
882 expression: None,
883 is_unique: false,
884 extra_column_positions: Vec::new(),
885 }
886 }
887
888 fn new_nsw(name: String, column_position: usize, m: usize) -> Self {
889 Self {
890 name,
891 column_position,
892 kind: IndexKind::Nsw(NswGraph::new(m)),
893 included_columns: Vec::new(),
894 partial_predicate: None,
895 expression: None,
896 is_unique: false,
897 extra_column_positions: Vec::new(),
898 }
899 }
900
901 fn new_brin(name: String, column_position: usize, column_type: DataType) -> Self {
905 Self {
906 name,
907 column_position,
908 kind: IndexKind::Brin { column_type },
909 included_columns: Vec::new(),
910 partial_predicate: None,
911 expression: None,
912 is_unique: false,
913 extra_column_positions: Vec::new(),
914 }
915 }
916
917 fn new_gin(name: String, column_position: usize) -> Self {
922 Self {
923 name,
924 column_position,
925 kind: IndexKind::Gin(PersistentBTreeMap::new()),
926 included_columns: Vec::new(),
927 partial_predicate: None,
928 expression: None,
929 is_unique: false,
930 extra_column_positions: Vec::new(),
931 }
932 }
933
934 pub fn lookup_eq(&self, key: &IndexKey) -> &[RowLocator] {
943 match &self.kind {
944 IndexKind::BTree(m) => m.get(key).map_or(&[][..], Vec::as_slice),
945 IndexKind::Nsw(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => &[][..],
948 }
949 }
950
951 pub fn gin_lookup_word(&self, word: &str) -> &[RowLocator] {
955 match &self.kind {
956 IndexKind::Gin(m) => m.get(&String::from(word)).map_or(&[][..], Vec::as_slice),
957 IndexKind::BTree(_) | IndexKind::Nsw(_) | IndexKind::Brin { .. } => &[][..],
958 }
959 }
960
961 pub const fn nsw(&self) -> Option<&NswGraph> {
964 match &self.kind {
965 IndexKind::Nsw(g) => Some(g),
966 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => None,
967 }
968 }
969
970 pub const fn is_brin(&self) -> bool {
975 matches!(self.kind, IndexKind::Brin { .. })
976 }
977
978 pub const fn is_gin(&self) -> bool {
982 matches!(self.kind, IndexKind::Gin(_))
983 }
984}
985
986#[derive(Debug, Clone)]
1002pub struct Table {
1003 schema: TableSchema,
1004 rows: PersistentVec<Row>,
1005 indices: Vec<Index>,
1006 hot_bytes: u64,
1007 cold_row_count: u64,
1021 cold_row_count_stale: bool,
1026}
1027
1028impl Table {
1029 pub fn new(schema: TableSchema) -> Self {
1030 Self {
1031 schema,
1032 rows: PersistentVec::new(),
1033 indices: Vec::new(),
1034 hot_bytes: 0,
1035 cold_row_count: 0,
1036 cold_row_count_stale: false,
1037 }
1038 }
1039
1040 #[must_use]
1044 pub const fn hot_bytes(&self) -> u64 {
1045 self.hot_bytes
1046 }
1047
1048 #[must_use]
1051 pub const fn cold_row_count(&self) -> u64 {
1052 self.cold_row_count
1053 }
1054
1055 pub fn set_cold_row_count(&mut self, n: u64) {
1058 self.cold_row_count = n;
1059 self.cold_row_count_stale = false;
1060 }
1061
1062 pub fn mark_cold_row_count_stale(&mut self) {
1067 self.cold_row_count_stale = true;
1068 }
1069
1070 #[must_use]
1074 pub const fn cold_row_count_stale(&self) -> bool {
1075 self.cold_row_count_stale
1076 }
1077
1078 #[must_use]
1089 pub fn count_cold_locators(&self) -> u64 {
1090 let mut best: u64 = 0;
1091 for idx in &self.indices {
1092 if let IndexKind::BTree(map) = &idx.kind {
1093 let n: u64 = map
1094 .iter()
1095 .map(|(_, locs)| locs.iter().filter(|l| l.is_cold()).count() as u64)
1096 .sum();
1097 if n > best {
1098 best = n;
1099 }
1100 }
1101 }
1102 best
1103 }
1104
1105 pub const fn schema(&self) -> &TableSchema {
1106 &self.schema
1107 }
1108
1109 pub const fn schema_mut(&mut self) -> &mut TableSchema {
1113 &mut self.schema
1114 }
1115
1116 pub const fn rows(&self) -> &PersistentVec<Row> {
1120 &self.rows
1121 }
1122
1123 pub const fn row_count(&self) -> usize {
1124 self.rows.len()
1125 }
1126
1127 pub fn indices_mut(&mut self) -> &mut [Index] {
1132 &mut self.indices
1133 }
1134
1135 pub fn indices(&self) -> &[Index] {
1136 &self.indices
1137 }
1138
1139 pub fn next_auto_value(&self, col_pos: usize) -> Option<i64> {
1145 let ty = self.schema.columns.get(col_pos)?.ty;
1146 if !matches!(ty, DataType::SmallInt | DataType::Int | DataType::BigInt) {
1147 return None;
1148 }
1149 let mut max: Option<i64> = None;
1150 for row in &self.rows {
1151 match row.values.get(col_pos) {
1152 Some(Value::SmallInt(n)) => {
1153 let v = i64::from(*n);
1154 max = Some(max.map_or(v, |m| m.max(v)));
1155 }
1156 Some(Value::Int(n)) => {
1157 let v = i64::from(*n);
1158 max = Some(max.map_or(v, |m| m.max(v)));
1159 }
1160 Some(Value::BigInt(n)) => {
1161 max = Some(max.map_or(*n, |m| m.max(*n)));
1162 }
1163 _ => {}
1164 }
1165 }
1166 Some(max.map_or(1, |m| m + 1))
1167 }
1168
1169 pub fn index_on(&self, column_position: usize) -> Option<&Index> {
1173 self.indices
1180 .iter()
1181 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::BTree(_)))
1182 .or_else(|| {
1183 self.indices.iter().find(|i| {
1184 i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_))
1185 })
1186 })
1187 }
1188
1189 pub fn insert(&mut self, row: Row) -> Result<(), StorageError> {
1193 if row.len() != self.schema.columns.len() {
1194 return Err(StorageError::ArityMismatch {
1195 expected: self.schema.columns.len(),
1196 actual: row.len(),
1197 });
1198 }
1199 for (i, (val, col)) in row.values.iter().zip(&self.schema.columns).enumerate() {
1200 if val.is_null() {
1201 if !col.nullable {
1202 return Err(StorageError::NullInNotNull {
1203 column: col.name.clone(),
1204 });
1205 }
1206 continue;
1207 }
1208 let actual = val.data_type().expect("non-null");
1209 let compatible = actual == col.ty
1223 || matches!(
1224 (actual, col.ty),
1225 (
1226 DataType::Text,
1227 DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
1228 ) | (DataType::Json | DataType::Jsonb, DataType::Text)
1229 | (DataType::Json, DataType::Jsonb)
1230 | (DataType::Jsonb, DataType::Json)
1231 | (DataType::Timestamp, DataType::Timestamptz)
1232 | (DataType::Timestamptz, DataType::Timestamp)
1233 )
1234 || matches!(
1235 (actual, col.ty),
1236 (
1237 DataType::Numeric { scale: a, .. },
1238 DataType::Numeric { scale: b, .. },
1239 ) if a == b
1240 );
1241 if !compatible {
1242 return Err(StorageError::TypeMismatch {
1243 column: col.name.clone(),
1244 expected: col.ty,
1245 actual,
1246 position: i,
1247 });
1248 }
1249 }
1250 let new_row_idx = self.rows.len();
1251 for idx in &mut self.indices {
1255 match &mut idx.kind {
1256 IndexKind::BTree(map) => {
1257 if let Some(key) = IndexKey::from_value(&row.values[idx.column_position]) {
1258 let mut entries = map.get(&key).cloned().unwrap_or_default();
1264 entries.push(RowLocator::Hot(new_row_idx));
1265 map.insert_mut(key, entries);
1266 }
1267 }
1268 IndexKind::Gin(map) => {
1269 if let Value::TsVector(lexemes) = &row.values[idx.column_position] {
1273 for lex in lexemes {
1274 let mut entries = map.get(&lex.word).cloned().unwrap_or_default();
1275 entries.push(RowLocator::Hot(new_row_idx));
1276 map.insert_mut(lex.word.clone(), entries);
1277 }
1278 }
1279 }
1280 IndexKind::Nsw(_) | IndexKind::Brin { .. } => {}
1284 }
1285 }
1286 self.hot_bytes = self
1289 .hot_bytes
1290 .saturating_add(row_body_encoded_len(&row, &self.schema) as u64);
1291 self.rows.push_mut(row);
1296 let new_row_idx = self.rows.len() - 1;
1299 let nsw_targets: Vec<usize> = self
1300 .indices
1301 .iter()
1302 .enumerate()
1303 .filter_map(|(i, idx)| {
1304 if matches!(idx.kind, IndexKind::Nsw(_)) {
1305 Some(i)
1306 } else {
1307 None
1308 }
1309 })
1310 .collect();
1311 for idx_pos in nsw_targets {
1312 nsw_insert_at(self, idx_pos, new_row_idx);
1313 }
1314 Ok(())
1315 }
1316
1317 pub fn add_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1321 if self.indices.iter().any(|i| i.name == name) {
1322 return Err(StorageError::DuplicateIndex { name });
1323 }
1324 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1325 StorageError::ColumnNotFound {
1326 column: column_name.into(),
1327 }
1328 })?;
1329 let mut idx = Index::new_btree(name, column_position);
1330 if let IndexKind::BTree(map) = &mut idx.kind {
1331 for (i, row) in self.rows.iter().enumerate() {
1332 if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
1333 let mut entries = map.get(&key).cloned().unwrap_or_default();
1334 entries.push(RowLocator::Hot(i));
1335 map.insert_mut(key, entries);
1336 }
1337 }
1338 }
1339 self.indices.push(idx);
1340 Ok(())
1341 }
1342
1343 pub fn add_nsw_index(
1348 &mut self,
1349 name: String,
1350 column_name: &str,
1351 m: usize,
1352 ) -> Result<(), StorageError> {
1353 self.add_nsw_index_inner(name, column_name, m, None)
1354 }
1355
1356 pub fn rebuild_nsw_index(
1368 &mut self,
1369 name: &str,
1370 new_encoding: Option<VecEncoding>,
1371 ) -> Result<(), StorageError> {
1372 let idx_pos = self
1373 .indices
1374 .iter()
1375 .position(|i| i.name == name)
1376 .ok_or_else(|| StorageError::IndexNotFound {
1377 name: String::from(name),
1378 })?;
1379 let col_pos = self.indices[idx_pos].column_position;
1380 let m = match &self.indices[idx_pos].kind {
1381 IndexKind::Nsw(g) => g.m,
1382 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
1383 return Err(StorageError::Unsupported(format!(
1384 "ALTER INDEX REBUILD on non-NSW index {name:?} — only NSW indexes can rebuild"
1385 )));
1386 }
1387 };
1388 let col_name = self.schema.columns[col_pos].name.clone();
1389 if let Some(target) = new_encoding {
1392 let current = match self.schema.columns[col_pos].ty {
1393 DataType::Vector { encoding, .. } => encoding,
1394 ref other => {
1395 return Err(StorageError::Unsupported(format!(
1396 "ALTER INDEX REBUILD WITH (encoding=…) on non-vector column type {other:?}"
1397 )));
1398 }
1399 };
1400 if target != current {
1401 let DataType::Vector { dim, .. } = self.schema.columns[col_pos].ty else {
1402 unreachable!("checked above")
1403 };
1404 let n = self.rows.len();
1405 for i in 0..n {
1406 let row = self
1407 .rows
1408 .get_mut(i)
1409 .expect("row index in bounds (we iterated up to len())");
1410 let cell = core::mem::replace(&mut row.values[col_pos], Value::Null);
1411 let recoded = recode_vector_cell(cell, target)?;
1412 row.values[col_pos] = recoded;
1413 }
1414 self.schema.columns[col_pos].ty = DataType::Vector {
1415 dim,
1416 encoding: target,
1417 };
1418 }
1419 }
1420 self.indices.remove(idx_pos);
1422 self.add_nsw_index_inner(String::from(name), &col_name, m, None)?;
1423 Ok(())
1424 }
1425
1426 pub fn restore_nsw_index(
1431 &mut self,
1432 name: String,
1433 column_name: &str,
1434 graph: NswGraph,
1435 ) -> Result<(), StorageError> {
1436 self.add_nsw_index_inner(name, column_name, graph.m, Some(graph))
1437 }
1438
1439 pub fn restore_btree_index(
1446 &mut self,
1447 name: String,
1448 column_name: &str,
1449 map: PersistentBTreeMap<IndexKey, Vec<RowLocator>>,
1450 ) -> Result<(), StorageError> {
1451 if self.indices.iter().any(|i| i.name == name) {
1452 return Err(StorageError::DuplicateIndex { name });
1453 }
1454 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1455 StorageError::ColumnNotFound {
1456 column: column_name.into(),
1457 }
1458 })?;
1459 self.indices.push(Index {
1460 name,
1461 column_position,
1462 kind: IndexKind::BTree(map),
1463 included_columns: Vec::new(),
1464 partial_predicate: None,
1465 expression: None,
1466 is_unique: false,
1467 extra_column_positions: Vec::new(),
1468 });
1469 Ok(())
1470 }
1471
1472 pub fn restore_brin_index(
1477 &mut self,
1478 name: String,
1479 column_name: &str,
1480 column_type: DataType,
1481 ) -> Result<(), StorageError> {
1482 if self.indices.iter().any(|i| i.name == name) {
1483 return Err(StorageError::DuplicateIndex { name });
1484 }
1485 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1486 StorageError::ColumnNotFound {
1487 column: column_name.into(),
1488 }
1489 })?;
1490 self.indices
1491 .push(Index::new_brin(name, column_position, column_type));
1492 Ok(())
1493 }
1494
1495 pub fn add_brin_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1499 if self.indices.iter().any(|i| i.name == name) {
1500 return Err(StorageError::DuplicateIndex { name });
1501 }
1502 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1503 StorageError::ColumnNotFound {
1504 column: column_name.into(),
1505 }
1506 })?;
1507 let column_type = self.schema.columns[column_position].ty;
1508 self.indices
1509 .push(Index::new_brin(name, column_position, column_type));
1510 Ok(())
1511 }
1512
1513 pub fn add_gin_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1518 if self.indices.iter().any(|i| i.name == name) {
1519 return Err(StorageError::DuplicateIndex { name });
1520 }
1521 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1522 StorageError::ColumnNotFound {
1523 column: column_name.into(),
1524 }
1525 })?;
1526 if self.schema.columns[column_position].ty != DataType::TsVector {
1527 return Err(StorageError::Corrupt(format!(
1528 "GIN index {name:?} requires a tsvector column; \
1529 {column_name:?} is {:?}",
1530 self.schema.columns[column_position].ty
1531 )));
1532 }
1533 let mut idx = Index::new_gin(name, column_position);
1534 if let IndexKind::Gin(map) = &mut idx.kind {
1535 for (i, row) in self.rows.iter().enumerate() {
1536 if let Value::TsVector(lexemes) = &row.values[column_position] {
1537 for lex in lexemes {
1538 let mut entries = map.get(&lex.word).cloned().unwrap_or_default();
1539 entries.push(RowLocator::Hot(i));
1540 map.insert_mut(lex.word.clone(), entries);
1541 }
1542 }
1543 }
1544 }
1545 self.indices.push(idx);
1546 Ok(())
1547 }
1548
1549 pub fn restore_gin_index(
1554 &mut self,
1555 name: String,
1556 column_name: &str,
1557 map: PersistentBTreeMap<String, Vec<RowLocator>>,
1558 ) -> Result<(), StorageError> {
1559 if self.indices.iter().any(|i| i.name == name) {
1560 return Err(StorageError::DuplicateIndex { name });
1561 }
1562 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1563 StorageError::ColumnNotFound {
1564 column: column_name.into(),
1565 }
1566 })?;
1567 let mut idx = Index::new_gin(name, column_position);
1568 idx.kind = IndexKind::Gin(map);
1569 self.indices.push(idx);
1570 Ok(())
1571 }
1572
1573 pub fn register_cold_locators<I>(
1590 &mut self,
1591 index_name: &str,
1592 locators: I,
1593 ) -> Result<usize, StorageError>
1594 where
1595 I: IntoIterator<Item = (IndexKey, RowLocator)>,
1596 {
1597 let idx = self
1598 .indices
1599 .iter_mut()
1600 .find(|i| i.name == index_name)
1601 .ok_or_else(|| StorageError::Corrupt(format!("index {index_name:?} not found")))?;
1602 let map = match &mut idx.kind {
1603 IndexKind::BTree(map) => map,
1604 IndexKind::Nsw(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
1605 return Err(StorageError::Corrupt(format!(
1606 "index {index_name:?} is not BTree; cold locators apply only to BTree indices"
1607 )));
1608 }
1609 };
1610 let mut count = 0usize;
1611 for (key, locator) in locators {
1612 let mut entries = map.get(&key).cloned().unwrap_or_default();
1613 entries.push(locator);
1614 map.insert_mut(key, entries);
1615 count += 1;
1616 }
1617 Ok(count)
1618 }
1619
1620 pub fn register_gin_cold_locators<I>(
1625 &mut self,
1626 index_name: &str,
1627 locators: I,
1628 ) -> Result<usize, StorageError>
1629 where
1630 I: IntoIterator<Item = (String, RowLocator)>,
1631 {
1632 let idx = self
1633 .indices
1634 .iter_mut()
1635 .find(|i| i.name == index_name)
1636 .ok_or_else(|| StorageError::Corrupt(format!("index {index_name:?} not found")))?;
1637 let map = match &mut idx.kind {
1638 IndexKind::Gin(map) => map,
1639 IndexKind::BTree(_) | IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
1640 return Err(StorageError::Corrupt(format!(
1641 "register_gin_cold_locators: index {index_name:?} is not GIN"
1642 )));
1643 }
1644 };
1645 let mut count = 0usize;
1646 for (word, locator) in locators {
1647 let mut entries = map.get(&word).cloned().unwrap_or_default();
1648 entries.push(locator);
1649 map.insert_mut(word, entries);
1650 count += 1;
1651 }
1652 Ok(count)
1653 }
1654
1655 pub fn remove_cold_locators_for_key(
1665 &mut self,
1666 index_name: &str,
1667 key: &IndexKey,
1668 ) -> Result<usize, StorageError> {
1669 let idx = self
1670 .indices
1671 .iter_mut()
1672 .find(|i| i.name == index_name)
1673 .ok_or_else(|| {
1674 StorageError::Corrupt(format!(
1675 "remove_cold_locators_for_key: index {index_name:?} not found"
1676 ))
1677 })?;
1678 let map = match &mut idx.kind {
1679 IndexKind::BTree(map) => map,
1680 IndexKind::Nsw(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
1681 return Err(StorageError::Corrupt(format!(
1682 "remove_cold_locators_for_key: index {index_name:?} is not BTree; \
1683 cold locators apply only to BTree indices"
1684 )));
1685 }
1686 };
1687 let Some(entries) = map.get(key) else {
1688 return Ok(0);
1689 };
1690 let mut kept: Vec<RowLocator> =
1691 entries.iter().copied().filter(RowLocator::is_hot).collect();
1692 let removed = entries.len() - kept.len();
1693 if removed == 0 {
1694 return Ok(0);
1695 }
1696 kept.shrink_to_fit();
1697 map.insert_mut(key.clone(), kept);
1705 Ok(removed)
1706 }
1707
1708 pub fn add_column(&mut self, col: ColumnSchema, fill_value: Value) {
1715 self.schema.columns.push(col);
1716 let mut new_rows: PersistentVec<Row> = PersistentVec::new();
1717 for row in self.rows.iter() {
1718 let mut values = row.values.clone();
1719 values.push(fill_value.clone());
1720 new_rows.push_mut(Row::new(values));
1721 }
1722 self.rows = new_rows;
1723 }
1724
1725 pub fn delete_rows(&mut self, positions: &[usize]) -> usize {
1731 if positions.is_empty() {
1732 return 0;
1733 }
1734 let mut to_remove = alloc::vec![false; self.rows.len()];
1738 let mut removed = 0;
1739 for &p in positions {
1740 if p < to_remove.len() && !to_remove[p] {
1741 to_remove[p] = true;
1742 removed += 1;
1743 }
1744 }
1745 let mut new_rows: PersistentVec<Row> = PersistentVec::new();
1746 let mut removed_bytes: u64 = 0;
1747 for (i, row) in self.rows.iter().enumerate() {
1748 if to_remove[i] {
1749 removed_bytes =
1750 removed_bytes.saturating_add(row_body_encoded_len(row, &self.schema) as u64);
1751 } else {
1752 new_rows.push_mut(row.clone());
1753 }
1754 }
1755 self.rows = new_rows;
1756 self.hot_bytes = self.hot_bytes.saturating_sub(removed_bytes);
1757 self.rebuild_indices();
1758 removed
1759 }
1760
1761 pub fn update_row(
1767 &mut self,
1768 position: usize,
1769 new_values: Vec<Value>,
1770 ) -> Result<(), StorageError> {
1771 if position >= self.rows.len() {
1772 return Err(StorageError::Corrupt(alloc::format!(
1773 "update_row: position {position} out of bounds (rows={})",
1774 self.rows.len()
1775 )));
1776 }
1777 if new_values.len() != self.schema.columns.len() {
1778 return Err(StorageError::ArityMismatch {
1779 expected: self.schema.columns.len(),
1780 actual: new_values.len(),
1781 });
1782 }
1783 for (i, (val, col)) in new_values.iter().zip(&self.schema.columns).enumerate() {
1787 if val.is_null() {
1788 if !col.nullable {
1789 return Err(StorageError::NullInNotNull {
1790 column: col.name.clone(),
1791 });
1792 }
1793 continue;
1794 }
1795 let actual = val.data_type().expect("non-null");
1796 let compatible = actual == col.ty
1797 || matches!(
1798 (actual, col.ty),
1799 (
1800 DataType::Text,
1801 DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
1802 ) | (DataType::Json | DataType::Jsonb, DataType::Text)
1803 | (DataType::Json, DataType::Jsonb)
1804 | (DataType::Jsonb, DataType::Json)
1805 | (DataType::Timestamp, DataType::Timestamptz)
1806 | (DataType::Timestamptz, DataType::Timestamp)
1807 )
1808 || matches!(
1809 (actual, col.ty),
1810 (
1811 DataType::Numeric { scale: a, .. },
1812 DataType::Numeric { scale: b, .. },
1813 ) if a == b
1814 );
1815 if !compatible {
1816 return Err(StorageError::TypeMismatch {
1817 column: col.name.clone(),
1818 expected: col.ty,
1819 actual,
1820 position: i,
1821 });
1822 }
1823 }
1824 let old_row = self
1825 .rows
1826 .get(position)
1827 .expect("position bounds-checked above");
1828 let old_bytes = row_body_encoded_len(old_row, &self.schema) as u64;
1829 let new_row = Row::new(new_values);
1830 let new_bytes = row_body_encoded_len(&new_row, &self.schema) as u64;
1831 self.rows = self
1832 .rows
1833 .set(position, new_row)
1834 .expect("position bounds-checked above");
1835 self.hot_bytes = self
1836 .hot_bytes
1837 .saturating_sub(old_bytes)
1838 .saturating_add(new_bytes);
1839 self.rebuild_indices();
1840 Ok(())
1841 }
1842
1843 fn rebuild_indices(&mut self) {
1850 let preserved_cold: Vec<(String, Vec<(IndexKey, RowLocator)>)> = self
1859 .indices
1860 .iter()
1861 .filter_map(|idx| match &idx.kind {
1862 IndexKind::BTree(map) => {
1863 let cold: Vec<(IndexKey, RowLocator)> = map
1864 .iter()
1865 .flat_map(|(k, locs)| {
1866 locs.iter()
1867 .filter(|l| l.is_cold())
1868 .copied()
1869 .map(move |l| (k.clone(), l))
1870 })
1871 .collect();
1872 if cold.is_empty() {
1873 None
1874 } else {
1875 Some((idx.name.clone(), cold))
1876 }
1877 }
1878 IndexKind::Nsw(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => None,
1881 })
1882 .collect();
1883
1884 let preserved_gin_cold: Vec<(String, Vec<(String, RowLocator)>)> = self
1889 .indices
1890 .iter()
1891 .filter_map(|idx| match &idx.kind {
1892 IndexKind::Gin(map) => {
1893 let cold: Vec<(String, RowLocator)> = map
1894 .iter()
1895 .flat_map(|(w, locs)| {
1896 locs.iter()
1897 .filter(|l| l.is_cold())
1898 .copied()
1899 .map(move |l| (w.clone(), l))
1900 })
1901 .collect();
1902 if cold.is_empty() {
1903 None
1904 } else {
1905 Some((idx.name.clone(), cold))
1906 }
1907 }
1908 IndexKind::BTree(_) | IndexKind::Nsw(_) | IndexKind::Brin { .. } => None,
1909 })
1910 .collect();
1911
1912 #[derive(Clone)]
1917 enum RebuildKind {
1918 BTree,
1919 Nsw(usize),
1920 Brin(DataType),
1921 Gin,
1922 }
1923 let descriptors: Vec<(String, usize, RebuildKind)> = self
1924 .indices
1925 .iter()
1926 .map(|idx| {
1927 let kind = match &idx.kind {
1928 IndexKind::Nsw(g) => RebuildKind::Nsw(g.m),
1929 IndexKind::Brin { column_type } => RebuildKind::Brin(*column_type),
1930 IndexKind::BTree(_) => RebuildKind::BTree,
1931 IndexKind::Gin(_) => RebuildKind::Gin,
1932 };
1933 (idx.name.clone(), idx.column_position, kind)
1934 })
1935 .collect();
1936 self.indices.clear();
1937 for (name, column_position, rebuild_kind) in descriptors {
1938 match rebuild_kind {
1939 RebuildKind::Nsw(m) => {
1940 let idx = Index::new_nsw(name, column_position, m);
1941 self.indices.push(idx);
1942 let idx_pos = self.indices.len() - 1;
1943 let row_indices: Vec<usize> = (0..self.rows.len()).collect();
1944 for row_idx in row_indices {
1945 nsw_insert_at(self, idx_pos, row_idx);
1946 }
1947 }
1948 RebuildKind::Brin(column_type) => {
1949 self.indices
1952 .push(Index::new_brin(name, column_position, column_type));
1953 }
1954 RebuildKind::BTree => {
1955 let mut idx = Index::new_btree(name, column_position);
1956 if let IndexKind::BTree(map) = &mut idx.kind {
1957 for (i, row) in self.rows.iter().enumerate() {
1958 if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
1959 let mut entries = map.get(&key).cloned().unwrap_or_default();
1960 entries.push(RowLocator::Hot(i));
1961 map.insert_mut(key, entries);
1962 }
1963 }
1964 }
1965 self.indices.push(idx);
1966 }
1967 RebuildKind::Gin => {
1968 let mut idx = Index::new_gin(name, column_position);
1969 if let IndexKind::Gin(map) = &mut idx.kind {
1970 for (i, row) in self.rows.iter().enumerate() {
1971 if let Value::TsVector(lexemes) = &row.values[column_position] {
1972 for lex in lexemes {
1973 let mut entries =
1974 map.get(&lex.word).cloned().unwrap_or_default();
1975 entries.push(RowLocator::Hot(i));
1976 map.insert_mut(lex.word.clone(), entries);
1977 }
1978 }
1979 }
1980 }
1981 self.indices.push(idx);
1982 }
1983 }
1984 }
1985
1986 for (idx_name, locators) in preserved_cold {
1991 let _ = self.register_cold_locators(&idx_name, locators);
1995 }
1996 for (idx_name, locators) in preserved_gin_cold {
1998 let _ = self.register_gin_cold_locators(&idx_name, locators);
1999 }
2000 }
2001
2002 fn add_nsw_index_inner(
2003 &mut self,
2004 name: String,
2005 column_name: &str,
2006 m: usize,
2007 restore: Option<NswGraph>,
2008 ) -> Result<(), StorageError> {
2009 if self.indices.iter().any(|i| i.name == name) {
2010 return Err(StorageError::DuplicateIndex { name });
2011 }
2012 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
2013 StorageError::ColumnNotFound {
2014 column: column_name.into(),
2015 }
2016 })?;
2017 if !matches!(
2018 self.schema.columns[column_position].ty,
2019 DataType::Vector { .. }
2020 ) {
2021 return Err(StorageError::TypeMismatch {
2022 column: column_name.into(),
2023 expected: DataType::Vector {
2024 dim: 0,
2025 encoding: VecEncoding::F32,
2026 },
2027 actual: self.schema.columns[column_position].ty,
2028 position: column_position,
2029 });
2030 }
2031 if let Some(graph) = restore {
2032 self.indices.push(Index {
2033 name,
2034 column_position,
2035 kind: IndexKind::Nsw(graph),
2036 included_columns: Vec::new(),
2037 partial_predicate: None,
2038 expression: None,
2039 is_unique: false,
2040 extra_column_positions: Vec::new(),
2041 });
2042 return Ok(());
2043 }
2044 let idx = Index::new_nsw(name, column_position, m);
2045 self.indices.push(idx);
2046 let idx_pos = self.indices.len() - 1;
2047 let row_indices: Vec<usize> = (0..self.rows.len()).collect();
2050 for row_idx in row_indices {
2051 nsw_insert_at(self, idx_pos, row_idx);
2052 }
2053 Ok(())
2054 }
2055}
2056
2057fn recode_vector_cell(cell: Value, target: VecEncoding) -> Result<Value, StorageError> {
2064 if matches!(cell, Value::Null) {
2065 return Ok(cell);
2066 }
2067 let as_f32: Vec<f32> = match &cell {
2069 Value::Vector(v) => v.clone(),
2070 Value::Sq8Vector(q) => quantize::dequantize(q),
2071 Value::HalfVector(h) => h.to_f32_vec(),
2072 other => {
2073 return Err(StorageError::Unsupported(format!(
2074 "ALTER INDEX REBUILD: cannot recode non-vector cell {:?}",
2075 other.data_type()
2076 )));
2077 }
2078 };
2079 Ok(match target {
2084 VecEncoding::F32 => Value::Vector(as_f32),
2085 VecEncoding::Sq8 => Value::Sq8Vector(quantize::quantize(&as_f32)),
2086 VecEncoding::F16 => Value::HalfVector(halfvec::HalfVector::from_f32_slice(&as_f32)),
2087 })
2088}
2089
2090fn nsw_insert_at(table: &mut Table, idx_pos: usize, new_row_idx: usize) {
2097 let col_pos = table.indices[idx_pos].column_position;
2098 let cell_dim: Option<usize> = match &table.rows[new_row_idx].values[col_pos] {
2099 Value::Vector(v) => Some(v.len()),
2100 Value::Sq8Vector(q) => Some(q.bytes.len()),
2101 Value::HalfVector(h) => Some(h.dim()),
2102 _ => None,
2103 };
2104 let Some(dim) = cell_dim else {
2105 ensure_node_slot(table, idx_pos, new_row_idx, 0);
2108 return;
2109 };
2110 if dim == 0 {
2111 ensure_node_slot(table, idx_pos, new_row_idx, 0);
2112 return;
2113 }
2114 let level = nsw_assign_level(new_row_idx);
2115 ensure_node_slot(table, idx_pos, new_row_idx, level);
2116 let (entry, entry_level, m) = match &table.indices[idx_pos].kind {
2117 IndexKind::Nsw(g) => (g.entry, g.entry_level, g.m),
2118 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
2119 unreachable!("nsw_insert_at on a non-NSW index")
2120 }
2121 };
2122 if entry.is_none() {
2124 if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
2125 g.entry = Some(new_row_idx);
2126 g.entry_level = level;
2127 *g.levels
2128 .get_mut(new_row_idx)
2129 .expect("levels slot padded by ensure_node_slot") = level;
2130 }
2131 return;
2132 }
2133 if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
2135 *g.levels
2136 .get_mut(new_row_idx)
2137 .expect("levels slot padded by ensure_node_slot") = level;
2138 }
2139 let query = match &table.rows[new_row_idx].values[col_pos] {
2140 Value::Vector(v) => v.clone(),
2141 Value::Sq8Vector(q) => quantize::dequantize(q),
2147 Value::HalfVector(h) => h.to_f32_vec(),
2150 _ => return,
2151 };
2152 let mut current = entry.expect("entry was Some above");
2155 let mut current_d = vec_l2_sq(table, col_pos, current, &query);
2156 if entry_level > level {
2157 for layer in (level + 1..=entry_level).rev() {
2158 (current, current_d) =
2159 greedy_layer_walk(table, idx_pos, layer, current, current_d, &query);
2160 }
2161 }
2162 let top = level.min(entry_level);
2166 let ef = (m * 2).max(8);
2167 for layer in (0..=top).rev() {
2168 let cap = if layer == 0 { m * 2 } else { m };
2169 let mut candidates = layer_beam_search(
2170 table,
2171 idx_pos,
2172 layer,
2173 current,
2174 current_d,
2175 &query,
2176 ef,
2177 NswMetric::L2,
2178 );
2179 candidates.retain(|&(_, n)| n != new_row_idx);
2180 if let Some(&(d, n)) = candidates.first() {
2183 current = n;
2184 current_d = d;
2185 }
2186 let peers = select_neighbours_heuristic(&candidates, cap, table, col_pos);
2187 connect_at_layer(table, idx_pos, layer, new_row_idx, &peers);
2188 }
2189 if level > entry_level
2192 && let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
2193 {
2194 g.entry = Some(new_row_idx);
2195 g.entry_level = level;
2196 }
2197}
2198
2199fn ensure_node_slot(table: &mut Table, idx_pos: usize, new_row_idx: usize, level: u8) {
2203 let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind else {
2204 unreachable!("ensure_node_slot on a BTree index");
2205 };
2206 while g.layers.len() <= level as usize {
2207 g.layers.push(PersistentVec::new());
2208 }
2209 while g.levels.len() <= new_row_idx {
2210 g.levels.push_mut(0);
2211 }
2212 for layer_vec in &mut g.layers {
2213 while layer_vec.len() <= new_row_idx {
2214 layer_vec.push_mut(Vec::new());
2215 }
2216 }
2217}
2218
2219fn greedy_layer_walk(
2225 table: &Table,
2226 idx_pos: usize,
2227 layer: u8,
2228 mut current: usize,
2229 mut current_d: f32,
2230 query: &[f32],
2231) -> (usize, f32) {
2232 let g = match &table.indices[idx_pos].kind {
2233 IndexKind::Nsw(g) => g,
2234 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
2235 return (current, current_d);
2236 }
2237 };
2238 let col_pos = table.indices[idx_pos].column_position;
2239 loop {
2240 let neighbours: &[u32] = g
2241 .layers
2242 .get(layer as usize)
2243 .and_then(|layer_v| layer_v.get(current))
2244 .map_or(&[][..], Vec::as_slice);
2245 let mut best = current;
2246 let mut best_d = current_d;
2247 for &n in neighbours {
2248 let n = n as usize;
2249 let d = vec_l2_sq(table, col_pos, n, query);
2250 if d < best_d {
2251 best = n;
2252 best_d = d;
2253 }
2254 }
2255 if best == current {
2256 return (current, current_d);
2257 }
2258 current = best;
2259 current_d = best_d;
2260 }
2261}
2262
2263#[allow(clippy::too_many_arguments)] fn layer_beam_search(
2276 table: &Table,
2277 idx_pos: usize,
2278 layer: u8,
2279 entry_node: usize,
2280 entry_d: f32,
2281 query: &[f32],
2282 ef: usize,
2283 metric: NswMetric,
2284) -> Vec<(f32, usize)> {
2285 let g = match &table.indices[idx_pos].kind {
2286 IndexKind::Nsw(g) => g,
2287 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => return Vec::new(),
2288 };
2289 let col_pos = table.indices[idx_pos].column_position;
2290 let d0 = if matches!(metric, NswMetric::L2) {
2291 entry_d
2292 } else {
2293 cell_to_query_metric_distance(table, col_pos, entry_node, query, metric)
2294 };
2295 let row_count = table.rows.len();
2296 let mut visited: Vec<bool> = alloc::vec![false; row_count];
2297 if entry_node < row_count {
2298 visited[entry_node] = true;
2299 }
2300 let mut candidates: alloc::collections::BinaryHeap<NodeClosest> =
2303 alloc::collections::BinaryHeap::with_capacity(ef);
2304 let mut results: alloc::collections::BinaryHeap<NodeFurthest> =
2305 alloc::collections::BinaryHeap::with_capacity(ef);
2306 candidates.push(NodeClosest {
2307 dist: d0,
2308 node: entry_node,
2309 });
2310 results.push(NodeFurthest {
2311 dist: d0,
2312 node: entry_node,
2313 });
2314 while let Some(cur) = candidates.pop() {
2315 let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
2316 if cur.dist > worst && results.len() >= ef {
2317 break;
2318 }
2319 let neighbours: &[u32] = g
2320 .layers
2321 .get(layer as usize)
2322 .and_then(|layer_v| layer_v.get(cur.node))
2323 .map_or(&[][..], Vec::as_slice);
2324 for &n in neighbours {
2325 let n = n as usize;
2326 if n >= row_count || visited[n] {
2327 continue;
2328 }
2329 visited[n] = true;
2330 let dn = cell_to_query_metric_distance(table, col_pos, n, query, metric);
2334 if !dn.is_finite() {
2335 continue;
2336 }
2337 let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
2338 if results.len() < ef || dn < worst {
2339 results.push(NodeFurthest { dist: dn, node: n });
2340 if results.len() > ef {
2341 results.pop();
2342 }
2343 candidates.push(NodeClosest { dist: dn, node: n });
2344 }
2345 }
2346 }
2347 let mut out: Vec<(f32, usize)> = results.into_iter().map(|c| (c.dist, c.node)).collect();
2350 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2351 out
2352}
2353
/// Heap entry ordered so that a max-heap (`BinaryHeap`) pops the node with the
/// *smallest* distance first.
///
/// Ordering is total: distances are compared with `f32::total_cmp` (so NaN has
/// a defined position) and ties are broken by `node`, smaller node first.
/// Equality is defined through the same ordering, which keeps `Eq` and `Ord`
/// consistent with each other.
#[derive(Debug, Clone, Copy)]
struct NodeClosest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeClosest {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeClosest {}
impl PartialOrd for NodeClosest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeClosest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        // Operands are swapped on purpose: a smaller distance ranks higher.
        other
            .dist
            .total_cmp(&self.dist)
            .then_with(|| other.node.cmp(&self.node))
    }
}
2382
/// Heap entry ordered so that a max-heap (`BinaryHeap`) pops the node with the
/// *largest* distance first.
///
/// Ordering is total: distances are compared with `f32::total_cmp` (so NaN has
/// a defined position) and ties are broken by `node`, larger node first.
/// Equality is defined through the same ordering, which keeps `Eq` and `Ord`
/// consistent with each other.
#[derive(Debug, Clone, Copy)]
struct NodeFurthest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeFurthest {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeFurthest {}
impl PartialOrd for NodeFurthest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeFurthest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        self.dist
            .total_cmp(&other.dist)
            .then_with(|| self.node.cmp(&other.node))
    }
}
2408
2409fn select_neighbours_heuristic(
2418 candidates: &[(f32, usize)],
2419 m: usize,
2420 table: &Table,
2421 col_pos: usize,
2422) -> Vec<usize> {
2423 let mut chosen: Vec<usize> = Vec::with_capacity(m);
2424 for &(d_q, e) in candidates {
2425 if chosen.len() >= m {
2426 break;
2427 }
2428 if !matches!(
2433 table.rows.get(e).and_then(|r| r.values.get(col_pos)),
2434 Some(Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_))
2435 ) {
2436 continue;
2437 }
2438 let mut covered = false;
2439 for &r in &chosen {
2440 if cell_l2_sq(table, col_pos, e, r) < d_q {
2444 covered = true;
2445 break;
2446 }
2447 }
2448 if !covered {
2449 chosen.push(e);
2450 }
2451 }
2452 chosen
2453}
2454
2455fn connect_at_layer(
2459 table: &mut Table,
2460 idx_pos: usize,
2461 layer: u8,
2462 new_row_idx: usize,
2463 peers: &[usize],
2464) {
2465 let col_pos = table.indices[idx_pos].column_position;
2466 let cap = match &table.indices[idx_pos].kind {
2467 IndexKind::Nsw(g) => g.cap_for_layer(layer),
2468 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => return,
2469 };
2470 let new_row_u32 = u32::try_from(new_row_idx).expect("row index fits in u32");
2475 if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
2476 let layer_v = &mut g.layers[layer as usize];
2477 if let Some(slot) = layer_v.get_mut(new_row_idx) {
2478 *slot = peers
2479 .iter()
2480 .map(|&p| u32::try_from(p).expect("row index fits in u32"))
2481 .collect();
2482 }
2483 }
2484 for &peer in peers {
2485 if !matches!(
2489 &table.rows[peer].values[col_pos],
2490 Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_)
2491 ) {
2492 continue;
2493 }
2494 if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
2496 let layer_v = &mut g.layers[layer as usize];
2497 if let Some(slot) = layer_v.get_mut(peer)
2498 && !slot.contains(&new_row_u32)
2499 {
2500 slot.push(new_row_u32);
2501 }
2502 }
2503 let needs_trim = match &table.indices[idx_pos].kind {
2507 IndexKind::Nsw(g) => g.layers[layer as usize][peer].len() > cap,
2508 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => false,
2509 };
2510 if needs_trim {
2511 let current_peers: Vec<usize> = match &table.indices[idx_pos].kind {
2512 IndexKind::Nsw(g) => g.layers[layer as usize][peer]
2513 .iter()
2514 .map(|&n| n as usize)
2515 .collect(),
2516 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => continue,
2517 };
2518 let mut tagged: Vec<(f32, usize)> = current_peers
2523 .iter()
2524 .map(|&p| (cell_l2_sq(table, col_pos, peer, p), p))
2525 .collect();
2526 tagged.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2527 let kept = select_neighbours_heuristic(&tagged, cap, table, col_pos);
2528 if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
2529 && let Some(slot) = g.layers[layer as usize].get_mut(peer)
2530 {
2531 *slot = kept
2532 .into_iter()
2533 .map(|p| u32::try_from(p).expect("row index fits in u32"))
2534 .collect();
2535 }
2536 }
2537 }
2538}
2539
2540fn vec_l2_sq(table: &Table, col_pos: usize, row: usize, query: &[f32]) -> f32 {
2547 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2548 Some(Value::Vector(v)) if v.len() == query.len() => l2_distance_sq(v, query),
2549 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => {
2550 quantize::sq8_l2_distance_sq_asymmetric(q, query)
2551 }
2552 Some(Value::HalfVector(h)) if h.dim() == query.len() => {
2556 halfvec::half_l2_distance_sq_asymmetric(h, query)
2557 }
2558 _ => f32::INFINITY,
2559 }
2560}
2561
2562fn cell_l2_sq(table: &Table, col_pos: usize, row_a: usize, row_b: usize) -> f32 {
2569 let Some(cell_a) = table.rows.get(row_a).and_then(|r| r.values.get(col_pos)) else {
2570 return f32::INFINITY;
2571 };
2572 let Some(cell_b) = table.rows.get(row_b).and_then(|r| r.values.get(col_pos)) else {
2573 return f32::INFINITY;
2574 };
2575 match (cell_a, cell_b) {
2576 (Value::Vector(a), Value::Vector(b)) if a.len() == b.len() => l2_distance_sq(a, b),
2577 (Value::Sq8Vector(a), Value::Sq8Vector(b)) if a.bytes.len() == b.bytes.len() => {
2578 quantize::sq8_l2_distance_sq(a, b)
2579 }
2580 (Value::HalfVector(a), Value::HalfVector(b)) if a.dim() == b.dim() => {
2585 halfvec::half_l2_distance_sq(a, b)
2586 }
2587 _ => f32::INFINITY,
2588 }
2589}
2590
2591fn cell_to_query_metric_distance(
2596 table: &Table,
2597 col_pos: usize,
2598 row: usize,
2599 query: &[f32],
2600 metric: NswMetric,
2601) -> f32 {
2602 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2603 Some(Value::Vector(v)) if v.len() == query.len() => metric_distance(metric, v, query),
2604 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => match metric {
2605 NswMetric::L2 => quantize::sq8_l2_distance_sq_asymmetric(q, query),
2606 NswMetric::InnerProduct => quantize::sq8_inner_product_asymmetric(q, query),
2607 NswMetric::Cosine => quantize::sq8_cosine_distance_asymmetric(q, query),
2608 },
2609 Some(Value::HalfVector(h)) if h.dim() == query.len() => match metric {
2612 NswMetric::L2 => halfvec::half_l2_distance_sq_asymmetric(h, query),
2613 NswMetric::InnerProduct => halfvec::half_inner_product_asymmetric(h, query),
2614 NswMetric::Cosine => halfvec::half_cosine_distance_asymmetric(h, query),
2615 },
2616 _ => f32::INFINITY,
2617 }
2618}
2619
/// Distance metric used to score NSW candidates against a query vector.
///
/// In every case a smaller value means "closer".
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NswMetric {
    /// Squared Euclidean distance.
    L2,
    /// Negated inner product for `f32` cells, so a larger dot product ranks
    /// closer.
    InnerProduct,
    /// `1 - cosine similarity` for `f32` cells; infinite when either vector
    /// has zero norm.
    Cosine,
}
2637
2638fn nsw_search(
2644 table: &Table,
2645 idx_pos: usize,
2646 query: &[f32],
2647 k: usize,
2648 ef: usize,
2649 metric: NswMetric,
2650) -> Vec<(f32, usize)> {
2651 let (entry, entry_level) = match &table.indices[idx_pos].kind {
2652 IndexKind::Nsw(g) => (g.entry, g.entry_level),
2653 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => return Vec::new(),
2654 };
2655 let Some(entry) = entry else {
2656 return Vec::new();
2657 };
2658 let col_pos = table.indices[idx_pos].column_position;
2659 let sq8 = matches!(
2666 table.schema.columns.get(col_pos).map(|c| c.ty),
2667 Some(DataType::Vector {
2668 encoding: VecEncoding::Sq8,
2669 ..
2670 })
2671 );
2672 let ef = if sq8 {
2673 ef.max(k).max(k * SQ8_RERANK_OVER_FETCH)
2674 } else {
2675 ef.max(k)
2676 };
2677 let entry_d = vec_l2_sq(table, col_pos, entry, query);
2679 let mut current = entry;
2680 let mut current_d = entry_d;
2681 for layer in (1..=entry_level).rev() {
2682 (current, current_d) = greedy_layer_walk(table, idx_pos, layer, current, current_d, query);
2683 }
2684 let mut results = layer_beam_search(table, idx_pos, 0, current, current_d, query, ef, metric);
2686 if sq8 {
2687 results = sq8_rerank(table, col_pos, &results, query, metric);
2688 }
2689 results.truncate(k);
2690 results
2691}
2692
2693fn sq8_rerank(
2700 table: &Table,
2701 col_pos: usize,
2702 candidates: &[(f32, usize)],
2703 query: &[f32],
2704 metric: NswMetric,
2705) -> Vec<(f32, usize)> {
2706 let mut out: Vec<(f32, usize)> = candidates
2707 .iter()
2708 .filter_map(|&(adc_d, row)| {
2709 let cell = table.rows.get(row).and_then(|r| r.values.get(col_pos))?;
2710 let Value::Sq8Vector(q) = cell else {
2711 return Some((adc_d, row));
2715 };
2716 let deq = quantize::dequantize(q);
2717 if deq.len() != query.len() {
2718 return None;
2719 }
2720 Some((metric_distance(metric, &deq, query), row))
2721 })
2722 .collect();
2723 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2724 out
2725}
2726
/// Over-fetch factor for SQ8 columns: `nsw_search` widens its beam to at least
/// `k` times this value before reranking.
const SQ8_RERANK_OVER_FETCH: usize = 3;
2731
2732fn metric_distance(metric: NswMetric, a: &[f32], b: &[f32]) -> f32 {
2733 match metric {
2734 NswMetric::L2 => l2_distance_sq(a, b),
2735 NswMetric::InnerProduct => -inner_product_f32(a, b),
2736 NswMetric::Cosine => {
2737 let (dot, na, nb) = cosine_dot_norms_f32(a, b);
2738 if na == 0.0 || nb == 0.0 {
2739 return f32::INFINITY;
2740 }
2741 let denom = sqrt_newton_f32(na) * sqrt_newton_f32(nb);
2744 1.0 - dot / denom
2745 }
2746 }
2747}
2748
2749#[doc(hidden)]
2758#[inline]
2759pub fn inner_product_f32(a: &[f32], b: &[f32]) -> f32 {
2760 #[cfg(target_arch = "aarch64")]
2761 {
2762 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
2763 return unsafe { inner_product_neon(a, b) };
2766 }
2767 }
2768 inner_product_scalar(a, b)
2769}
2770
/// Portable dot product; pairs elements up to the shorter slice and
/// accumulates left to right starting from `0.0`.
fn inner_product_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter()
        .zip(b.iter())
        .fold(0.0_f32, |acc, (x, y)| acc + x * y)
}
2778
/// NEON dot product over 4-lane blocks, using two accumulators for the
/// 8-element main loop. Elements past the last full block of 4 are ignored.
///
/// # Safety
/// `b` must contain at least as many elements as the 4-lane blocks read from
/// `a` (the dispatcher guarantees `a.len() == b.len()`), and the CPU must
/// support NEON.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
// `a`, `b`, `n` follow the usual maths notation for vector kernels.
#[allow(clippy::many_single_char_names)]
unsafe fn inner_product_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32,
    };
    // SAFETY: every load reads 4 lanes starting at an offset `o` with
    // `o + 4 <= a.len()`; the caller guarantees `b` covers the same offsets.
    unsafe {
        let pa = a.as_ptr();
        let pb = b.as_ptr();
        let n = a.len();
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut acc0 = zero;
        let mut acc1 = zero;
        let wide_blocks = n / 8;
        for block in 0..wide_blocks {
            let off = block * 8;
            acc0 = vfmaq_f32(acc0, vld1q_f32(pa.add(off)), vld1q_f32(pb.add(off)));
            acc1 = vfmaq_f32(
                acc1,
                vld1q_f32(pa.add(off + 4)),
                vld1q_f32(pb.add(off + 4)),
            );
        }
        // At most one 4-lane block remains after the 8-wide loop.
        let off = wide_blocks * 8;
        if n - off >= 4 {
            acc0 = vfmaq_f32(acc0, vld1q_f32(pa.add(off)), vld1q_f32(pb.add(off)));
        }
        vaddvq_f32(vaddq_f32(acc0, acc1))
    }
}
2812
2813#[doc(hidden)]
2820#[inline]
2821pub fn cosine_dot_norms_f32(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
2822 #[cfg(target_arch = "aarch64")]
2823 {
2824 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
2825 return unsafe { cosine_dot_norms_neon(a, b) };
2827 }
2828 }
2829 cosine_dot_norms_scalar(a, b)
2830}
2831
/// Portable single-pass computation of `(a . b, |a|^2, |b|^2)` over the
/// element pairs of `a` and `b` (up to the shorter slice).
fn cosine_dot_norms_scalar(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    a.iter().zip(b.iter()).fold(
        (0.0_f32, 0.0_f32, 0.0_f32),
        |(dot, norm_a_sq, norm_b_sq), (x, y)| {
            (dot + x * y, norm_a_sq + x * x, norm_b_sq + y * y)
        },
    )
}
2843
/// NEON single-pass computation of `(a . b, |a|^2, |b|^2)` over 4-lane
/// blocks. Elements past the last full block of 4 are ignored.
///
/// # Safety
/// `b` must contain at least as many elements as the 4-lane blocks read from
/// `a` (the dispatcher guarantees `a.len() == b.len()`), and the CPU must
/// support NEON.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
// Short maths-style names and the paired `acc_*` names are intentional here.
#[allow(clippy::many_single_char_names, clippy::similar_names)]
unsafe fn cosine_dot_norms_neon(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    use core::arch::aarch64::{float32x4_t, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32};
    // SAFETY: every load reads 4 lanes starting at an offset `o` with
    // `o + 4 <= a.len()`; the caller guarantees `b` covers the same offsets.
    unsafe {
        let pa = a.as_ptr();
        let pb = b.as_ptr();
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut acc_dot = zero;
        let mut acc_na = zero;
        let mut acc_nb = zero;
        for block in 0..a.len() / 4 {
            let off = block * 4;
            let av = vld1q_f32(pa.add(off));
            let bv = vld1q_f32(pb.add(off));
            acc_dot = vfmaq_f32(acc_dot, av, bv);
            acc_na = vfmaq_f32(acc_na, av, av);
            acc_nb = vfmaq_f32(acc_nb, bv, bv);
        }
        (vaddvq_f32(acc_dot), vaddvq_f32(acc_na), vaddvq_f32(acc_nb))
    }
}
2867
/// Square root for `no_std` builds, computed with Newton's method.
///
/// * `x <= 0.0` returns `0.0`.
/// * NaN and `+inf` are returned unchanged.
/// * Every other input is iterated until the estimate stops changing by more
///   than one part in `1 / f32::EPSILON`, so the result is accurate across the
///   whole finite range rather than only near 1.
fn sqrt_newton_f32(x: f32) -> f32 {
    /// Upper bound on Newton steps; even the smallest subnormal converges in
    /// well under this many iterations from the bit-level seed.
    const MAX_ITERS: usize = 32;
    /// Halving the exponent bits and adding this bias gives a first guess that
    /// is within a few percent of the true root for normal floats.
    const SEED_BIAS: u32 = 0x1fbd_1df5;

    if x <= 0.0 {
        return 0.0;
    }
    if !x.is_finite() {
        return x;
    }
    // For finite positive `x` the shifted bits are at most 0x3fbf_ffff, so
    // adding the bias cannot overflow.
    let mut g = f32::from_bits((x.to_bits() >> 1) + SEED_BIAS);
    for _ in 0..MAX_ITERS {
        let next = 0.5 * (g + x / g);
        let delta = if next > g { next - g } else { g - next };
        g = next;
        if delta <= f32::EPSILON * g {
            break;
        }
    }
    g
}
2878
2879#[inline]
2887fn l2_distance_sq(a: &[f32], b: &[f32]) -> f32 {
2888 #[cfg(target_arch = "aarch64")]
2889 {
2890 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
2891 return unsafe { l2_distance_sq_neon(a, b) };
2895 }
2896 }
2897 l2_distance_sq_scalar(a, b)
2898}
2899
/// Portable squared Euclidean distance; pairs elements up to the shorter
/// slice and accumulates left to right starting from `0.0`.
fn l2_distance_sq_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b.iter()).fold(0.0_f32, |acc, (x, y)| {
        let diff = *x - *y;
        acc + diff * diff
    })
}
2908
/// NEON squared Euclidean distance over 4-lane blocks, using two accumulators
/// for the 8-element main loop. Elements past the last full block of 4 are
/// ignored.
///
/// # Safety
/// `b` must contain at least as many elements as the 4-lane blocks read from
/// `a` (the dispatcher guarantees `a.len() == b.len()`), and the CPU must
/// support NEON.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
// `a`, `b`, `n` follow the usual maths notation for vector kernels.
#[allow(clippy::many_single_char_names)]
unsafe fn l2_distance_sq_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32, vsubq_f32,
    };
    // SAFETY: every load reads 4 lanes starting at an offset `o` with
    // `o + 4 <= a.len()`; the caller guarantees `b` covers the same offsets.
    unsafe {
        let pa = a.as_ptr();
        let pb = b.as_ptr();
        let n = a.len();
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut acc0 = zero;
        let mut acc1 = zero;
        let wide_blocks = n / 8;
        for block in 0..wide_blocks {
            let off = block * 8;
            let lo = vsubq_f32(vld1q_f32(pa.add(off)), vld1q_f32(pb.add(off)));
            acc0 = vfmaq_f32(acc0, lo, lo);
            let hi = vsubq_f32(vld1q_f32(pa.add(off + 4)), vld1q_f32(pb.add(off + 4)));
            acc1 = vfmaq_f32(acc1, hi, hi);
        }
        // At most one 4-lane block remains after the 8-wide loop.
        let off = wide_blocks * 8;
        if n - off >= 4 {
            let tail = vsubq_f32(vld1q_f32(pa.add(off)), vld1q_f32(pb.add(off)));
            acc0 = vfmaq_f32(acc0, tail, tail);
        }
        vaddvq_f32(vaddq_f32(acc0, acc1))
    }
}
2946
2947pub fn nsw_query(
2950 table: &Table,
2951 idx_name: &str,
2952 query: &[f32],
2953 k: usize,
2954 metric: NswMetric,
2955) -> Vec<usize> {
2956 let Some(idx_pos) = table.indices.iter().position(|i| i.name == idx_name) else {
2957 return Vec::new();
2958 };
2959 let ef = (k * 2).max(NSW_DEFAULT_M);
2960 let mut hits = nsw_search(table, idx_pos, query, k, ef, metric);
2961 hits.truncate(k);
2962 hits.into_iter().map(|(_, idx)| idx).collect()
2963}
2964
2965pub fn nsw_index_on(table: &Table, column_position: usize) -> Option<&Index> {
2969 table
2970 .indices
2971 .iter()
2972 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_)))
2973}
2974
/// In-memory catalog: tables, cold segments, functions and triggers.
#[derive(Debug, Clone, Default)]
pub struct Catalog {
    /// All tables, in creation order.
    tables: Vec<Table>,
    /// Table name -> position in `tables`.
    by_name: BTreeMap<String, usize>,
    /// Cold segment slots; the slot position is the segment id. `None` marks a
    /// slot that is tombstoned or was padded in but never filled.
    cold_segments: Vec<Option<Arc<OwnedSegment>>>,
    /// Registered functions keyed by function name.
    functions: BTreeMap<String, FunctionDef>,
    /// Registered triggers; a trigger is identified by its (name, table) pair.
    triggers: Vec<TriggerDef>,
}
3027
/// Catalog entry describing a user-defined function.
///
/// NOTE(review): all fields are stored as plain strings; their exact format is
/// not visible in this part of the file - confirm against the SQL front end.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FunctionDef {
    /// Function name; used as the key in the catalog's function map.
    pub name: String,
    /// Textual representation of the argument list (presumably as written in
    /// the definition - TODO confirm).
    pub args_repr: String,
    /// Declared return type, as text.
    pub returns: String,
    /// Implementation language, as text.
    pub language: String,
    /// Function body source.
    pub body: String,
}
3053
/// Catalog entry describing a trigger.
///
/// A trigger is identified by the pair (`name`, `table`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TriggerDef {
    /// Trigger name.
    pub name: String,
    /// Name of the table the trigger is attached to; must exist when the
    /// trigger is created.
    pub table: String,
    /// Firing time, as text (presumably BEFORE / AFTER style - TODO confirm).
    pub timing: String,
    /// Events that fire the trigger, as text.
    pub events: Vec<String>,
    /// Firing granularity, as text (presumably ROW / STATEMENT - TODO confirm).
    pub for_each: String,
    /// Name of the function to invoke; must be registered in the catalog when
    /// the trigger is created.
    pub function: String,
    /// Column names associated with update events (presumably an
    /// `UPDATE OF` column list - TODO confirm).
    pub update_columns: Vec<String>,
}
3083
3084impl Catalog {
/// Creates an empty catalog: no tables, cold segments, functions or triggers.
pub const fn new() -> Self {
    Self {
        tables: Vec::new(),
        by_name: BTreeMap::new(),
        cold_segments: Vec::new(),
        functions: BTreeMap::new(),
        triggers: Vec::new(),
    }
}
3094
/// Read-only view of the registered functions, keyed by function name.
pub const fn functions(&self) -> &BTreeMap<String, FunctionDef> {
    &self.functions
}
3101
3102 pub fn create_function(
3106 &mut self,
3107 def: FunctionDef,
3108 or_replace: bool,
3109 ) -> Result<(), StorageError> {
3110 if !or_replace && self.functions.contains_key(&def.name) {
3111 return Err(StorageError::Corrupt(format!(
3112 "function {:?} already exists (drop or use CREATE OR REPLACE)",
3113 def.name
3114 )));
3115 }
3116 self.functions.insert(def.name.clone(), def);
3117 Ok(())
3118 }
3119
/// Removes the function called `name`; returns `true` if it existed.
pub fn drop_function(&mut self, name: &str) -> bool {
    self.functions.remove(name).is_some()
}
3126
3127 pub fn triggers(&self) -> &[TriggerDef] {
3131 &self.triggers
3132 }
3133
3134 pub fn create_trigger(
3140 &mut self,
3141 def: TriggerDef,
3142 or_replace: bool,
3143 ) -> Result<(), StorageError> {
3144 if !self.by_name.contains_key(&def.table) {
3145 return Err(StorageError::TableNotFound {
3146 name: def.table.clone(),
3147 });
3148 }
3149 if !self.functions.contains_key(&def.function) {
3150 return Err(StorageError::Corrupt(format!(
3151 "trigger {:?} references unknown function {:?}",
3152 def.name, def.function
3153 )));
3154 }
3155 let dup = self
3156 .triggers
3157 .iter()
3158 .position(|t| t.name == def.name && t.table == def.table);
3159 match (dup, or_replace) {
3160 (Some(_), false) => Err(StorageError::Corrupt(format!(
3161 "trigger {:?} already exists on table {:?}",
3162 def.name, def.table
3163 ))),
3164 (Some(i), true) => {
3165 self.triggers[i] = def;
3166 Ok(())
3167 }
3168 (None, _) => {
3169 self.triggers.push(def);
3170 Ok(())
3171 }
3172 }
3173 }
3174
3175 pub fn drop_trigger(&mut self, name: &str, table: &str) -> bool {
3178 let before = self.triggers.len();
3179 self.triggers
3180 .retain(|t| !(t.name == name && t.table == table));
3181 before != self.triggers.len()
3182 }
3183
3184 pub fn create_table(&mut self, schema: TableSchema) -> Result<(), StorageError> {
3185 if self.by_name.contains_key(&schema.name) {
3186 return Err(StorageError::DuplicateTable {
3187 name: schema.name.clone(),
3188 });
3189 }
3190 let idx = self.tables.len();
3191 let name = schema.name.clone();
3192 self.tables.push(Table::new(schema));
3193 self.by_name.insert(name, idx);
3194 Ok(())
3195 }
3196
3197 pub fn get(&self, name: &str) -> Option<&Table> {
3198 let idx = *self.by_name.get(name)?;
3199 self.tables.get(idx)
3200 }
3201
3202 pub fn get_mut(&mut self, name: &str) -> Option<&mut Table> {
3203 let idx = *self.by_name.get(name)?;
3204 self.tables.get_mut(idx)
3205 }
3206
/// Number of tables in the catalog.
pub fn table_count(&self) -> usize {
    self.tables.len()
}
3210
3211 pub fn table_names(&self) -> Vec<String> {
3214 self.tables.iter().map(|t| t.schema.name.clone()).collect()
3215 }
3216
3217 pub fn load_segment_bytes(&mut self, bytes: Vec<u8>) -> Result<u32, StorageError> {
3228 let id = u32::try_from(self.cold_segments.len()).map_err(|_| {
3229 StorageError::Corrupt("cold segment count would exceed u32::MAX".into())
3230 })?;
3231 let seg = OwnedSegment::from_bytes(bytes)
3232 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
3233 self.cold_segments.push(Some(Arc::new(seg)));
3234 Ok(id)
3235 }
3236
3237 pub fn load_segment_bytes_at(
3250 &mut self,
3251 target_id: u32,
3252 bytes: Vec<u8>,
3253 ) -> Result<(), StorageError> {
3254 let seg = OwnedSegment::from_bytes(bytes)
3255 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
3256 let idx = target_id as usize;
3257 while self.cold_segments.len() <= idx {
3258 self.cold_segments.push(None);
3259 }
3260 if self.cold_segments[idx].is_some() {
3261 return Err(StorageError::Corrupt(format!(
3262 "load_segment_bytes_at: segment_id {target_id} already occupied"
3263 )));
3264 }
3265 self.cold_segments[idx] = Some(Arc::new(seg));
3266 Ok(())
3267 }
3268
3269 pub fn tombstone_segment(&mut self, segment_id: u32) -> Result<(), StorageError> {
3279 let idx = segment_id as usize;
3280 if idx >= self.cold_segments.len() {
3281 return Err(StorageError::Corrupt(format!(
3282 "tombstone_segment: segment_id {segment_id} out of bounds (len={})",
3283 self.cold_segments.len()
3284 )));
3285 }
3286 self.cold_segments[idx] = None;
3287 Ok(())
3288 }
3289
3290 #[must_use]
3292 pub fn cold_segment_count(&self) -> usize {
3293 self.cold_segments.iter().filter(|s| s.is_some()).count()
3294 }
3295
/// Returns the total number of cold-segment slots, counting empty
/// (tombstoned or padded) slots as well as live ones.
#[must_use]
pub fn cold_segment_slot_count(&self) -> usize {
    self.cold_segments.len()
}
3302
3303 #[must_use]
3308 pub fn cold_segment_ids_global(&self) -> Vec<u32> {
3309 self.cold_segments
3310 .iter()
3311 .enumerate()
3312 .filter_map(|(i, s)| s.as_ref().map(|_| i as u32))
3313 .collect()
3314 }
3315
3316 #[must_use]
3323 pub fn hot_tier_bytes(&self) -> u64 {
3324 self.tables
3325 .iter()
3326 .map(Table::hot_bytes)
3327 .fold(0u64, u64::saturating_add)
3328 }
3329
3330 pub fn freeze_oldest_to_cold(
3375 &mut self,
3376 table_name: &str,
3377 index_name: &str,
3378 max_rows: usize,
3379 ) -> Result<FreezeReport, StorageError> {
3380 if max_rows == 0 {
3382 return Err(StorageError::Corrupt(
3383 "freeze_oldest_to_cold: max_rows must be > 0".into(),
3384 ));
3385 }
3386 let table = self.get(table_name).ok_or_else(|| {
3387 StorageError::Corrupt(format!(
3388 "freeze_oldest_to_cold: table {table_name:?} not found"
3389 ))
3390 })?;
3391 if max_rows > table.rows.len() {
3392 return Err(StorageError::Corrupt(format!(
3393 "freeze_oldest_to_cold: max_rows {max_rows} > row_count {}",
3394 table.rows.len()
3395 )));
3396 }
3397 let idx = table
3398 .indices
3399 .iter()
3400 .find(|i| i.name == index_name)
3401 .ok_or_else(|| {
3402 StorageError::Corrupt(format!(
3403 "freeze_oldest_to_cold: index {index_name:?} not found on {table_name:?}"
3404 ))
3405 })?;
3406 if !matches!(idx.kind, IndexKind::BTree(_)) {
3407 return Err(StorageError::Corrupt(format!(
3408 "freeze_oldest_to_cold: index {index_name:?} is NSW; only BTree indices may freeze"
3409 )));
3410 }
3411 let column_position = idx.column_position;
3412
3413 let schema = table.schema.clone();
3415 let mut to_freeze: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(max_rows);
3416 for row_idx in 0..max_rows {
3417 let row = table.rows.get(row_idx).expect("bounds-checked above");
3418 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
3419 StorageError::Corrupt(format!(
3420 "freeze_oldest_to_cold: row {row_idx} has NULL / non-key value in index column"
3421 ))
3422 })?;
3423 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
3424 StorageError::Corrupt(format!(
3425 "freeze_oldest_to_cold: index {index_name:?} column type is non-integer; \
3426 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
3427 ))
3428 })?;
3429 to_freeze.push((pk_u64, encode_row_body_dense(row, &schema), key));
3430 }
3431 to_freeze.sort_by_key(|(k, _, _)| *k);
3436 for w in to_freeze.windows(2) {
3440 if w[0].0 == w[1].0 {
3441 return Err(StorageError::Corrupt(format!(
3442 "freeze_oldest_to_cold: duplicate PK {} in freeze batch",
3443 w[0].0
3444 )));
3445 }
3446 }
3447 let post_swap_keys: Vec<IndexKey> = to_freeze.iter().map(|(_, _, k)| k.clone()).collect();
3451 let seg_rows: Vec<(u64, Vec<u8>)> = to_freeze
3455 .into_iter()
3456 .map(|(k, body, _)| (k, body))
3457 .collect();
3458 let frozen_rows = seg_rows.len();
3459 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
3460 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: encode: {e}")))?;
3461
3462 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
3471 let positions: Vec<usize> = (0..max_rows).collect();
3472 let t_mut = self
3473 .get_mut(table_name)
3474 .expect("just validated; still present");
3475 let removed = t_mut.delete_rows(&positions);
3476 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
3477 let bytes_after = t_mut.hot_bytes();
3478 let bytes_freed = bytes_before.saturating_sub(bytes_after);
3479
3480 let segment_id = self
3481 .load_segment_bytes(seg_bytes.clone())
3482 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: load: {e}")))?;
3483 let new_cold = post_swap_keys.into_iter().map(|k| {
3484 (
3485 k,
3486 RowLocator::Cold {
3487 segment_id,
3488 page_offset: 0,
3489 },
3490 )
3491 });
3492 let t_mut = self.get_mut(table_name).expect("still present");
3493 t_mut.register_cold_locators(index_name, new_cold)?;
3494
3495 Ok(FreezeReport {
3496 segment_id,
3497 frozen_rows,
3498 bytes_freed,
3499 segment_bytes: seg_bytes,
3500 })
3501 }
3502
3503 #[must_use]
3509 pub fn cold_segment(&self, segment_id: u32) -> Option<&OwnedSegment> {
3510 self.cold_segments
3511 .get(segment_id as usize)
3512 .and_then(|s| s.as_deref())
3513 }
3514
3515 pub fn resolve_cold_locator(
3524 &self,
3525 table_name: &str,
3526 segment_id: u32,
3527 key: &IndexKey,
3528 ) -> Option<Row> {
3529 let t = self.get(table_name)?;
3530 let u64_key = index_key_as_u64(key)?;
3531 let seg = self.cold_segments.get(segment_id as usize)?.as_ref()?;
3532 let payload = seg.lookup(u64_key)?;
3533 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
3534 Some(row)
3535 }
3536
3537 pub fn lookup_by_pk(&self, table: &str, index_name: &str, key: &IndexKey) -> Option<Row> {
3555 let t = self.get(table)?;
3556 let idx = t.indices.iter().find(|i| i.name == index_name)?;
3557 let locators = idx.lookup_eq(key);
3558 let cold_u64_key = index_key_as_u64(key);
3559 for loc in locators {
3560 match *loc {
3561 RowLocator::Hot(i) => {
3562 if let Some(row) = t.rows.get(i) {
3563 return Some(row.clone());
3564 }
3565 }
3566 RowLocator::Cold {
3567 segment_id,
3568 page_offset: _,
3569 } => {
3570 let Some(u64_key) = cold_u64_key else {
3571 continue;
3574 };
3575 let Some(seg) = self
3576 .cold_segments
3577 .get(segment_id as usize)
3578 .and_then(|s| s.as_deref())
3579 else {
3580 continue;
3591 };
3592 let Some(payload) = seg.lookup(u64_key) else {
3593 continue;
3594 };
3595 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
3596 return Some(row);
3597 }
3598 }
3599 }
3600 None
3601 }
3602
3603 pub fn promote_cold_row(
3625 &mut self,
3626 table_name: &str,
3627 index_name: &str,
3628 key: &IndexKey,
3629 ) -> Result<Option<usize>, StorageError> {
3630 let cold_loc = self.find_cold_locator(table_name, index_name, key)?;
3631 let Some((segment_id, _page_offset)) = cold_loc else {
3632 return Ok(None);
3633 };
3634 let u64_key = index_key_as_u64(key).ok_or_else(|| {
3635 StorageError::Corrupt(
3636 "promote_cold_row: key type not coercible to u64 (cold tier requires integer PK)"
3637 .into(),
3638 )
3639 })?;
3640 let schema = self
3644 .get(table_name)
3645 .ok_or_else(|| {
3646 StorageError::Corrupt(format!("promote_cold_row: table {table_name:?} not found"))
3647 })?
3648 .schema
3649 .clone();
3650 let seg = self
3651 .cold_segments
3652 .get(segment_id as usize)
3653 .and_then(|s| s.as_ref())
3654 .ok_or_else(|| {
3655 StorageError::Corrupt(format!(
3656 "promote_cold_row: segment {segment_id} not registered on catalog"
3657 ))
3658 })?;
3659 let payload = seg.lookup(u64_key).ok_or_else(|| {
3660 StorageError::Corrupt(format!(
3661 "promote_cold_row: key {u64_key} resolves to segment {segment_id} \
3662 but the segment's bloom/page lookup didn't return a row"
3663 ))
3664 })?;
3665 let (row, _consumed) = decode_row_body_dense(&payload, &schema)?;
3666 let t = self
3671 .get_mut(table_name)
3672 .expect("table existed at lookup time");
3673 t.insert(row)?;
3674 let new_hot_idx =
3675 t.rows.len().checked_sub(1).ok_or_else(|| {
3676 StorageError::Corrupt("promote_cold_row: empty after insert".into())
3677 })?;
3678 t.remove_cold_locators_for_key(index_name, key)?;
3682 Ok(Some(new_hot_idx))
3683 }
3684
3685 pub fn shadow_cold_row(
3703 &mut self,
3704 table_name: &str,
3705 index_name: &str,
3706 key: &IndexKey,
3707 ) -> Result<usize, StorageError> {
3708 let t = self.get_mut(table_name).ok_or_else(|| {
3709 StorageError::Corrupt(format!("shadow_cold_row: table {table_name:?} not found"))
3710 })?;
3711 t.remove_cold_locators_for_key(index_name, key)
3712 }
3713
3714 pub fn prepare_freeze_slice(
3732 &self,
3733 table_name: &str,
3734 index_name: &str,
3735 row_range: core::ops::Range<usize>,
3736 ) -> Result<FreezeSlice, StorageError> {
3737 let table = self.get(table_name).ok_or_else(|| {
3738 StorageError::Corrupt(format!(
3739 "prepare_freeze_slice: table {table_name:?} not found"
3740 ))
3741 })?;
3742 let idx = table
3743 .indices
3744 .iter()
3745 .find(|i| i.name == index_name)
3746 .ok_or_else(|| {
3747 StorageError::Corrupt(format!(
3748 "prepare_freeze_slice: index {index_name:?} not found on {table_name:?}"
3749 ))
3750 })?;
3751 if !matches!(idx.kind, IndexKind::BTree(_)) {
3752 return Err(StorageError::Corrupt(format!(
3753 "prepare_freeze_slice: index {index_name:?} is NSW; only BTree indices may freeze"
3754 )));
3755 }
3756 if row_range.end > table.rows.len() {
3757 return Err(StorageError::Corrupt(format!(
3758 "prepare_freeze_slice: row_range end {} > row_count {}",
3759 row_range.end,
3760 table.rows.len()
3761 )));
3762 }
3763 let column_position = idx.column_position;
3764 let schema = table.schema.clone();
3765 let mut rows: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(row_range.len());
3766 for row_idx in row_range.clone() {
3767 let row = table.rows.get(row_idx).expect("bounds-checked above");
3768 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
3769 StorageError::Corrupt(format!(
3770 "prepare_freeze_slice: row {row_idx} has NULL / non-key value in index column"
3771 ))
3772 })?;
3773 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
3774 StorageError::Corrupt(format!(
3775 "prepare_freeze_slice: index {index_name:?} column type is non-integer; \
3776 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
3777 ))
3778 })?;
3779 rows.push((pk_u64, encode_row_body_dense(row, &schema), key));
3780 }
3781 rows.sort_by_key(|(k, _, _)| *k);
3782 Ok(FreezeSlice { row_range, rows })
3783 }
3784
3785 pub fn commit_freeze_slices(
3799 &mut self,
3800 table_name: &str,
3801 index_name: &str,
3802 slices: Vec<FreezeSlice>,
3803 ) -> Result<FreezeReport, StorageError> {
3804 let table = self.get(table_name).ok_or_else(|| {
3806 StorageError::Corrupt(format!(
3807 "commit_freeze_slices: table {table_name:?} not found"
3808 ))
3809 })?;
3810 let idx = table
3811 .indices
3812 .iter()
3813 .find(|i| i.name == index_name)
3814 .ok_or_else(|| {
3815 StorageError::Corrupt(format!(
3816 "commit_freeze_slices: index {index_name:?} not found on {table_name:?}"
3817 ))
3818 })?;
3819 if !matches!(idx.kind, IndexKind::BTree(_)) {
3820 return Err(StorageError::Corrupt(format!(
3821 "commit_freeze_slices: index {index_name:?} is NSW; only BTree indices may freeze"
3822 )));
3823 }
3824 let mut ordered = slices;
3828 ordered.sort_by_key(|s| s.row_range.start);
3829 let mut expected_start = 0usize;
3833 for s in &ordered {
3834 if s.row_range.start != expected_start {
3835 return Err(StorageError::Corrupt(format!(
3836 "commit_freeze_slices: gap/overlap at row {}; expected start {}",
3837 s.row_range.start, expected_start
3838 )));
3839 }
3840 expected_start = s.row_range.end;
3841 }
3842 let max_rows = expected_start;
3843 if max_rows > table.rows.len() {
3844 return Err(StorageError::Corrupt(format!(
3845 "commit_freeze_slices: total row range {} exceeds row_count {}",
3846 max_rows,
3847 table.rows.len()
3848 )));
3849 }
3850 if max_rows == 0 {
3851 return Ok(FreezeReport {
3852 segment_id: u32::MAX,
3853 frozen_rows: 0,
3854 bytes_freed: 0,
3855 segment_bytes: Vec::new(),
3856 });
3857 }
3858
3859 let total_rows: usize = ordered.iter().map(|s| s.rows.len()).sum();
3864 if total_rows != max_rows {
3865 return Err(StorageError::Corrupt(format!(
3866 "commit_freeze_slices: total slice rows {total_rows} ≠ row_range coverage {max_rows}"
3867 )));
3868 }
3869 let mut cursors: Vec<usize> = alloc::vec![0; ordered.len()];
3870 let mut merged: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(total_rows);
3871 loop {
3872 let mut pick: Option<usize> = None;
3875 for (i, c) in cursors.iter().enumerate() {
3876 let slice = &ordered[i];
3877 if *c >= slice.rows.len() {
3878 continue;
3879 }
3880 match pick {
3881 None => pick = Some(i),
3882 Some(j) => {
3883 if slice.rows[*c].0 < ordered[j].rows[cursors[j]].0 {
3884 pick = Some(i);
3885 }
3886 }
3887 }
3888 }
3889 let Some(i) = pick else { break };
3890 let row = ordered[i].rows[cursors[i]].clone();
3891 cursors[i] += 1;
3892 merged.push(row);
3893 }
3894 for w in merged.windows(2) {
3897 if w[0].0 == w[1].0 {
3898 return Err(StorageError::Corrupt(format!(
3899 "commit_freeze_slices: duplicate PK {} across slices",
3900 w[0].0
3901 )));
3902 }
3903 }
3904 let post_swap_keys: Vec<IndexKey> = merged.iter().map(|(_, _, k)| k.clone()).collect();
3905 let seg_rows: Vec<(u64, Vec<u8>)> =
3906 merged.into_iter().map(|(k, body, _)| (k, body)).collect();
3907 let frozen_rows = seg_rows.len();
3908 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
3909 .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: encode: {e}")))?;
3910
3911 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
3913 let positions: Vec<usize> = (0..max_rows).collect();
3914 let t_mut = self
3915 .get_mut(table_name)
3916 .expect("just validated; still present");
3917 let removed = t_mut.delete_rows(&positions);
3918 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
3919 let bytes_after = t_mut.hot_bytes();
3920 let bytes_freed = bytes_before.saturating_sub(bytes_after);
3921
3922 let segment_id = self
3923 .load_segment_bytes(seg_bytes.clone())
3924 .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: load: {e}")))?;
3925 let new_cold = post_swap_keys.into_iter().map(|k| {
3926 (
3927 k,
3928 RowLocator::Cold {
3929 segment_id,
3930 page_offset: 0,
3931 },
3932 )
3933 });
3934 let t_mut = self.get_mut(table_name).expect("still present");
3935 t_mut.register_cold_locators(index_name, new_cold)?;
3936
3937 Ok(FreezeReport {
3938 segment_id,
3939 frozen_rows,
3940 bytes_freed,
3941 segment_bytes: seg_bytes,
3942 })
3943 }
3944
/// Merges the small cold segments referenced by BTree index `index_name`
/// into a single new segment.
///
/// A segment is a compaction candidate when it is referenced by at least
/// one `RowLocator::Cold` entry of the index, its slot is still live, and
/// its encoded size is strictly below `target_segment_bytes`. With fewer
/// than two candidates nothing is changed and an empty report
/// (`merged_segment_id: None`) is returned.
///
/// Otherwise every row still referenced from the index is copied into a new
/// segment, the index locators pointing at candidate segments are rewritten
/// to point at the new segment, and the candidate segments are tombstoned.
///
/// # Errors
/// Returns `StorageError::Corrupt` when the table or index is missing, the
/// index is not a BTree, a cold key is not an integer, an index entry
/// points at a candidate segment that does not contain the key, or the
/// merged segment cannot be encoded or loaded.
pub fn compact_cold_segments(
    &mut self,
    table_name: &str,
    index_name: &str,
    target_segment_bytes: u64,
) -> Result<CompactReport, StorageError> {
    let t = self.get(table_name).ok_or_else(|| {
        StorageError::Corrupt(format!(
            "compact_cold_segments: table {table_name:?} not found"
        ))
    })?;
    let idx = t
        .indices
        .iter()
        .find(|i| i.name == index_name)
        .ok_or_else(|| {
            StorageError::Corrupt(format!(
                "compact_cold_segments: index {index_name:?} not found on {table_name:?}"
            ))
        })?;
    let map = match &idx.kind {
        IndexKind::BTree(m) => m,
        IndexKind::Nsw(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
            return Err(StorageError::Corrupt(format!(
                "compact_cold_segments: index {index_name:?} is not BTree; \
                 compaction applies only to BTree cold-tier indices"
            )));
        }
    };

    // Pass 1: every segment id that any cold locator of this index refers to.
    let mut referenced_ids: BTreeSet<u32> = BTreeSet::new();
    for (_key, locators) in map.iter() {
        for loc in locators {
            if let RowLocator::Cold { segment_id, .. } = loc {
                referenced_ids.insert(*segment_id);
            }
        }
    }
    // Keep only ids whose slot is live and whose encoded size is strictly
    // below the target; missing or tombstoned slots are dropped here.
    let candidate_set: BTreeSet<u32> = referenced_ids
        .into_iter()
        .filter(|id| {
            self.cold_segments
                .get(*id as usize)
                .and_then(|s| s.as_deref())
                .is_some_and(|s| (s.bytes().len() as u64) < target_segment_bytes)
        })
        .collect();
    // Merging needs at least two sources; otherwise report "nothing done".
    if candidate_set.len() < 2 {
        return Ok(CompactReport {
            sources: Vec::new(),
            merged_segment_id: None,
            merged_segment_bytes: Vec::new(),
            merged_rows: 0,
            deleted_rows_pruned: 0,
            bytes_reclaimed_estimate: 0,
        });
    }
    // Totals over the source segments, taken from their own metadata; used
    // only for the statistics in the report.
    let mut source_row_count: usize = 0;
    let mut source_byte_total: u64 = 0;
    for &id in &candidate_set {
        let seg = self.cold_segments[id as usize]
            .as_ref()
            .expect("candidate selected only when slot is Some");
        source_row_count = source_row_count.saturating_add(seg.meta().num_rows as usize);
        source_byte_total = source_byte_total.saturating_add(seg.bytes().len() as u64);
    }
    // Pass 2: copy out the payload of every key that still has a locator
    // into a candidate segment. Keyed by the u64 form so iteration below is
    // in ascending key order.
    let mut collected: BTreeMap<u64, (Vec<u8>, IndexKey)> = BTreeMap::new();
    for (key, locators) in map.iter() {
        for loc in locators {
            let RowLocator::Cold { segment_id, .. } = loc else {
                continue;
            };
            if !candidate_set.contains(segment_id) {
                continue;
            }
            let u64_key = index_key_as_u64(key).ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "compact_cold_segments: index {index_name:?} has non-integer Cold key; \
                     cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
                ))
            })?;
            let seg = self.cold_segments[*segment_id as usize]
                .as_ref()
                .expect("candidate slot guaranteed Some above");
            let payload = seg.lookup(u64_key).ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "compact_cold_segments: BTree {index_name:?} points key={u64_key} \
                     at segment {segment_id} but the segment lookup missed"
                ))
            })?;
            collected.insert(u64_key, (payload, key.clone()));
            // Only the first candidate locator of a key is copied.
            break;
        }
    }
    let merged_rows = collected.len();
    // Rows counted by the source segments but not carried over, i.e. rows
    // the index no longer references.
    let deleted_rows_pruned = source_row_count.saturating_sub(merged_rows);

    let seg_rows: Vec<(u64, Vec<u8>)> = collected
        .iter()
        .map(|(k, (body, _))| (*k, body.clone()))
        .collect();
    let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
        .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: encode: {e}")))?;
    let merged_bytes_len = seg_bytes.len() as u64;

    // Install the merged segment before the index is touched; a failure
    // here returns with the catalog unchanged.
    let merged_segment_id = self
        .load_segment_bytes(seg_bytes.clone())
        .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: load: {e}")))?;

    // Snapshot the index entries as owned values so the shared borrow of
    // `self` ends before the index is borrowed mutably below.
    let entries: Vec<(IndexKey, Vec<RowLocator>)> = {
        let t = self
            .get(table_name)
            .expect("table existed at the start of this fn");
        let idx = t
            .indices
            .iter()
            .find(|i| i.name == index_name)
            .expect("index existed at the start of this fn");
        let IndexKind::BTree(map) = &idx.kind else {
            unreachable!("validated above");
        };
        map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
    };
    let t_mut = self
        .get_mut(table_name)
        .expect("table existed at the start of this fn");
    let idx_mut = t_mut
        .indices
        .iter_mut()
        .find(|i| i.name == index_name)
        .expect("index existed at the start of this fn");
    let IndexKind::BTree(map_mut) = &mut idx_mut.kind else {
        unreachable!("validated above");
    };
    // Pass 3: rewrite locators. Every cold locator into a candidate segment
    // becomes a locator into the merged segment; several such locators on
    // one key collapse into a single replacement. Other locators are kept
    // in their original order.
    for (key, locators) in entries {
        let mut new_locs: Vec<RowLocator> = Vec::with_capacity(locators.len());
        let mut changed = false;
        for loc in &locators {
            match *loc {
                RowLocator::Cold {
                    segment_id,
                    page_offset: _,
                } if candidate_set.contains(&segment_id) => {
                    let replacement = RowLocator::Cold {
                        segment_id: merged_segment_id,
                        page_offset: 0,
                    };
                    if !new_locs.contains(&replacement) {
                        new_locs.push(replacement);
                    }
                    changed = true;
                }
                other => new_locs.push(other),
            }
        }
        // Untouched keys are not written back.
        if changed {
            map_mut.insert_mut(key, new_locs);
        }
    }

    // Finally release the source slots; their ids stay reserved as empty
    // slots.
    for &id in &candidate_set {
        self.tombstone_segment(id)?;
    }

    let bytes_reclaimed_estimate = source_byte_total.saturating_sub(merged_bytes_len);
    Ok(CompactReport {
        sources: candidate_set.into_iter().collect(),
        merged_segment_id: Some(merged_segment_id),
        merged_segment_bytes: seg_bytes,
        merged_rows,
        deleted_rows_pruned,
        bytes_reclaimed_estimate,
    })
}
4184
4185 fn find_cold_locator(
4191 &self,
4192 table_name: &str,
4193 index_name: &str,
4194 key: &IndexKey,
4195 ) -> Result<Option<(u32, u32)>, StorageError> {
4196 let t = self.get(table_name).ok_or_else(|| {
4197 StorageError::Corrupt(format!("find_cold_locator: table {table_name:?} not found"))
4198 })?;
4199 let idx = t
4200 .indices
4201 .iter()
4202 .find(|i| i.name == index_name)
4203 .ok_or_else(|| {
4204 StorageError::Corrupt(format!(
4205 "find_cold_locator: index {index_name:?} not found on {table_name:?}"
4206 ))
4207 })?;
4208 if !matches!(idx.kind, IndexKind::BTree(_)) {
4209 return Err(StorageError::Corrupt(format!(
4210 "find_cold_locator: index {index_name:?} is NSW; promote-on-write only applies to BTree indices"
4211 )));
4212 }
4213 for loc in idx.lookup_eq(key) {
4214 if let RowLocator::Cold {
4215 segment_id,
4216 page_offset,
4217 } = *loc
4218 {
4219 return Ok(Some((segment_id, page_offset)));
4220 }
4221 }
4222 Ok(None)
4223 }
4224}
4225
4226fn index_key_as_u64(key: &IndexKey) -> Option<u64> {
4232 match key {
4233 IndexKey::Int(n) => Some(n.cast_unsigned()),
4239 IndexKey::Text(_) | IndexKey::Bool(_) => None,
4240 }
4241}
4242
/// Errors reported by the storage layer.
///
/// The enum is `#[non_exhaustive]`: downstream `match`es need a wildcard
/// arm so that new variants can be added later.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum StorageError {
    /// A table with this name already exists.
    DuplicateTable {
        name: String,
    },
    /// No table with this name exists.
    TableNotFound {
        name: String,
    },
    /// A row carries a different number of values than the table has
    /// columns.
    ArityMismatch {
        expected: usize,
        actual: usize,
    },
    /// The value at `position` (column `column`) has type `actual` where the
    /// schema requires `expected`.
    TypeMismatch {
        column: String,
        expected: DataType,
        actual: DataType,
        position: usize,
    },
    /// A NULL value was supplied for a NOT NULL column.
    NullInNotNull {
        column: String,
    },
    /// An index with this name already exists.
    DuplicateIndex {
        name: String,
    },
    /// No column with this name exists.
    ColumnNotFound {
        column: String,
    },
    /// Malformed serialized data; the payload carries the detail. The
    /// cold-tier methods in this file also use this variant for their
    /// validation and invariant failures.
    Corrupt(String),
    /// No index with this name exists.
    IndexNotFound {
        name: String,
    },
    /// The requested operation is not supported; the payload carries the
    /// detail.
    Unsupported(String),
}
4286
4287impl fmt::Display for StorageError {
4288 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4289 match self {
4290 Self::DuplicateTable { name } => write!(f, "table already exists: {name}"),
4291 Self::TableNotFound { name } => write!(f, "table not found: {name}"),
4292 Self::ArityMismatch { expected, actual } => write!(
4293 f,
4294 "row arity mismatch: expected {expected} columns, got {actual}"
4295 ),
4296 Self::TypeMismatch {
4297 column,
4298 expected,
4299 actual,
4300 position,
4301 } => write!(
4302 f,
4303 "type mismatch in column {column:?} (position {position}): expected {expected}, got {actual}"
4304 ),
4305 Self::NullInNotNull { column } => {
4306 write!(f, "NULL value in NOT NULL column {column:?}")
4307 }
4308 Self::DuplicateIndex { name } => write!(f, "index already exists: {name}"),
4309 Self::ColumnNotFound { column } => write!(f, "column not found: {column}"),
4310 Self::Corrupt(detail) => write!(f, "corrupt on-disk format: {detail}"),
4311 Self::IndexNotFound { name } => write!(f, "index not found: {name}"),
4312 Self::Unsupported(detail) => write!(f, "unsupported: {detail}"),
4313 }
4314 }
4315}
4316
4317impl ColumnSchema {
4318 pub fn new(name: impl Into<String>, ty: DataType, nullable: bool) -> Self {
4319 Self {
4320 name: name.into(),
4321 ty,
4322 nullable,
4323 default: None,
4324 runtime_default: None,
4325 auto_increment: false,
4326 }
4327 }
4328
4329 #[must_use]
4333 pub fn with_default(mut self, default: Value) -> Self {
4334 self.default = Some(default);
4335 self
4336 }
4337
4338 #[must_use]
4343 pub fn with_runtime_default(mut self, expr: impl Into<String>) -> Self {
4344 self.runtime_default = Some(expr.into());
4345 self
4346 }
4347
4348 #[must_use]
4350 pub const fn with_auto_increment(mut self) -> Self {
4351 self.auto_increment = true;
4352 self
4353 }
4354}
4355
4356impl TableSchema {
4357 pub fn new(name: impl Into<String>, columns: Vec<ColumnSchema>) -> Self {
4358 Self {
4359 name: name.into(),
4360 columns,
4361 hot_tier_bytes: None,
4362 foreign_keys: Vec::new(),
4363 uniqueness_constraints: Vec::new(),
4364 checks: Vec::new(),
4365 }
4366 }
4367}
4368
/// Eight-byte signature written at the very start of a serialized catalog
/// and checked first by `Catalog::deserialize`.
const FILE_MAGIC: &[u8; 8] = b"SPGDB001";
/// Format version written by `Catalog::serialize` (the byte right after the
/// magic). Also the newest version `Catalog::deserialize` accepts.
const FILE_VERSION: u8 = 23;
/// Oldest format version `Catalog::deserialize` still accepts.
const MIN_SUPPORTED_FILE_VERSION: u8 = 8;

// One-byte tags for the `IndexKey` variants. Their users are outside this
// section of the file; presumably the index-key reader/writer helpers —
// confirm there before changing a value.
const INDEX_KEY_TAG_INT: u8 = 0;
const INDEX_KEY_TAG_TEXT: u8 = 1;
const INDEX_KEY_TAG_BOOL: u8 = 2;
4465
4466impl Catalog {
/// Serializes the catalog into a byte buffer at format `FILE_VERSION`.
///
/// Layout, in write order:
/// 1. `FILE_MAGIC`, then the version byte.
/// 2. Table count (`u32`), then for every table: name, columns, rows,
///    indices, hot-tier budget, foreign keys, uniqueness constraints,
///    runtime defaults, CHECK constraints.
/// 3. Function count (`u32`) and the function definitions.
/// 4. Trigger count (`u32`) and the trigger definitions.
///
/// Cold segments are not part of this buffer.
///
/// # Panics
/// Panics when a count does not fit the width of its length prefix (see the
/// individual `expect` messages).
pub fn serialize(&self) -> Vec<u8> {
    let mut out = Vec::with_capacity(64);
    out.extend_from_slice(FILE_MAGIC);
    out.push(FILE_VERSION);
    write_u32(
        &mut out,
        u32::try_from(self.tables.len()).expect("≤ 4G tables"),
    );
    for t in &self.tables {
        // --- Table header: name and column definitions.
        write_str(&mut out, &t.schema.name);
        write_u16(
            &mut out,
            u16::try_from(t.schema.columns.len()).expect("≤ 65k columns/table"),
        );
        for c in &t.schema.columns {
            write_str(&mut out, &c.name);
            write_data_type(&mut out, c.ty);
            out.push(u8::from(c.nullable));
            // Literal default: presence byte, then the value when present.
            // The runtime default is written in its own section below.
            match &c.default {
                None => out.push(0),
                Some(v) => {
                    out.push(1);
                    write_value(&mut out, v);
                }
            }
            out.push(u8::from(c.auto_increment));
        }
        // --- Hot rows, each in the dense row encoding.
        write_u32(
            &mut out,
            u32::try_from(t.rows.len()).expect("≤ 4G rows/table"),
        );
        for row in &t.rows {
            out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
        }
        // --- Indices.
        write_u16(
            &mut out,
            u16::try_from(t.indices.len()).expect("≤ 65k indices/table"),
        );
        for idx in &t.indices {
            write_str(&mut out, &idx.name);
            write_u16(
                &mut out,
                u16::try_from(idx.column_position).expect("≤ 65k columns/table"),
            );
            // Kind tag byte: 0 = BTree, 1 = NSW, 2 = BRIN, 3 = GIN; the
            // kind-specific payload follows.
            match &idx.kind {
                IndexKind::BTree(map) => {
                    out.push(0);
                    write_u32(
                        &mut out,
                        u32::try_from(map.len()).expect("≤ 4G index entries/index"),
                    );
                    // Every key with its full locator list.
                    for (key, locators) in map {
                        write_index_key(&mut out, key);
                        write_u32(
                            &mut out,
                            u32::try_from(locators.len()).expect("≤ 4G locators/key"),
                        );
                        for loc in locators {
                            loc.write_le(&mut out);
                        }
                    }
                }
                IndexKind::Nsw(g) => {
                    out.push(1);
                    write_u16(&mut out, u16::try_from(g.m).expect("≤ 65k NSW neighbours"));
                    write_nsw_graph(&mut out, g);
                }
                IndexKind::Brin { column_type } => {
                    out.push(2);
                    // Only the column type is written for BRIN.
                    write_data_type(&mut out, *column_type);
                }
                IndexKind::Gin(map) => {
                    out.push(3);
                    write_u32(
                        &mut out,
                        u32::try_from(map.len()).expect("≤ 4G GIN posting lists"),
                    );
                    // Every word with its posting list of locators.
                    for (word, locators) in map {
                        write_str(&mut out, word);
                        write_u32(
                            &mut out,
                            u32::try_from(locators.len()).expect("≤ 4G locators/posting list"),
                        );
                        for loc in locators {
                            loc.write_le(&mut out);
                        }
                    }
                }
            }
            // Index options common to all kinds: INCLUDE columns, optional
            // partial predicate, optional expression, uniqueness flag and
            // extra column positions.
            write_u16(
                &mut out,
                u16::try_from(idx.included_columns.len()).expect("≤ 65k INCLUDE columns/index"),
            );
            for col_pos in &idx.included_columns {
                write_u16(
                    &mut out,
                    u16::try_from(*col_pos).expect("≤ 65k columns/table"),
                );
            }
            match &idx.partial_predicate {
                None => out.push(0),
                Some(pred) => {
                    out.push(1);
                    write_str(&mut out, pred);
                }
            }
            match &idx.expression {
                None => out.push(0),
                Some(expr) => {
                    out.push(1);
                    write_str(&mut out, expr);
                }
            }
            out.push(u8::from(idx.is_unique));
            write_u16(
                &mut out,
                u16::try_from(idx.extra_column_positions.len())
                    .expect("≤ 65k extra cols / index"),
            );
            for cp in &idx.extra_column_positions {
                write_u16(&mut out, u16::try_from(*cp).expect("≤ 65k columns/table"));
            }
        }
        // --- Optional hot-tier byte budget: presence byte, then the value
        // in little-endian byte order.
        match t.schema.hot_tier_bytes {
            None => out.push(0),
            Some(n) => {
                out.push(1);
                out.extend_from_slice(&n.to_le_bytes());
            }
        }
        // --- Foreign keys.
        write_u16(
            &mut out,
            u16::try_from(t.schema.foreign_keys.len()).expect("≤ 65k FKs/table"),
        );
        for fk in &t.schema.foreign_keys {
            match &fk.name {
                None => out.push(0),
                Some(n) => {
                    out.push(1);
                    write_str(&mut out, n);
                }
            }
            write_u16(
                &mut out,
                u16::try_from(fk.local_columns.len()).expect("≤ 65k FK columns"),
            );
            for &p in &fk.local_columns {
                write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
            }
            write_str(&mut out, &fk.parent_table);
            write_u16(
                &mut out,
                u16::try_from(fk.parent_columns.len()).expect("≤ 65k FK parent columns"),
            );
            for &p in &fk.parent_columns {
                write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
            }
            out.push(fk.on_delete.tag());
            out.push(fk.on_update.tag());
        }
        // --- Uniqueness constraints (primary key and UNIQUE).
        write_u16(
            &mut out,
            u16::try_from(t.schema.uniqueness_constraints.len())
                .expect("≤ 65k uniqueness constraints/table"),
        );
        for uc in &t.schema.uniqueness_constraints {
            out.push(u8::from(uc.is_primary_key));
            write_u16(
                &mut out,
                u16::try_from(uc.columns.len()).expect("≤ 65k cols in uniqueness constraint"),
            );
            for &p in &uc.columns {
                write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
            }
            out.push(u8::from(uc.nulls_not_distinct));
        }
        // --- Runtime defaults, stored sparsely as (column position,
        // expression) pairs for the columns that have one.
        let mut rt_defaults: Vec<(usize, &str)> = Vec::new();
        for (i, c) in t.schema.columns.iter().enumerate() {
            if let Some(e) = &c.runtime_default {
                rt_defaults.push((i, e.as_str()));
            }
        }
        write_u16(
            &mut out,
            u16::try_from(rt_defaults.len()).expect("≤ 65k runtime defaults/table"),
        );
        for (pos, expr) in rt_defaults {
            write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
            write_str(&mut out, expr);
        }
        // --- CHECK constraints, as strings.
        write_u16(
            &mut out,
            u16::try_from(t.schema.checks.len()).expect("≤ 65k CHECK constraints/table"),
        );
        for c in &t.schema.checks {
            write_str(&mut out, c.as_str());
        }
    }
    // --- Functions. `deserialize` reads this section and the trigger
    // section only for versions >= 22.
    write_u32(
        &mut out,
        u32::try_from(self.functions.len()).expect("≤ 4G functions"),
    );
    for fd in self.functions.values() {
        write_str(&mut out, &fd.name);
        write_str(&mut out, &fd.args_repr);
        write_str(&mut out, &fd.returns);
        write_str(&mut out, &fd.language);
        // The body uses the long-string writer, unlike the other fields.
        write_str_long(&mut out, &fd.body);
    }
    // --- Triggers.
    write_u32(
        &mut out,
        u32::try_from(self.triggers.len()).expect("≤ 4G triggers"),
    );
    for td in &self.triggers {
        write_str(&mut out, &td.name);
        write_str(&mut out, &td.table);
        write_str(&mut out, &td.timing);
        write_u16(
            &mut out,
            u16::try_from(td.events.len()).expect("≤ 65k events / trigger"),
        );
        for ev in &td.events {
            write_str(&mut out, ev);
        }
        write_str(&mut out, &td.for_each);
        write_str(&mut out, &td.function);
        // `deserialize` reads the update-column list only for versions
        // >= 23.
        write_u16(
            &mut out,
            u16::try_from(td.update_columns.len()).expect("≤ 65k cols / trigger"),
        );
        for c in &td.update_columns {
            write_str(&mut out, c);
        }
    }
    out
}
4804
4805 pub fn deserialize(buf: &[u8]) -> Result<Self, StorageError> {
4808 let mut cur = Cursor::new(buf);
4809 let magic = cur.take(8)?;
4810 if magic != FILE_MAGIC {
4811 return Err(StorageError::Corrupt(format!(
4812 "bad magic: expected SPGDB001, got {magic:?}"
4813 )));
4814 }
4815 let version = cur.read_u8()?;
4816 if !(MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION).contains(&version) {
4817 return Err(StorageError::Corrupt(format!(
4818 "unsupported file version: {version} (supported: {MIN_SUPPORTED_FILE_VERSION}..={FILE_VERSION})"
4819 )));
4820 }
4821 let table_count = cur.read_u32()? as usize;
4822 let mut cat = Self::new();
4823 for _ in 0..table_count {
4824 deserialize_table(&mut cur, &mut cat, version)?;
4825 }
4826 if version >= 22 {
4830 let fn_count = cur.read_u32()? as usize;
4831 for _ in 0..fn_count {
4832 let name = cur.read_str()?;
4833 let args_repr = cur.read_str()?;
4834 let returns = cur.read_str()?;
4835 let language = cur.read_str()?;
4836 let body = cur.read_str_long()?;
4837 cat.functions.insert(
4838 name.clone(),
4839 FunctionDef {
4840 name,
4841 args_repr,
4842 returns,
4843 language,
4844 body,
4845 },
4846 );
4847 }
4848 let trg_count = cur.read_u32()? as usize;
4849 for _ in 0..trg_count {
4850 let name = cur.read_str()?;
4851 let table = cur.read_str()?;
4852 let timing = cur.read_str()?;
4853 let ev_count = cur.read_u16()? as usize;
4854 let mut events = Vec::with_capacity(ev_count);
4855 for _ in 0..ev_count {
4856 events.push(cur.read_str()?);
4857 }
4858 let for_each = cur.read_str()?;
4859 let function = cur.read_str()?;
4860 let update_columns = if version >= 23 {
4864 let n = cur.read_u16()? as usize;
4865 let mut cols = Vec::with_capacity(n);
4866 for _ in 0..n {
4867 cols.push(cur.read_str()?);
4868 }
4869 cols
4870 } else {
4871 Vec::new()
4872 };
4873 cat.triggers.push(TriggerDef {
4874 name,
4875 table,
4876 timing,
4877 events,
4878 for_each,
4879 function,
4880 update_columns,
4881 });
4882 }
4883 }
4884 if cur.pos < buf.len() {
4885 return Err(StorageError::Corrupt(format!(
4886 "trailing bytes: {} unread",
4887 buf.len() - cur.pos
4888 )));
4889 }
4890 Ok(cat)
4891 }
4892}
4893
4894fn deserialize_table(
4899 cur: &mut Cursor<'_>,
4900 cat: &mut Catalog,
4901 version: u8,
4902) -> Result<(), StorageError> {
4903 let table_name = cur.read_str()?;
4904 let name = table_name.clone();
4905 let col_count = cur.read_u16()? as usize;
4906 let mut cols = Vec::with_capacity(col_count);
4907 for _ in 0..col_count {
4908 let c_name = cur.read_str()?;
4909 let ty = cur.read_data_type()?;
4910 let nullable = cur.read_u8()? != 0;
4911 let default = match cur.read_u8()? {
4912 0 => None,
4913 1 => Some(cur.read_value()?),
4914 other => {
4915 return Err(StorageError::Corrupt(format!(
4916 "unknown default tag: {other}"
4917 )));
4918 }
4919 };
4920 let auto_increment = cur.read_u8()? != 0;
4921 cols.push(ColumnSchema {
4925 name: c_name,
4926 ty,
4927 nullable,
4928 default,
4929 runtime_default: None,
4930 auto_increment,
4931 });
4932 }
4933 let n_cols = cols.len();
4934 cat.create_table(TableSchema::new(name, cols))?;
4935 let t = cat.tables.last_mut().expect("create_table just pushed");
4939 deserialize_rows(cur, t, n_cols)?;
4940 deserialize_indices(cur, t, version)?;
4941 if version >= 11 {
4947 let has = cur.read_u8()?;
4948 let hot_tier_bytes = match has {
4949 0 => None,
4950 1 => Some(cur.read_u64()?),
4951 other => {
4952 return Err(StorageError::Corrupt(format!(
4953 "hot_tier_bytes appendix: unknown has-value byte {other}"
4954 )));
4955 }
4956 };
4957 t.schema_mut().hot_tier_bytes = hot_tier_bytes;
4958 }
4959 if version >= 13 {
4962 let fk_count = cur.read_u16()? as usize;
4963 let mut fks = Vec::with_capacity(fk_count);
4964 for _ in 0..fk_count {
4965 let name = match cur.read_u8()? {
4966 0 => None,
4967 1 => Some(cur.read_str()?),
4968 other => {
4969 return Err(StorageError::Corrupt(format!(
4970 "FK appendix: unknown has-name byte {other}"
4971 )));
4972 }
4973 };
4974 let local_arity = cur.read_u16()? as usize;
4975 let mut local_columns = Vec::with_capacity(local_arity);
4976 for _ in 0..local_arity {
4977 local_columns.push(cur.read_u16()? as usize);
4978 }
4979 let parent_table = cur.read_str()?;
4980 let parent_arity = cur.read_u16()? as usize;
4981 if parent_arity != local_arity {
4982 return Err(StorageError::Corrupt(format!(
4983 "FK arity mismatch in catalog: local {local_arity} vs parent {parent_arity}"
4984 )));
4985 }
4986 let mut parent_columns = Vec::with_capacity(parent_arity);
4987 for _ in 0..parent_arity {
4988 parent_columns.push(cur.read_u16()? as usize);
4989 }
4990 let on_delete = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
4991 StorageError::Corrupt("FK appendix: unknown on_delete tag".into())
4992 })?;
4993 let on_update = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
4994 StorageError::Corrupt("FK appendix: unknown on_update tag".into())
4995 })?;
4996 fks.push(ForeignKeyConstraint {
4997 name,
4998 local_columns,
4999 parent_table,
5000 parent_columns,
5001 on_delete,
5002 on_update,
5003 });
5004 }
5005 t.schema_mut().foreign_keys = fks;
5006 }
5007 if version >= 15 {
5010 let uc_count = cur.read_u16()? as usize;
5011 let mut ucs = Vec::with_capacity(uc_count);
5012 for _ in 0..uc_count {
5013 let is_pk = cur.read_u8()? != 0;
5014 let arity = cur.read_u16()? as usize;
5015 let mut cols = Vec::with_capacity(arity);
5016 for _ in 0..arity {
5017 cols.push(cur.read_u16()? as usize);
5018 }
5019 let nulls_not_distinct = if version >= 23 {
5023 cur.read_u8()? != 0
5024 } else {
5025 false
5026 };
5027 ucs.push(UniquenessConstraint {
5028 is_primary_key: is_pk,
5029 columns: cols,
5030 nulls_not_distinct,
5031 });
5032 }
5033 t.schema_mut().uniqueness_constraints = ucs;
5034 let rt_count = cur.read_u16()? as usize;
5036 for _ in 0..rt_count {
5037 let pos = cur.read_u16()? as usize;
5038 let expr = cur.read_str()?;
5039 if let Some(col) = t.schema_mut().columns.get_mut(pos) {
5040 col.runtime_default = Some(expr);
5041 }
5042 }
5043 }
5044 if version >= 23 {
5047 let check_count = cur.read_u16()? as usize;
5048 let mut checks = Vec::with_capacity(check_count);
5049 for _ in 0..check_count {
5050 checks.push(cur.read_str()?);
5051 }
5052 t.schema_mut().checks = checks;
5053 }
5054 let _ = table_name;
5055 Ok(())
5056}
5057
5058fn deserialize_rows(
5059 cur: &mut Cursor<'_>,
5060 t: &mut Table,
5061 _n_cols: usize,
5062) -> Result<(), StorageError> {
5063 let row_count = cur.read_u32()? as usize;
5064 let mut hot_bytes: u64 = 0;
5069 for _ in 0..row_count {
5070 let tail = &cur.buf[cur.pos..];
5071 let (row, consumed) = decode_row_body_dense(tail, &t.schema)?;
5072 cur.pos += consumed;
5073 hot_bytes = hot_bytes.saturating_add(row_body_encoded_len(&row, &t.schema) as u64);
5079 t.rows.push_mut(row);
5080 }
5081 t.hot_bytes = hot_bytes;
5082 Ok(())
5083}
5084
5085fn deserialize_indices(
5086 cur: &mut Cursor<'_>,
5087 t: &mut Table,
5088 version: u8,
5089) -> Result<(), StorageError> {
5090 let index_count = cur.read_u16()? as usize;
5091 for _ in 0..index_count {
5092 let idx_name = cur.read_str()?;
5093 let col_pos = cur.read_u16()? as usize;
5094 let column_name = t
5095 .schema
5096 .columns
5097 .get(col_pos)
5098 .ok_or_else(|| {
5099 StorageError::Corrupt(format!(
5100 "index {idx_name:?} points at non-existent column position {col_pos}"
5101 ))
5102 })?
5103 .name
5104 .clone();
5105 let kind_tag = cur.read_u8()?;
5106 match kind_tag {
5107 0 => {
5108 if version >= 9 {
5109 let map = read_btree_map(cur)?;
5114 t.restore_btree_index(idx_name, &column_name, map)?;
5115 } else {
5116 t.add_index(idx_name, &column_name)?;
5121 }
5122 }
5123 1 => {
5124 let m = cur.read_u16()? as usize;
5125 let graph = cur.read_nsw_graph(m)?;
5126 t.restore_nsw_index(idx_name, &column_name, graph)?;
5127 }
5128 2 => {
5129 let column_type = cur.read_data_type()?;
5133 t.restore_brin_index(idx_name, &column_name, column_type)?;
5134 }
5135 3 => {
5136 let map = read_gin_map(cur)?;
5141 t.restore_gin_index(idx_name, &column_name, map)?;
5142 }
5143 other => {
5144 return Err(StorageError::Corrupt(format!(
5145 "unknown index kind tag: {other}"
5146 )));
5147 }
5148 }
5149 if version >= 12 {
5152 let num_included = cur.read_u16()? as usize;
5153 if num_included > 0 {
5154 let mut included: Vec<usize> = Vec::with_capacity(num_included);
5155 for _ in 0..num_included {
5156 let cp = cur.read_u16()? as usize;
5157 if cp >= t.schema.columns.len() {
5158 return Err(StorageError::Corrupt(format!(
5159 "INCLUDE column position {cp} out of range \
5160 ({} schema columns)",
5161 t.schema.columns.len()
5162 )));
5163 }
5164 included.push(cp);
5165 }
5166 if let Some(last) = t.indices.last_mut() {
5167 last.included_columns = included;
5168 }
5169 }
5170 match cur.read_u8()? {
5172 0 => {}
5173 1 => {
5174 let pred = cur.read_str()?;
5175 if let Some(last) = t.indices.last_mut() {
5176 last.partial_predicate = Some(pred);
5177 }
5178 }
5179 other => {
5180 return Err(StorageError::Corrupt(format!(
5181 "partial_predicate tag: unknown byte {other}"
5182 )));
5183 }
5184 }
5185 match cur.read_u8()? {
5187 0 => {}
5188 1 => {
5189 let expr = cur.read_str()?;
5190 if let Some(last) = t.indices.last_mut() {
5191 last.expression = Some(expr);
5192 }
5193 }
5194 other => {
5195 return Err(StorageError::Corrupt(format!(
5196 "expression tag: unknown byte {other}"
5197 )));
5198 }
5199 }
5200 if version >= 16 {
5203 match cur.read_u8()? {
5204 0 => {}
5205 1 => {
5206 if let Some(last) = t.indices.last_mut() {
5207 last.is_unique = true;
5208 }
5209 }
5210 other => {
5211 return Err(StorageError::Corrupt(format!(
5212 "is_unique tag: unknown byte {other}"
5213 )));
5214 }
5215 }
5216 let n = cur.read_u16()? as usize;
5218 if n > 0 {
5219 let mut extras: Vec<usize> = Vec::with_capacity(n);
5220 for _ in 0..n {
5221 let cp = cur.read_u16()? as usize;
5222 if cp >= t.schema.columns.len() {
5223 return Err(StorageError::Corrupt(format!(
5224 "extra column position {cp} out of range \
5225 ({} schema columns)",
5226 t.schema.columns.len()
5227 )));
5228 }
5229 extras.push(cp);
5230 }
5231 if let Some(last) = t.indices.last_mut() {
5232 last.extra_column_positions = extras;
5233 }
5234 }
5235 }
5236 }
5237 }
5238 Ok(())
5239}
5240
5241fn read_btree_map(
5245 cur: &mut Cursor<'_>,
5246) -> Result<PersistentBTreeMap<IndexKey, Vec<RowLocator>>, StorageError> {
5247 let entry_count = cur.read_u32()? as usize;
5248 let mut map = PersistentBTreeMap::new();
5249 for _ in 0..entry_count {
5250 let key = cur.read_index_key()?;
5251 let locator_count = cur.read_u32()? as usize;
5252 let mut locators = Vec::with_capacity(locator_count);
5253 for _ in 0..locator_count {
5254 let tail = &cur.buf[cur.pos..];
5255 let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
5256 StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
5257 })?;
5258 cur.pos += consumed;
5259 locators.push(loc);
5260 }
5261 map.insert_mut(key, locators);
5262 }
5263 Ok(map)
5264}
5265
5266fn read_gin_map(
5270 cur: &mut Cursor<'_>,
5271) -> Result<PersistentBTreeMap<String, Vec<RowLocator>>, StorageError> {
5272 let entry_count = cur.read_u32()? as usize;
5273 let mut map = PersistentBTreeMap::new();
5274 for _ in 0..entry_count {
5275 let word = cur.read_str()?;
5276 let locator_count = cur.read_u32()? as usize;
5277 let mut locators = Vec::with_capacity(locator_count);
5278 for _ in 0..locator_count {
5279 let tail = &cur.buf[cur.pos..];
5280 let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
5281 StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
5282 })?;
5283 cur.pos += consumed;
5284 locators.push(loc);
5285 }
5286 map.insert_mut(word, locators);
5287 }
5288 Ok(map)
5289}
5290
5291fn write_nsw_graph(out: &mut Vec<u8>, g: &NswGraph) {
5307 let entry = g.entry.map_or(u32::MAX, |e| {
5308 u32::try_from(e).expect("NSW entry fits in u32")
5309 });
5310 write_u16(
5311 out,
5312 u16::try_from(g.m_max_0).expect("HNSW m_max_0 fits in u16"),
5313 );
5314 out.extend_from_slice(&entry.to_le_bytes());
5315 out.push(g.entry_level);
5316 let node_count = g.levels.len();
5317 write_u32(
5318 out,
5319 u32::try_from(node_count).expect("HNSW node count fits in u32"),
5320 );
5321 for &lvl in &g.levels {
5322 out.push(lvl);
5323 }
5324 let layer_count = u8::try_from(g.layers.len()).expect("HNSW layer count ≤ 255");
5325 out.push(layer_count);
5326 for layer in &g.layers {
5327 write_u32(
5328 out,
5329 u32::try_from(layer.len()).expect("HNSW per-layer node count fits in u32"),
5330 );
5331 for neighbors in layer {
5332 write_u16(
5333 out,
5334 u16::try_from(neighbors.len()).expect("HNSW neighbour list fits in u16"),
5335 );
5336 for &peer in neighbors {
5340 write_u32(out, peer);
5341 }
5342 }
5343 }
5344}
5345
5346fn write_data_type(out: &mut Vec<u8>, t: DataType) {
5347 match t {
5348 DataType::Int => out.push(1),
5349 DataType::BigInt => out.push(2),
5350 DataType::Float => out.push(3),
5351 DataType::Text => out.push(4),
5352 DataType::Bool => out.push(5),
5353 DataType::Vector { dim, encoding } => match encoding {
5354 VecEncoding::F32 => {
5358 out.push(6);
5359 out.extend_from_slice(&dim.to_le_bytes());
5360 }
5361 VecEncoding::F16 => {
5364 out.push(15);
5365 out.extend_from_slice(&dim.to_le_bytes());
5366 }
5367 VecEncoding::Sq8 => {
5373 out.push(14);
5374 out.extend_from_slice(&dim.to_le_bytes());
5375 }
5376 },
5377 DataType::SmallInt => out.push(7),
5378 DataType::Varchar(max) => {
5379 out.push(8);
5380 out.extend_from_slice(&max.to_le_bytes());
5381 }
5382 DataType::Char(size) => {
5383 out.push(9);
5384 out.extend_from_slice(&size.to_le_bytes());
5385 }
5386 DataType::Numeric { precision, scale } => {
5387 out.push(10);
5388 out.push(precision);
5389 out.push(scale);
5390 }
5391 DataType::Date => out.push(11),
5392 DataType::Timestamp => out.push(12),
5393 DataType::Timestamptz => out.push(17),
5397 DataType::Interval => {
5402 unreachable!("DataType::Interval has no on-disk encoding in v2.11")
5403 }
5404 DataType::Json => out.push(13),
5405 DataType::Jsonb => out.push(16),
5408 DataType::Bytes => out.push(18),
5410 DataType::TextArray => out.push(19),
5413 DataType::IntArray => out.push(20),
5416 DataType::BigIntArray => out.push(21),
5419 DataType::TsVector => out.push(22),
5422 DataType::TsQuery => out.push(23),
5425 }
5426}
5427
5428impl Cursor<'_> {
5429 fn read_data_type(&mut self) -> Result<DataType, StorageError> {
5430 let tag = self.read_u8()?;
5431 match tag {
5432 1 => Ok(DataType::Int),
5433 2 => Ok(DataType::BigInt),
5434 3 => Ok(DataType::Float),
5435 4 => Ok(DataType::Text),
5436 5 => Ok(DataType::Bool),
5437 6 => Ok(DataType::Vector {
5438 dim: self.read_u32()?,
5439 encoding: VecEncoding::F32,
5440 }),
5441 7 => Ok(DataType::SmallInt),
5442 8 => Ok(DataType::Varchar(self.read_u32()?)),
5443 9 => Ok(DataType::Char(self.read_u32()?)),
5444 10 => {
5445 let precision = self.read_u8()?;
5446 let scale = self.read_u8()?;
5447 Ok(DataType::Numeric { precision, scale })
5448 }
5449 11 => Ok(DataType::Date),
5450 12 => Ok(DataType::Timestamp),
5451 13 => Ok(DataType::Json),
5452 14 => Ok(DataType::Vector {
5453 dim: self.read_u32()?,
5454 encoding: VecEncoding::Sq8,
5455 }),
5456 15 => Ok(DataType::Vector {
5460 dim: self.read_u32()?,
5461 encoding: VecEncoding::F16,
5462 }),
5463 16 => Ok(DataType::Jsonb),
5467 17 => Ok(DataType::Timestamptz),
5471 18 => Ok(DataType::Bytes),
5473 19 => Ok(DataType::TextArray),
5475 20 => Ok(DataType::IntArray),
5477 21 => Ok(DataType::BigIntArray),
5478 22 => Ok(DataType::TsVector),
5481 23 => Ok(DataType::TsQuery),
5482 other => Err(StorageError::Corrupt(format!(
5483 "unknown data type tag: {other}"
5484 ))),
5485 }
5486 }
5487}
5488
5489pub fn row_body_encoded_len(row: &Row, schema: &TableSchema) -> usize {
5495 debug_assert_eq!(
5496 row.values.len(),
5497 schema.columns.len(),
5498 "row_body_encoded_len: row arity must match schema"
5499 );
5500 let bitmap_bytes = schema.columns.len().div_ceil(8);
5501 let mut n = bitmap_bytes;
5502 for (col_idx, v) in row.values.iter().enumerate() {
5503 if matches!(v, Value::Null) {
5504 continue;
5505 }
5506 n += value_body_encoded_len(v, schema.columns[col_idx].ty);
5507 }
5508 n
5509}
5510
5511fn value_body_encoded_len(v: &Value, _ty: DataType) -> usize {
5517 match v {
5518 Value::SmallInt(_) => 2,
5519 Value::Int(_) | Value::Date(_) => 4,
5521 Value::BigInt(_) | Value::Float(_) | Value::Timestamp(_) => 8,
5523 Value::Bool(_) => 1,
5524 Value::Text(s) | Value::Json(s) => 2 + s.len(),
5526 Value::Vector(vec) => 4 + 4 * vec.len(),
5528 Value::Sq8Vector(q) => 4 + 4 + 4 + q.bytes.len(),
5535 Value::HalfVector(h) => 4 + h.bytes.len(),
5538 Value::Numeric { .. } => 16 + 1,
5540 Value::Bytes(b) => 2 + b.len(),
5546 Value::TextArray(items) => {
5549 let mut n = 2; for item in items {
5551 n += 1; if let Some(s) = item {
5553 n += 2 + s.len();
5554 }
5555 }
5556 n
5557 }
5558 Value::IntArray(items) => {
5561 2 + items
5562 .iter()
5563 .map(|x| if x.is_some() { 5 } else { 1 })
5564 .sum::<usize>()
5565 }
5566 Value::BigIntArray(items) => {
5567 2 + items
5568 .iter()
5569 .map(|x| if x.is_some() { 9 } else { 1 })
5570 .sum::<usize>()
5571 }
5572 Value::TsVector(lexs) => {
5576 let mut n = 2;
5577 for l in lexs {
5578 n += 2 + l.word.len() + 2 + 2 * l.positions.len() + 1;
5579 }
5580 n
5581 }
5582 Value::TsQuery(ast) => tsquery_encoded_len(ast),
5585 Value::Null => 0,
5587 Value::Interval { .. } => {
5589 unreachable!("Value::Interval has no on-disk encoding")
5590 }
5591 }
5592}
5593
5594pub fn encode_row_body_dense(row: &Row, schema: &TableSchema) -> Vec<u8> {
5605 debug_assert_eq!(
5606 row.values.len(),
5607 schema.columns.len(),
5608 "dense encode: row arity must match schema"
5609 );
5610 let bitmap_bytes = schema.columns.len().div_ceil(8);
5611 let mut out = Vec::with_capacity(bitmap_bytes + schema.columns.len() * 8);
5614 let bitmap_offset = out.len();
5615 out.resize(bitmap_offset + bitmap_bytes, 0);
5616 for (i, v) in row.values.iter().enumerate() {
5617 if matches!(v, Value::Null) {
5618 out[bitmap_offset + i / 8] |= 1 << (i % 8);
5619 }
5620 }
5621 for (col_idx, v) in row.values.iter().enumerate() {
5622 if matches!(v, Value::Null) {
5623 continue;
5624 }
5625 write_value_body(&mut out, v, schema.columns[col_idx].ty);
5626 }
5627 out
5628}
5629
5630pub fn decode_row_body_dense(
5636 bytes: &[u8],
5637 schema: &TableSchema,
5638) -> Result<(Row, usize), StorageError> {
5639 let mut cur = Cursor::new(bytes);
5640 let bitmap_bytes = schema.columns.len().div_ceil(8);
5641 let mut bitmap_buf = [0u8; 32];
5642 if bitmap_bytes > bitmap_buf.len() {
5643 return Err(StorageError::Corrupt(format!(
5644 "row NULL bitmap {bitmap_bytes} B exceeds 32 B cap"
5645 )));
5646 }
5647 let slice = cur.take(bitmap_bytes)?;
5648 bitmap_buf[..bitmap_bytes].copy_from_slice(slice);
5649 let mut values = Vec::with_capacity(schema.columns.len());
5650 for (col_idx, col) in schema.columns.iter().enumerate() {
5651 if (bitmap_buf[col_idx / 8] >> (col_idx % 8)) & 1 == 1 {
5652 values.push(Value::Null);
5653 } else {
5654 values.push(cur.read_value_body(col.ty)?);
5655 }
5656 }
5657 Ok((Row { values }, cur.pos))
5658}
5659
5660fn write_value_body(out: &mut Vec<u8>, v: &Value, ty: DataType) {
5669 match (v, ty) {
5670 (Value::SmallInt(n), DataType::SmallInt) => out.extend_from_slice(&n.to_le_bytes()),
5671 (Value::Int(n), DataType::Int) => out.extend_from_slice(&n.to_le_bytes()),
5672 (Value::BigInt(n), DataType::BigInt) => out.extend_from_slice(&n.to_le_bytes()),
5673 (Value::Float(x), DataType::Float) => out.extend_from_slice(&x.to_le_bytes()),
5674 (Value::Bool(b), DataType::Bool) => out.push(u8::from(*b)),
5675 (Value::Text(s), DataType::Text | DataType::Varchar(_) | DataType::Char(_)) => {
5676 write_str(out, s);
5677 }
5678 (
5679 Value::Vector(v),
5680 DataType::Vector {
5681 encoding: VecEncoding::F32,
5682 ..
5683 },
5684 ) => {
5685 let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
5686 out.extend_from_slice(&dim.to_le_bytes());
5687 for x in v {
5688 out.extend_from_slice(&x.to_le_bytes());
5689 }
5690 }
5691 (
5697 Value::Sq8Vector(q),
5698 DataType::Vector {
5699 encoding: VecEncoding::Sq8,
5700 ..
5701 },
5702 ) => {
5703 let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
5704 out.extend_from_slice(&dim.to_le_bytes());
5705 out.extend_from_slice(&q.min.to_le_bytes());
5706 out.extend_from_slice(&q.max.to_le_bytes());
5707 out.extend_from_slice(&q.bytes);
5708 }
5709 (
5713 Value::HalfVector(h),
5714 DataType::Vector {
5715 encoding: VecEncoding::F16,
5716 ..
5717 },
5718 ) => {
5719 let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
5720 out.extend_from_slice(&dim.to_le_bytes());
5721 out.extend_from_slice(&h.bytes);
5722 }
5723 (Value::Numeric { scaled, .. }, DataType::Numeric { scale, .. }) => {
5724 out.extend_from_slice(&scaled.to_le_bytes());
5725 out.push(scale);
5726 }
5727 (Value::Date(d), DataType::Date) => out.extend_from_slice(&d.to_le_bytes()),
5728 (Value::Timestamp(t), DataType::Timestamp | DataType::Timestamptz) => {
5729 out.extend_from_slice(&t.to_le_bytes())
5730 }
5731 (Value::Json(s), DataType::Json | DataType::Jsonb) => write_str(out, s),
5735 (Value::Bytes(b), DataType::Bytes) => {
5738 let len = u16::try_from(b.len()).expect("BYTEA cell ≤ 64 KiB");
5739 out.extend_from_slice(&len.to_le_bytes());
5740 out.extend_from_slice(b);
5741 }
5742 (Value::TextArray(items), DataType::TextArray) => {
5745 let count = u16::try_from(items.len()).expect("TEXT[] ≤ 65k elements");
5746 out.extend_from_slice(&count.to_le_bytes());
5747 for item in items {
5748 match item {
5749 None => out.push(1),
5750 Some(s) => {
5751 out.push(0);
5752 let len = u16::try_from(s.len()).expect("TEXT[] element ≤ 64 KiB");
5753 out.extend_from_slice(&len.to_le_bytes());
5754 out.extend_from_slice(s.as_bytes());
5755 }
5756 }
5757 }
5758 }
5759 (Value::IntArray(items), DataType::IntArray) => {
5762 let count = u16::try_from(items.len()).expect("INT[] ≤ 65k elements");
5763 out.extend_from_slice(&count.to_le_bytes());
5764 for item in items {
5765 match item {
5766 None => out.push(1),
5767 Some(n) => {
5768 out.push(0);
5769 out.extend_from_slice(&n.to_le_bytes());
5770 }
5771 }
5772 }
5773 }
5774 (Value::BigIntArray(items), DataType::BigIntArray) => {
5777 let count = u16::try_from(items.len()).expect("BIGINT[] ≤ 65k elements");
5778 out.extend_from_slice(&count.to_le_bytes());
5779 for item in items {
5780 match item {
5781 None => out.push(1),
5782 Some(n) => {
5783 out.push(0);
5784 out.extend_from_slice(&n.to_le_bytes());
5785 }
5786 }
5787 }
5788 }
5789 (Value::TsVector(lexs), DataType::TsVector) => write_tsvector_body(out, lexs),
5792 (Value::TsQuery(ast), DataType::TsQuery) => write_tsquery_body(out, ast),
5794 (other, ty) => unreachable!(
5798 "schema-driven encode received mismatched value/type pair: \
5799 value tag={:?}, column type={:?}",
5800 other.data_type(),
5801 ty
5802 ),
5803 }
5804}
5805
5806fn write_value(out: &mut Vec<u8>, v: &Value) {
5807 match v {
5808 Value::Null => out.push(0),
5809 Value::SmallInt(n) => {
5810 out.push(7);
5811 out.extend_from_slice(&n.to_le_bytes());
5812 }
5813 Value::Int(n) => {
5814 out.push(1);
5815 out.extend_from_slice(&n.to_le_bytes());
5816 }
5817 Value::BigInt(n) => {
5818 out.push(2);
5819 out.extend_from_slice(&n.to_le_bytes());
5820 }
5821 Value::Float(x) => {
5822 out.push(3);
5823 out.extend_from_slice(&x.to_le_bytes());
5824 }
5825 Value::Text(s) | Value::Json(s) => {
5830 out.push(4);
5831 write_str(out, s);
5832 }
5833 Value::Bool(b) => {
5834 out.push(5);
5835 out.push(u8::from(*b));
5836 }
5837 Value::Vector(v) => {
5838 out.push(6);
5839 let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
5840 out.extend_from_slice(&dim.to_le_bytes());
5841 for x in v {
5842 out.extend_from_slice(&x.to_le_bytes());
5843 }
5844 }
5845 Value::Sq8Vector(q) => {
5850 out.push(11);
5851 let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
5852 out.extend_from_slice(&dim.to_le_bytes());
5853 out.extend_from_slice(&q.min.to_le_bytes());
5854 out.extend_from_slice(&q.max.to_le_bytes());
5855 out.extend_from_slice(&q.bytes);
5856 }
5857 Value::HalfVector(h) => {
5862 out.push(12);
5863 let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
5864 out.extend_from_slice(&dim.to_le_bytes());
5865 out.extend_from_slice(&h.bytes);
5866 }
5867 Value::Numeric { scaled, scale } => {
5868 out.push(8);
5869 out.extend_from_slice(&scaled.to_le_bytes());
5870 out.push(*scale);
5871 }
5872 Value::Date(d) => {
5873 out.push(9);
5874 out.extend_from_slice(&d.to_le_bytes());
5875 }
5876 Value::Timestamp(t) => {
5877 out.push(10);
5878 out.extend_from_slice(&t.to_le_bytes());
5879 }
5880 Value::Interval { .. } => {
5884 unreachable!(
5885 "Value::Interval has no on-disk encoding; engine must reject it before write"
5886 )
5887 }
5888 Value::Bytes(b) => {
5893 out.push(14);
5894 let len = u16::try_from(b.len()).expect("BYTEA value ≤ 64 KiB");
5895 out.extend_from_slice(&len.to_le_bytes());
5896 out.extend_from_slice(b);
5897 }
5898 Value::TextArray(items) => {
5901 out.push(15);
5902 let count = u16::try_from(items.len()).expect("TEXT[] ≤ 65k elements");
5903 out.extend_from_slice(&count.to_le_bytes());
5904 for item in items {
5905 match item {
5906 None => out.push(1),
5907 Some(s) => {
5908 out.push(0);
5909 let len = u16::try_from(s.len()).expect("TEXT[] element ≤ 64 KiB");
5910 out.extend_from_slice(&len.to_le_bytes());
5911 out.extend_from_slice(s.as_bytes());
5912 }
5913 }
5914 }
5915 }
5916 Value::IntArray(items) => {
5919 out.push(16);
5920 let count = u16::try_from(items.len()).expect("INT[] ≤ 65k elements");
5921 out.extend_from_slice(&count.to_le_bytes());
5922 for item in items {
5923 match item {
5924 None => out.push(1),
5925 Some(n) => {
5926 out.push(0);
5927 out.extend_from_slice(&n.to_le_bytes());
5928 }
5929 }
5930 }
5931 }
5932 Value::BigIntArray(items) => {
5935 out.push(17);
5936 let count = u16::try_from(items.len()).expect("BIGINT[] ≤ 65k elements");
5937 out.extend_from_slice(&count.to_le_bytes());
5938 for item in items {
5939 match item {
5940 None => out.push(1),
5941 Some(n) => {
5942 out.push(0);
5943 out.extend_from_slice(&n.to_le_bytes());
5944 }
5945 }
5946 }
5947 }
5948 Value::TsVector(lexs) => {
5951 out.push(18);
5952 write_tsvector_body(out, lexs);
5953 }
5954 Value::TsQuery(ast) => {
5957 out.push(19);
5958 write_tsquery_body(out, ast);
5959 }
5960 }
5961}
5962
5963fn write_tsvector_body(out: &mut Vec<u8>, lexs: &[TsLexeme]) {
5966 let count = u16::try_from(lexs.len()).expect("tsvector ≤ 65k lexemes");
5967 out.extend_from_slice(&count.to_le_bytes());
5968 for l in lexs {
5969 let wlen = u16::try_from(l.word.len()).expect("tsvector word ≤ 64 KiB");
5970 out.extend_from_slice(&wlen.to_le_bytes());
5971 out.extend_from_slice(l.word.as_bytes());
5972 let plen = u16::try_from(l.positions.len()).expect("tsvector pos count ≤ 65k");
5973 out.extend_from_slice(&plen.to_le_bytes());
5974 for p in &l.positions {
5975 out.extend_from_slice(&p.to_le_bytes());
5976 }
5977 out.push(l.weight);
5978 }
5979}
5980
5981fn write_tsquery_body(out: &mut Vec<u8>, ast: &TsQueryAst) {
5985 match ast {
5986 TsQueryAst::Term { word, weight_mask } => {
5987 out.push(0);
5988 let len = u16::try_from(word.len()).expect("tsquery term ≤ 64 KiB");
5989 out.extend_from_slice(&len.to_le_bytes());
5990 out.extend_from_slice(word.as_bytes());
5991 out.push(*weight_mask);
5992 }
5993 TsQueryAst::And(a, b) => {
5994 out.push(1);
5995 write_tsquery_body(out, a);
5996 write_tsquery_body(out, b);
5997 }
5998 TsQueryAst::Or(a, b) => {
5999 out.push(2);
6000 write_tsquery_body(out, a);
6001 write_tsquery_body(out, b);
6002 }
6003 TsQueryAst::Not(x) => {
6004 out.push(3);
6005 write_tsquery_body(out, x);
6006 }
6007 TsQueryAst::Phrase {
6008 left,
6009 right,
6010 distance,
6011 } => {
6012 out.push(4);
6013 out.extend_from_slice(&distance.to_le_bytes());
6014 write_tsquery_body(out, left);
6015 write_tsquery_body(out, right);
6016 }
6017 }
6018}
6019
6020fn tsquery_encoded_len(ast: &TsQueryAst) -> usize {
6022 match ast {
6023 TsQueryAst::Term { word, .. } => 1 + 2 + word.len() + 1,
6024 TsQueryAst::And(a, b) | TsQueryAst::Or(a, b) => {
6025 1 + tsquery_encoded_len(a) + tsquery_encoded_len(b)
6026 }
6027 TsQueryAst::Not(x) => 1 + tsquery_encoded_len(x),
6028 TsQueryAst::Phrase { left, right, .. } => {
6029 1 + 2 + tsquery_encoded_len(left) + tsquery_encoded_len(right)
6030 }
6031 }
6032}
6033
/// Appends `n` to `out` as two little-endian bytes.
fn write_u16(out: &mut Vec<u8>, n: u16) {
    let le = n.to_le_bytes();
    out.extend(le);
}
/// Appends `n` to `out` as four little-endian bytes.
fn write_u32(out: &mut Vec<u8>, n: u32) {
    let le = n.to_le_bytes();
    out.extend(le);
}
6040fn write_str(out: &mut Vec<u8>, s: &str) {
6041 let len = u16::try_from(s.len()).expect("identifier / text fits in u16");
6042 write_u16(out, len);
6043 out.extend_from_slice(s.as_bytes());
6044}
6045
6046fn write_str_long(out: &mut Vec<u8>, s: &str) {
6051 let len = u32::try_from(s.len()).expect("function body fits in u32");
6052 write_u32(out, len);
6053 out.extend_from_slice(s.as_bytes());
6054}
6055
6056fn write_index_key(out: &mut Vec<u8>, key: &IndexKey) {
6060 match key {
6061 IndexKey::Int(n) => {
6062 out.push(INDEX_KEY_TAG_INT);
6063 out.extend_from_slice(&n.to_le_bytes());
6064 }
6065 IndexKey::Text(s) => {
6066 out.push(INDEX_KEY_TAG_TEXT);
6067 write_str(out, s);
6068 }
6069 IndexKey::Bool(b) => {
6070 out.push(INDEX_KEY_TAG_BOOL);
6071 out.push(u8::from(*b));
6072 }
6073 }
6074}
6075
/// Forward-only reader over a borrowed byte buffer.
///
/// The `read_*` and `take` methods hand out bytes starting at `pos` and
/// advance it; some decoders in this file also bump `pos` directly after
/// parsing from `&buf[pos..]`.
struct Cursor<'a> {
    /// The complete input being decoded.
    buf: &'a [u8],
    /// Offset into `buf` of the next unread byte.
    pos: usize,
}
6080
6081impl<'a> Cursor<'a> {
6082 const fn new(buf: &'a [u8]) -> Self {
6083 Self { buf, pos: 0 }
6084 }
6085
6086 fn take(&mut self, n: usize) -> Result<&'a [u8], StorageError> {
6087 let end = self
6088 .pos
6089 .checked_add(n)
6090 .ok_or_else(|| StorageError::Corrupt(format!("length overflow taking {n} bytes")))?;
6091 if end > self.buf.len() {
6092 return Err(StorageError::Corrupt(format!(
6093 "unexpected EOF at offset {} (wanted {n} more bytes)",
6094 self.pos
6095 )));
6096 }
6097 let s = &self.buf[self.pos..end];
6098 self.pos = end;
6099 Ok(s)
6100 }
6101
6102 fn read_u8(&mut self) -> Result<u8, StorageError> {
6103 Ok(self.take(1)?[0])
6104 }
6105 fn read_u16(&mut self) -> Result<u16, StorageError> {
6106 let s = self.take(2)?;
6107 Ok(u16::from_le_bytes([s[0], s[1]]))
6108 }
6109 fn read_u32(&mut self) -> Result<u32, StorageError> {
6110 let s = self.take(4)?;
6111 Ok(u32::from_le_bytes([s[0], s[1], s[2], s[3]]))
6112 }
6113 fn read_i32(&mut self) -> Result<i32, StorageError> {
6114 let s = self.take(4)?;
6115 Ok(i32::from_le_bytes([s[0], s[1], s[2], s[3]]))
6116 }
6117 fn read_u64(&mut self) -> Result<u64, StorageError> {
6120 let s = self.take(8)?;
6121 Ok(u64::from_le_bytes([
6122 s[0], s[1], s[2], s[3], s[4], s[5], s[6], s[7],
6123 ]))
6124 }
6125 fn read_i64(&mut self) -> Result<i64, StorageError> {
6126 let s = self.take(8)?;
6127 let arr: [u8; 8] = s.try_into().expect("checked");
6128 Ok(i64::from_le_bytes(arr))
6129 }
6130 fn read_f64(&mut self) -> Result<f64, StorageError> {
6131 let s = self.take(8)?;
6132 let arr: [u8; 8] = s.try_into().expect("checked");
6133 Ok(f64::from_le_bytes(arr))
6134 }
6135 fn read_f32(&mut self) -> Result<f32, StorageError> {
6136 let s = self.take(4)?;
6137 Ok(f32::from_le_bytes([s[0], s[1], s[2], s[3]]))
6138 }
6139 fn read_str(&mut self) -> Result<String, StorageError> {
6140 let len = self.read_u16()? as usize;
6141 let bytes = self.take(len)?;
6142 core::str::from_utf8(bytes)
6143 .map(String::from)
6144 .map_err(|_| StorageError::Corrupt("invalid UTF-8 in identifier or text".into()))
6145 }
6146
6147 fn read_str_long(&mut self) -> Result<String, StorageError> {
6151 let len = self.read_u32()? as usize;
6152 let bytes = self.take(len)?;
6153 core::str::from_utf8(bytes)
6154 .map(String::from)
6155 .map_err(|_| StorageError::Corrupt("invalid UTF-8 in long-string payload".into()))
6156 }
6157
6158 fn read_index_key(&mut self) -> Result<IndexKey, StorageError> {
6162 let tag = self.read_u8()?;
6163 match tag {
6164 INDEX_KEY_TAG_INT => Ok(IndexKey::Int(self.read_i64()?)),
6165 INDEX_KEY_TAG_TEXT => Ok(IndexKey::Text(self.read_str()?)),
6166 INDEX_KEY_TAG_BOOL => Ok(IndexKey::Bool(self.read_u8()? != 0)),
6167 other => Err(StorageError::Corrupt(format!(
6168 "unknown index key tag: {other}"
6169 ))),
6170 }
6171 }
6172 fn read_value_body(&mut self, ty: DataType) -> Result<Value, StorageError> {
6178 match ty {
6179 DataType::SmallInt => {
6180 let s = self.take(2)?;
6181 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
6182 }
6183 DataType::Int => Ok(Value::Int(self.read_i32()?)),
6184 DataType::BigInt => Ok(Value::BigInt(self.read_i64()?)),
6185 DataType::Float => Ok(Value::Float(self.read_f64()?)),
6186 DataType::Bool => Ok(Value::Bool(self.read_u8()? != 0)),
6187 DataType::Text | DataType::Varchar(_) | DataType::Char(_) => {
6188 Ok(Value::Text(self.read_str()?))
6189 }
6190 DataType::Vector {
6191 encoding: VecEncoding::F32,
6192 ..
6193 } => {
6194 let dim = self.read_u32()? as usize;
6195 let mut v = Vec::with_capacity(dim);
6196 for _ in 0..dim {
6197 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
6198 v.push(f32::from_le_bytes(bytes));
6199 }
6200 Ok(Value::Vector(v))
6201 }
6202 DataType::Vector {
6203 encoding: VecEncoding::Sq8,
6204 ..
6205 } => {
6206 let dim = self.read_u32()? as usize;
6207 let min = self.read_f32()?;
6208 let max = self.read_f32()?;
6209 let bytes = self.take(dim)?.to_vec();
6210 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
6211 }
6212 DataType::Vector {
6213 encoding: VecEncoding::F16,
6214 ..
6215 } => {
6216 let dim = self.read_u32()? as usize;
6217 let bytes = self.take(dim * 2)?.to_vec();
6218 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
6219 }
6220 DataType::Numeric { .. } => {
6221 let s = self.take(16)?;
6222 let arr: [u8; 16] = s.try_into().expect("checked");
6223 let scaled = i128::from_le_bytes(arr);
6224 let scale = self.read_u8()?;
6225 Ok(Value::Numeric { scaled, scale })
6226 }
6227 DataType::Date => Ok(Value::Date(self.read_i32()?)),
6228 DataType::Timestamp => Ok(Value::Timestamp(self.read_i64()?)),
6229 DataType::Timestamptz => Ok(Value::Timestamp(self.read_i64()?)),
6230 DataType::Jsonb => Ok(Value::Json(self.read_str()?)),
6231 DataType::Interval => {
6232 Err(StorageError::Corrupt(
6237 "INTERVAL column found on disk — runtime-only type, v3.0.2 rejects it".into(),
6238 ))
6239 }
6240 DataType::Json => Ok(Value::Json(self.read_str()?)),
6241 DataType::Bytes => {
6244 let len = self.read_u16()? as usize;
6245 let bytes = self.take(len)?.to_vec();
6246 Ok(Value::Bytes(bytes))
6247 }
6248 DataType::TextArray => {
6250 let count = self.read_u16()? as usize;
6251 let mut items: Vec<Option<String>> = Vec::with_capacity(count);
6252 for _ in 0..count {
6253 match self.read_u8()? {
6254 0 => items.push(Some(self.read_str()?)),
6255 1 => items.push(None),
6256 other => {
6257 return Err(StorageError::Corrupt(format!(
6258 "TEXT[] null flag: unknown byte {other}"
6259 )));
6260 }
6261 }
6262 }
6263 Ok(Value::TextArray(items))
6264 }
6265 DataType::IntArray => {
6267 let count = self.read_u16()? as usize;
6268 let mut items: Vec<Option<i32>> = Vec::with_capacity(count);
6269 for _ in 0..count {
6270 match self.read_u8()? {
6271 0 => items.push(Some(self.read_i32()?)),
6272 1 => items.push(None),
6273 other => {
6274 return Err(StorageError::Corrupt(format!(
6275 "INT[] null flag: unknown byte {other}"
6276 )));
6277 }
6278 }
6279 }
6280 Ok(Value::IntArray(items))
6281 }
6282 DataType::BigIntArray => {
6284 let count = self.read_u16()? as usize;
6285 let mut items: Vec<Option<i64>> = Vec::with_capacity(count);
6286 for _ in 0..count {
6287 match self.read_u8()? {
6288 0 => items.push(Some(self.read_i64()?)),
6289 1 => items.push(None),
6290 other => {
6291 return Err(StorageError::Corrupt(format!(
6292 "BIGINT[] null flag: unknown byte {other}"
6293 )));
6294 }
6295 }
6296 }
6297 Ok(Value::BigIntArray(items))
6298 }
6299 DataType::TsVector => Ok(Value::TsVector(self.read_tsvector_body()?)),
6303 DataType::TsQuery => Ok(Value::TsQuery(self.read_tsquery_body()?)),
6304 }
6305 }
6306
6307 fn read_tsvector_body(&mut self) -> Result<Vec<TsLexeme>, StorageError> {
6309 let count = self.read_u16()? as usize;
6310 let mut out = Vec::with_capacity(count);
6311 for _ in 0..count {
6312 let word = self.read_str()?;
6313 let pos_count = self.read_u16()? as usize;
6314 let mut positions = Vec::with_capacity(pos_count);
6315 for _ in 0..pos_count {
6316 positions.push(self.read_u16()?);
6317 }
6318 let weight = self.read_u8()?;
6319 out.push(TsLexeme {
6320 word,
6321 positions,
6322 weight,
6323 });
6324 }
6325 Ok(out)
6326 }
6327
6328 fn read_tsquery_body(&mut self) -> Result<TsQueryAst, StorageError> {
6330 let tag = self.read_u8()?;
6331 match tag {
6332 0 => {
6333 let word = self.read_str()?;
6334 let weight_mask = self.read_u8()?;
6335 Ok(TsQueryAst::Term { word, weight_mask })
6336 }
6337 1 => {
6338 let a = self.read_tsquery_body()?;
6339 let b = self.read_tsquery_body()?;
6340 Ok(TsQueryAst::And(Box::new(a), Box::new(b)))
6341 }
6342 2 => {
6343 let a = self.read_tsquery_body()?;
6344 let b = self.read_tsquery_body()?;
6345 Ok(TsQueryAst::Or(Box::new(a), Box::new(b)))
6346 }
6347 3 => {
6348 let x = self.read_tsquery_body()?;
6349 Ok(TsQueryAst::Not(Box::new(x)))
6350 }
6351 4 => {
6352 let distance = self.read_u16()?;
6353 let left = self.read_tsquery_body()?;
6354 let right = self.read_tsquery_body()?;
6355 Ok(TsQueryAst::Phrase {
6356 left: Box::new(left),
6357 right: Box::new(right),
6358 distance,
6359 })
6360 }
6361 other => Err(StorageError::Corrupt(format!(
6362 "tsquery: unknown node tag {other}"
6363 ))),
6364 }
6365 }
6366
6367 fn read_value(&mut self) -> Result<Value, StorageError> {
6368 let tag = self.read_u8()?;
6369 match tag {
6370 0 => Ok(Value::Null),
6371 1 => Ok(Value::Int(self.read_i32()?)),
6372 2 => Ok(Value::BigInt(self.read_i64()?)),
6373 3 => Ok(Value::Float(self.read_f64()?)),
6374 4 => Ok(Value::Text(self.read_str()?)),
6375 5 => Ok(Value::Bool(self.read_u8()? != 0)),
6376 6 => {
6377 let dim = self.read_u32()? as usize;
6378 let mut v = Vec::with_capacity(dim);
6379 for _ in 0..dim {
6380 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
6381 v.push(f32::from_le_bytes(bytes));
6382 }
6383 Ok(Value::Vector(v))
6384 }
6385 7 => {
6386 let s = self.take(2)?;
6387 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
6388 }
6389 8 => {
6390 let s = self.take(16)?;
6391 let arr: [u8; 16] = s.try_into().expect("checked");
6392 let scaled = i128::from_le_bytes(arr);
6393 let scale = self.read_u8()?;
6394 Ok(Value::Numeric { scaled, scale })
6395 }
6396 9 => Ok(Value::Date(self.read_i32()?)),
6397 10 => Ok(Value::Timestamp(self.read_i64()?)),
6398 11 => {
6403 let dim = self.read_u32()? as usize;
6404 let min = self.read_f32()?;
6405 let max = self.read_f32()?;
6406 let bytes = self.take(dim)?.to_vec();
6407 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
6408 }
6409 12 => {
6412 let dim = self.read_u32()? as usize;
6413 let bytes = self.take(dim * 2)?.to_vec();
6414 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
6415 }
6416 14 => {
6418 let len = self.read_u16()? as usize;
6419 let bytes = self.take(len)?.to_vec();
6420 Ok(Value::Bytes(bytes))
6421 }
6422 15 => {
6425 let count = self.read_u16()? as usize;
6426 let mut items: Vec<Option<String>> = Vec::with_capacity(count);
6427 for _ in 0..count {
6428 match self.read_u8()? {
6429 0 => items.push(Some(self.read_str()?)),
6430 1 => items.push(None),
6431 other => {
6432 return Err(StorageError::Corrupt(format!(
6433 "TEXT[] null flag in value tag: unknown byte {other}"
6434 )));
6435 }
6436 }
6437 }
6438 Ok(Value::TextArray(items))
6439 }
6440 16 => {
6442 let count = self.read_u16()? as usize;
6443 let mut items: Vec<Option<i32>> = Vec::with_capacity(count);
6444 for _ in 0..count {
6445 match self.read_u8()? {
6446 0 => items.push(Some(self.read_i32()?)),
6447 1 => items.push(None),
6448 other => {
6449 return Err(StorageError::Corrupt(format!(
6450 "INT[] null flag in value tag: unknown byte {other}"
6451 )));
6452 }
6453 }
6454 }
6455 Ok(Value::IntArray(items))
6456 }
6457 17 => {
6458 let count = self.read_u16()? as usize;
6459 let mut items: Vec<Option<i64>> = Vec::with_capacity(count);
6460 for _ in 0..count {
6461 match self.read_u8()? {
6462 0 => items.push(Some(self.read_i64()?)),
6463 1 => items.push(None),
6464 other => {
6465 return Err(StorageError::Corrupt(format!(
6466 "BIGINT[] null flag in value tag: unknown byte {other}"
6467 )));
6468 }
6469 }
6470 }
6471 Ok(Value::BigIntArray(items))
6472 }
6473 18 => Ok(Value::TsVector(self.read_tsvector_body()?)),
6476 19 => Ok(Value::TsQuery(self.read_tsquery_body()?)),
6478 other => Err(StorageError::Corrupt(format!("unknown value tag: {other}"))),
6479 }
6480 }
6481
6482 fn read_nsw_graph(&mut self, m: usize) -> Result<NswGraph, StorageError> {
6486 let m_max_0 = self.read_u16()? as usize;
6487 let entry_raw = self.read_u32()?;
6488 let entry = if entry_raw == u32::MAX {
6489 None
6490 } else {
6491 Some(entry_raw as usize)
6492 };
6493 let entry_level = self.read_u8()?;
6494 let node_count = self.read_u32()? as usize;
6495 let mut levels: PersistentVec<u8> = PersistentVec::new();
6500 for _ in 0..node_count {
6501 levels.push_mut(self.read_u8()?);
6502 }
6503 let layer_count = self.read_u8()? as usize;
6504 let mut layers: Vec<PersistentVec<Vec<u32>>> = Vec::with_capacity(layer_count);
6505 for _ in 0..layer_count {
6506 let n = self.read_u32()? as usize;
6507 let mut per_layer: PersistentVec<Vec<u32>> = PersistentVec::new();
6508 for _ in 0..n {
6509 let cnt = self.read_u16()? as usize;
6510 let mut row: Vec<u32> = Vec::with_capacity(cnt);
6511 for _ in 0..cnt {
6512 row.push(self.read_u32()?);
6513 }
6514 per_layer.push_mut(row);
6515 }
6516 layers.push(per_layer);
6517 }
6518 Ok(NswGraph {
6519 m,
6520 m_max_0,
6521 entry,
6522 entry_level,
6523 levels,
6524 layers,
6525 })
6526 }
6527}
6528
6529#[cfg(test)]
6530mod tests {
6531 use super::*;
6532 use alloc::string::ToString;
6533 use alloc::vec;
6534
/// Checks that the NEON squared-L2 kernel agrees with the scalar version to
/// a relative tolerance of 1e-4 across a range of dimensions.
#[cfg(target_arch = "aarch64")]
#[test]
fn neon_l2_matches_scalar() {
    /// Steps the LCG once and maps 24 bits of the new state into `[-1, 1)`.
    #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
    fn draw(state: &mut u64) -> f32 {
        *state = state
            .wrapping_mul(6_364_136_223_846_793_005)
            .wrapping_add(1);
        (((*state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0
    }

    let dims = [4usize, 8, 12, 16, 64, 128, 256, 384, 512, 768, 1024, 1536];
    for d in dims.iter().copied() {
        // Seed depends only on the dimension, so every run sees the same data.
        let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
        let mut a = Vec::with_capacity(d);
        let mut b = Vec::with_capacity(d);
        for _ in 0..d {
            // Draws alternate between the two vectors: first `a`, then `b`.
            a.push(draw(&mut state));
            b.push(draw(&mut state));
        }
        let scalar = l2_distance_sq_scalar(&a, &b);
        // SAFETY: presumably the only precondition is NEON availability,
        // covered by the aarch64 cfg gate — confirm against the callee.
        let neon = unsafe { l2_distance_sq_neon(&a, &b) };
        let diff = (scalar - neon).abs();
        let tol = (scalar.abs().max(1e-6)) * 1e-4;
        assert!(
            diff <= tol,
            "dim={d}: scalar={scalar} neon={neon} diff={}",
            (scalar - neon).abs()
        );
    }
}
6571
/// Checks that the NEON inner-product kernel agrees with the scalar version
/// within a relative 1e-4 tolerance plus a per-dimension absolute slack.
#[cfg(target_arch = "aarch64")]
#[test]
fn neon_inner_product_matches_scalar() {
    /// Steps the LCG once and maps 24 bits of the new state into `[-1, 1)`.
    #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
    fn draw(state: &mut u64) -> f32 {
        *state = state
            .wrapping_mul(6_364_136_223_846_793_005)
            .wrapping_add(1);
        (((*state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0
    }

    let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
    for d in dims.iter().copied() {
        let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
        let mut a = Vec::with_capacity(d);
        let mut b = Vec::with_capacity(d);
        for _ in 0..d {
            // Draws alternate between the two vectors: first `a`, then `b`.
            a.push(draw(&mut state));
            b.push(draw(&mut state));
        }
        let scalar = inner_product_scalar(&a, &b);
        // SAFETY: presumably the only precondition is NEON availability,
        // covered by the aarch64 cfg gate — confirm against the callee.
        let neon = unsafe { inner_product_neon(&a, &b) };
        // The absolute term keeps the check meaningful when the dot product
        // is close to zero.
        #[allow(clippy::cast_precision_loss)]
        let tol = (scalar.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
        let diff = (scalar - neon).abs();
        assert!(
            diff <= tol,
            "IP dim={d}: scalar={scalar} neon={neon} diff={}",
            (scalar - neon).abs()
        );
    }
}
6608
/// Checks that the NEON cosine kernel's three outputs (dot product and both
/// norms) each agree with the scalar version.
///
/// Every quantity is compared against a tolerance derived from its *own*
/// scalar reference value (relative 1e-4 plus a per-dimension absolute
/// slack), so the second norm is no longer judged by the first norm's
/// magnitude.
#[cfg(target_arch = "aarch64")]
#[allow(clippy::similar_names)]
#[test]
fn neon_cosine_dot_norms_matches_scalar() {
    let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
    for &d in &dims {
        let mut state: u64 = (d as u64).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        let mut a = Vec::with_capacity(d);
        let mut b = Vec::with_capacity(d);
        for _ in 0..d {
            state = state
                .wrapping_mul(6_364_136_223_846_793_005)
                .wrapping_add(1);
            #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
            let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
            state = state
                .wrapping_mul(6_364_136_223_846_793_005)
                .wrapping_add(1);
            #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
            let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
            a.push(x);
            b.push(y);
        }
        let (dot_s, na_s, nb_s) = cosine_dot_norms_scalar(&a, &b);
        // SAFETY: presumably the only precondition is NEON availability,
        // covered by the aarch64 cfg gate — confirm against the callee.
        let (dot_n, na_n, nb_n) = unsafe { cosine_dot_norms_neon(&a, &b) };
        // Tolerance for one quantity, scaled by that quantity's reference.
        #[allow(clippy::cast_precision_loss)]
        let tol_for = |reference: f32| (reference.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
        assert!(
            (dot_s - dot_n).abs() <= tol_for(dot_s),
            "cosine dot dim={d}: scalar={dot_s} neon={dot_n}"
        );
        assert!(
            (na_s - na_n).abs() <= tol_for(na_s),
            "cosine na dim={d}: scalar={na_s} neon={na_n}"
        );
        assert!(
            (nb_s - nb_n).abs() <= tol_for(nb_s),
            "cosine nb dim={d}: scalar={nb_s} neon={nb_n}"
        );
    }
}
6652
6653 fn make_users_schema() -> TableSchema {
6654 TableSchema::new(
6655 "users",
6656 vec![
6657 ColumnSchema::new("id", DataType::Int, false),
6658 ColumnSchema::new("name", DataType::Text, false),
6659 ColumnSchema::new("score", DataType::Float, true),
6660 ],
6661 )
6662 }
6663
/// Each scalar `Value` variant reports its matching `DataType`; `Null`
/// reports no type and is the only value for which `is_null` holds.
#[test]
fn value_type_tag_matches_variant() {
    let typed = [
        (Value::Int(1), DataType::Int),
        (Value::BigInt(1), DataType::BigInt),
        (Value::Float(1.0), DataType::Float),
        (Value::Text("x".into()), DataType::Text),
        (Value::Bool(true), DataType::Bool),
    ];
    for (value, ty) in typed {
        assert_eq!(value.data_type(), Some(ty));
    }

    let null = Value::Null;
    assert_eq!(null.data_type(), None);
    assert!(null.is_null());
    assert!(!Value::Int(0).is_null());
}
6675
/// A quantized five-element vector reports `Vector { dim: 5, Sq8 }`.
#[test]
fn sq8_value_reports_sq8_data_type() {
    let quantized = crate::quantize::quantize(&[0.0, 0.25, 0.5, 0.75, 1.0]);
    let cell = Value::Sq8Vector(quantized);
    let expected = DataType::Vector {
        dim: 5,
        encoding: VecEncoding::Sq8,
    };
    assert_eq!(cell.data_type(), Some(expected),);
}
6692
/// `Display` for the basic scalar types renders the expected SQL keyword.
#[test]
fn datatype_display_matches_pg_keyword() {
    let expectations = [
        (DataType::Int, "INT"),
        (DataType::BigInt, "BIGINT"),
        (DataType::Float, "FLOAT"),
        (DataType::Text, "TEXT"),
        (DataType::Bool, "BOOL"),
    ];
    for (ty, keyword) in expectations {
        assert_eq!(ty.to_string(), keyword);
    }
}
6701
/// `Row::len` counts cells (NULLs included) and `is_empty` is true only for
/// a row built from an empty vector.
#[test]
fn row_len_and_emptiness() {
    let two_cells = Row::new(vec![Value::Int(1), Value::Null]);
    assert_eq!(two_cells.len(), 2);
    assert!(!two_cells.is_empty());

    let no_cells = Row::new(Vec::new());
    assert!(no_cells.is_empty());
}
6709
/// `column_position` returns the zero-based index of a known column and
/// `None` for an unknown name.
#[test]
fn table_schema_column_position() {
    let schema = make_users_schema();
    let lookups = [("id", Some(0)), ("score", Some(2)), ("missing", None)];
    for (column, expected) in lookups {
        assert_eq!(schema.column_position(column), expected);
    }
}
6717
/// After creating one table, the catalog counts it and resolves it by name
/// while an unknown name resolves to nothing.
#[test]
fn catalog_create_table_then_lookup() {
    let mut catalog = Catalog::new();
    let schema = make_users_schema();
    catalog.create_table(schema).unwrap();

    assert_eq!(catalog.table_count(), 1);
    assert!(catalog.get("users").is_some());
    assert!(catalog.get("nope").is_none());
}
6726
/// Creating a second table with an existing name fails with
/// `DuplicateTable` carrying that name.
#[test]
fn catalog_duplicate_table_is_rejected() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();

    let second_attempt = catalog.create_table(make_users_schema());
    let err = second_attempt.unwrap_err();
    assert!(matches!(err, StorageError::DuplicateTable { ref name } if name == "users"));
}
6734
/// A well-typed row is accepted and becomes readable through `rows()`.
#[test]
fn table_insert_happy_path_appends_row() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    let alice = Row::new(vec![
        Value::Int(1),
        Value::Text("alice".into()),
        Value::Float(99.5),
    ]);
    users.insert(alice).unwrap();

    assert_eq!(users.row_count(), 1);
    assert_eq!(users.rows()[0].values[1], Value::Text("alice".into()));
}
6749
/// A row with too few cells is rejected with `ArityMismatch` and leaves the
/// table empty.
#[test]
fn table_insert_arity_mismatch() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    let short_row = Row::new(vec![Value::Int(1)]);
    let err = users.insert(short_row).unwrap_err();
    assert!(matches!(
        err,
        StorageError::ArityMismatch {
            expected: 3,
            actual: 1
        }
    ));
    assert_eq!(users.row_count(), 0);
}
6765
/// Putting an `INT` into the `TEXT` column fails with `TypeMismatch` that
/// names the column, both types, and the position, and nothing is stored.
#[test]
fn table_insert_type_mismatch_reports_column() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    // The second cell is an INT where the schema expects TEXT.
    let bad_row = Row::new(vec![Value::Int(1), Value::Int(42), Value::Float(0.0)]);
    let err = users.insert(bad_row).unwrap_err();

    let StorageError::TypeMismatch {
        column,
        expected,
        actual,
        position,
    } = &err
    else {
        panic!("unexpected: {err:?}")
    };
    assert_eq!(column, "name");
    assert_eq!(*expected, DataType::Text);
    assert_eq!(*actual, DataType::Int);
    assert_eq!(*position, 1);

    assert_eq!(users.row_count(), 0);
}
6794
/// A NULL in the `name` column is rejected with `NullInNotNull` naming
/// that column.
#[test]
fn table_insert_null_into_not_null_rejected() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    let row_with_null_name = Row::new(vec![Value::Int(1), Value::Null, Value::Float(1.0)]);
    let err = users.insert(row_with_null_name).unwrap_err();
    assert!(matches!(err, StorageError::NullInNotNull { ref column } if column == "name"));
}
6809
/// A NULL in the `score` column is accepted and the row is stored.
#[test]
fn table_insert_null_into_nullable_ok() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    let bob = Row::new(vec![
        Value::Int(1),
        Value::Text("bob".into()),
        Value::Null,
    ]);
    users.insert(bob).unwrap();
    assert_eq!(users.row_count(), 1);
}
6823
/// Inserting through `get_mut` on one table leaves a second table with the
/// same shape untouched.
#[test]
fn catalog_get_mut_independent_per_table() {
    let mut catalog = Catalog::new();
    for name in ["a", "b"] {
        let schema = TableSchema::new(name, vec![ColumnSchema::new("v", DataType::Int, false)]);
        catalog.create_table(schema).unwrap();
    }

    let table_a = catalog.get_mut("a").unwrap();
    table_a.insert(Row::new(vec![Value::Int(1)])).unwrap();

    assert_eq!(catalog.get("a").unwrap().row_count(), 1);
    assert_eq!(catalog.get("b").unwrap().row_count(), 0);
}
6844
6845 fn assert_round_trip(cat: &Catalog) {
6848 let bytes = cat.serialize();
6849 let restored = Catalog::deserialize(&bytes).expect("deserialize");
6850 assert_eq!(restored.table_count(), cat.table_count());
6853 for (a, b) in cat.tables.iter().zip(restored.tables.iter()) {
6854 assert_eq!(a.schema, b.schema);
6855 assert_eq!(a.rows, b.rows);
6856 }
6857 }
6858
/// A catalog with no tables survives serialize → deserialize.
#[test]
fn serialize_empty_catalog_round_trips() {
    let empty = Catalog::new();
    assert_round_trip(&empty);
}
6863
/// A catalog holding one table with no rows survives serialize → deserialize.
#[test]
fn serialize_single_empty_table_round_trips() {
    let mut catalog = Catalog::new();
    let schema = make_users_schema();
    catalog.create_table(schema).unwrap();
    assert_round_trip(&catalog);
}
6870
/// Cloning an `NswGraph` must share storage with the original for the
/// `levels` vector and for every layer, rather than copying elements.
#[test]
fn nsw_clone_is_o1() {
    let schema = TableSchema::new(
        "docs",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 3,
                    encoding: VecEncoding::F32,
                },
                true,
            ),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let docs = cat.get_mut("docs").unwrap();
    for i in 0..1500_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.01;
        let embedding = vec![base, base + 0.05, base + 0.1];
        docs.insert(Row::new(vec![Value::Int(i), Value::Vector(embedding)]))
            .unwrap();
    }
    docs.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    // Exhaustive match (no wildcard) so new index kinds must be handled here.
    let graph = match &cat.get("docs").unwrap().indices()[0].kind {
        IndexKind::Nsw(g) => g,
        IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
            panic!("expected NSW")
        }
    };
    assert_eq!(graph.levels.len(), 1500, "one level slot per inserted row");
    let layer_total = graph.layers.len();
    assert!(
        layer_total >= 2,
        "1500 nodes should populate at least two HNSW layers, got {}",
        graph.layers.len()
    );

    let copy = graph.clone();

    assert!(
        graph.levels.shares_storage_with(&copy.levels),
        "levels PV not shared after clone — clone copied elements (O(N))"
    );
    assert_eq!(graph.layers.len(), copy.layers.len());
    let layer_pairs = graph.layers.iter().zip(copy.layers.iter());
    for (l, (orig, cl)) in layer_pairs.enumerate() {
        assert!(
            orig.shares_storage_with(cl),
            "layer {l} PV not shared after clone — clone copied elements (O(N))"
        );
    }
}
6938
/// An SQ8 vector table with an NSW index survives serialize → deserialize:
/// the column type, a sample cell, and the top-5 query hits are unchanged.
#[test]
fn sq8_catalog_serialise_roundtrip_preserves_cells_and_index() {
    let schema = TableSchema::new(
        "vecs",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 8,
                    encoding: VecEncoding::Sq8,
                },
                false,
            ),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let table = cat.get_mut("vecs").unwrap();
    for i in 0..32_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.03;
        let mut components: Vec<f32> = Vec::with_capacity(8);
        for j in 0..8_i32 {
            #[allow(clippy::cast_precision_loss)]
            let off = (j as f32) * 0.01;
            components.push(base + off);
        }
        let cell = Value::Sq8Vector(quantize::quantize(&components));
        table.insert(Row::new(vec![Value::Int(i), cell])).unwrap();
    }
    table
        .add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let query = vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];

    // Snapshot the pre-serialization state.
    let source = cat.get("vecs").unwrap();
    let before_cell = source.rows()[5].values[1].clone();
    let before_ty = source.schema().columns[1].ty;
    let before_hits = nsw_query(source, "v_idx", &query, 5, NswMetric::L2);

    let bytes = cat.serialize();
    let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
    let reloaded = restored.get("vecs").unwrap();

    assert_eq!(reloaded.schema().columns[1].ty, before_ty);
    assert_eq!(reloaded.rows()[5].values[1], before_cell);
    let after_hits = nsw_query(reloaded, "v_idx", &query, 5, NswMetric::L2);
    assert_eq!(before_hits, after_hits);
}
7001
/// A half-precision vector table with an NSW index survives
/// serialize → deserialize: the column type, a sample cell, and the top-5
/// query hits are unchanged.
#[test]
fn half_catalog_serialise_roundtrip_preserves_cells_and_index() {
    use crate::halfvec;

    let schema = TableSchema::new(
        "vecs",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 8,
                    encoding: VecEncoding::F16,
                },
                false,
            ),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let table = cat.get_mut("vecs").unwrap();
    for i in 0..32_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.03;
        let mut components: Vec<f32> = Vec::with_capacity(8);
        for j in 0..8_i32 {
            #[allow(clippy::cast_precision_loss)]
            let off = (j as f32) * 0.01;
            components.push(base + off);
        }
        let cell = Value::HalfVector(halfvec::HalfVector::from_f32_slice(&components));
        table.insert(Row::new(vec![Value::Int(i), cell])).unwrap();
    }
    table
        .add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let query = vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];

    // Snapshot the pre-serialization state.
    let source = cat.get("vecs").unwrap();
    let before_cell = source.rows()[5].values[1].clone();
    let before_ty = source.schema().columns[1].ty;
    let before_hits = nsw_query(source, "v_idx", &query, 5, NswMetric::L2);

    let bytes = cat.serialize();
    let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
    let reloaded = restored.get("vecs").unwrap();

    assert_eq!(reloaded.schema().columns[1].ty, before_ty);
    assert_eq!(reloaded.rows()[5].values[1], before_cell);
    let after_hits = nsw_query(reloaded, "v_idx", &query, 5, NswMetric::L2);
    assert_eq!(before_hits, after_hits);
}
7062
/// Recall check for the index over half-precision cells.
///
/// Generates 512 deterministic 32-dimensional vectors plus 32 queries,
/// computes the exact top-10 per query by brute force on the `f32` data,
/// stores the corpus as `HalfVector` cells behind an NSW index, and requires
/// that the index's top-10 overlap the exact top-10 in at least 95% of slots.
#[test]
#[allow(clippy::similar_names)]
fn hnsw_half_recall_at_10_matches_f32_groundtruth() {
    use crate::halfvec;
    // Deterministic generator: advances `state` and maps its upper 32 bits
    // to a float in [-1, 1].
    fn next(state: &mut u64) -> f32 {
        *state = state
            .wrapping_add(0x9E37_79B9_7F4A_7C15)
            .wrapping_mul(0xBF58_476D_1CE4_E5B9);
        #[allow(clippy::cast_precision_loss)]
        let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
        2.0 * u - 1.0
    }
    let dim: u32 = 32;
    let n: usize = 512;
    let dim_us = dim as usize;
    let mut seed: u64 = 0xF16_F16_F16_F16_u64;
    // Corpus and queries share one generator stream: corpus first, then
    // queries, so the draw order is part of the fixture.
    let corpus: Vec<Vec<f32>> = (0..n)
        .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
        .collect();
    let queries: Vec<Vec<f32>> = (0..32)
        .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
        .collect();
    // Ground truth: the ten corpus indices with the smallest squared L2
    // distance to each query, computed on the unquantized f32 vectors.
    let exact_top10: Vec<Vec<usize>> = queries
        .iter()
        .map(|q| {
            let mut scored: Vec<(f32, usize)> = corpus
                .iter()
                .enumerate()
                .map(|(i, v)| (l2_distance_sq(v, q), i))
                .collect();
            scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
            scored.into_iter().take(10).map(|(_, i)| i).collect()
        })
        .collect();
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim,
                    encoding: VecEncoding::F16,
                },
                false,
            ),
        ],
    ))
    .unwrap();
    let t = cat.get_mut("vecs").unwrap();
    // Rows are inserted in corpus order with the corpus index as the id.
    for (i, v) in corpus.iter().enumerate() {
        t.insert(Row::new(alloc::vec![
            Value::Int(i32::try_from(i).unwrap()),
            Value::HalfVector(halfvec::HalfVector::from_f32_slice(v)),
        ]))
        .unwrap();
    }
    t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
    let table = cat.get("vecs").unwrap();
    // Count how many returned hits appear in the exact top-10, over all queries.
    let mut total_overlap = 0_usize;
    for (q, exact) in queries.iter().zip(exact_top10.iter()) {
        let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
        for h in &hits {
            if exact.contains(h) {
                total_overlap += 1;
            }
        }
    }
    // Denominator is 10 expected hits per query.
    #[allow(clippy::cast_precision_loss)]
    let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
    assert!(
        recall >= 0.95,
        "HALF HNSW recall@10 = {recall:.3}, below floor 0.95 — \
        check halfvec dispatch in `cell_to_query_metric_distance`"
    );
}
7146
/// Recall check for the index over SQ8-quantized cells.
///
/// Generates 512 deterministic 32-dimensional vectors plus 32 queries,
/// computes the exact top-10 per query by brute force on the `f32` data,
/// stores the corpus as `Sq8Vector` cells behind an NSW index, and requires
/// that the index's top-10 overlap the exact top-10 in at least 95% of slots.
#[test]
fn hnsw_sq8_recall_at_10_above_0_95_vs_f32_groundtruth() {
    use crate::quantize;
    // Deterministic generator: advances `state` and maps its upper 32 bits
    // to a float in [-1, 1].
    fn next(state: &mut u64) -> f32 {
        *state = state
            .wrapping_add(0x9E37_79B9_7F4A_7C15)
            .wrapping_mul(0xBF58_476D_1CE4_E5B9);
        #[allow(clippy::cast_precision_loss)]
        let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
        2.0 * u - 1.0
    }
    let dim: u32 = 32;
    let n: usize = 512;
    let dim_us = dim as usize;
    let mut seed: u64 = 0xCAFE_BABE_DEAD_BEEFu64;
    // Corpus and queries share one generator stream: corpus first, then
    // queries, so the draw order is part of the fixture.
    let corpus: Vec<Vec<f32>> = (0..n)
        .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
        .collect();
    let queries: Vec<Vec<f32>> = (0..32)
        .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
        .collect();
    // Ground truth: the ten corpus indices with the smallest squared L2
    // distance to each query, computed on the unquantized f32 vectors.
    let exact_top10: Vec<Vec<usize>> = queries
        .iter()
        .map(|q| {
            let mut scored: Vec<(f32, usize)> = corpus
                .iter()
                .enumerate()
                .map(|(i, v)| (l2_distance_sq(v, q), i))
                .collect();
            scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
            scored.into_iter().take(10).map(|(_, i)| i).collect()
        })
        .collect();
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim,
                    encoding: VecEncoding::Sq8,
                },
                false,
            ),
        ],
    ))
    .unwrap();
    let t = cat.get_mut("vecs").unwrap();
    // Rows are inserted in corpus order with the corpus index as the id.
    for (i, v) in corpus.iter().enumerate() {
        t.insert(Row::new(alloc::vec![
            Value::Int(i32::try_from(i).unwrap()),
            Value::Sq8Vector(quantize::quantize(v)),
        ]))
        .unwrap();
    }
    t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
    let table = cat.get("vecs").unwrap();
    // Count how many returned hits appear in the exact top-10, over all queries.
    let mut total_overlap = 0_usize;
    for (q, exact) in queries.iter().zip(exact_top10.iter()) {
        let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
        for h in &hits {
            if exact.contains(h) {
                total_overlap += 1;
            }
        }
    }
    // Denominator is 10 expected hits per query.
    #[allow(clippy::cast_precision_loss)]
    let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
    assert!(
        recall >= 0.95,
        "SQ8 HNSW recall@10 = {recall:.3}, below floor 0.95 — \
        check `sq8_rerank` is wired in `nsw_search` for SQ8 columns"
    );
}
7235
/// Every field of an `NswGraph` compares equal after the owning catalog is
/// serialized and deserialized.
#[test]
fn nsw_index_topology_persists_through_round_trip() {
    /// Clones the NSW graph of the first index on `docs`.
    fn first_nsw_graph(catalog: &Catalog) -> NswGraph {
        // Exhaustive match (no wildcard) so new index kinds must be handled.
        match &catalog.get("docs").unwrap().indices()[0].kind {
            IndexKind::Nsw(g) => g.clone(),
            IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
                panic!("expected NSW")
            }
        }
    }

    let schema = TableSchema::new(
        "docs",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 3,
                    encoding: VecEncoding::F32,
                },
                true,
            ),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let docs = cat.get_mut("docs").unwrap();
    for i in 0..6_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.1;
        let embedding = vec![base, base + 0.05, base + 0.1];
        docs.insert(Row::new(vec![Value::Int(i), Value::Vector(embedding)]))
            .unwrap();
    }
    docs.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let original = first_nsw_graph(&cat);
    let bytes = cat.serialize();
    let restored = Catalog::deserialize(&bytes).expect("deserialize");
    let restored_graph = first_nsw_graph(&restored);

    assert_eq!(restored_graph.m, original.m);
    assert_eq!(restored_graph.m_max_0, original.m_max_0);
    assert_eq!(restored_graph.entry, original.entry);
    assert_eq!(restored_graph.entry_level, original.entry_level);
    assert_eq!(restored_graph.levels, original.levels);
    assert_eq!(restored_graph.layers, original.layers);
}
7292
/// Calling `nsw_assign_level` twice with the same index yields the same
/// level for the first 32 indices.
#[test]
fn hnsw_level_assignment_is_deterministic() {
    (0..32usize).for_each(|i| {
        let first = nsw_assign_level(i);
        let second = nsw_assign_level(i);
        assert_eq!(first, second);
    });
}
7301
/// More than 150 of the first 200 node indices are assigned to level 0.
#[test]
fn hnsw_layer_0_dominates_population() {
    let mut on_zero = 0_usize;
    for i in 0..200usize {
        if nsw_assign_level(i) == 0 {
            on_zero += 1;
        }
    }
    assert!(on_zero > 150, "level-0 nodes too few: {on_zero}");
}
7311
/// On a ten-point 3-D dataset, the index's top-1 result under L2 matches a
/// brute-force scan for each of three probe queries.
#[test]
fn hnsw_search_matches_brute_force_for_l2_top1() {
    let schema = TableSchema::new(
        "vecs",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 3,
                    encoding: VecEncoding::F32,
                },
                true,
            ),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let points: Vec<(i32, [f32; 3])> = vec![
        (1, [0.0, 0.0, 0.0]),
        (2, [1.0, 0.0, 0.0]),
        (3, [0.0, 1.0, 0.0]),
        (4, [0.0, 0.0, 1.0]),
        (5, [1.0, 1.0, 0.0]),
        (6, [1.0, 0.0, 1.0]),
        (7, [0.0, 1.0, 1.0]),
        (8, [1.0, 1.0, 1.0]),
        (9, [0.5, 0.5, 0.5]),
        (10, [0.2, 0.8, 0.5]),
    ];
    let vecs = cat.get_mut("vecs").unwrap();
    for &(id, coords) in &points {
        let embedding = vec![coords[0], coords[1], coords[2]];
        vecs.insert(Row::new(vec![Value::Int(id), Value::Vector(embedding)]))
            .unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let indices = cat.get("vecs").unwrap().indices();
    let idx_pos = indices.iter().position(|i| i.name == "v_idx").unwrap();

    for query in [[0.4, 0.4, 0.4], [0.9, 0.1, 0.0], [0.0, 0.9, 0.9]] {
        let table = cat.get("vecs").unwrap();
        let hnsw_top = nsw_search(table, idx_pos, &query, 1, 16, NswMetric::L2);

        // Brute force: score every row, pushing non-vector cells to the end.
        let mut brute: Vec<(f32, usize)> = Vec::new();
        for i in 0..table.rows.len() {
            let score = match &table.rows[i].values[1] {
                Value::Vector(v) => l2_distance_sq(v, &query),
                _ => f32::INFINITY,
            };
            brute.push((score, i));
        }
        brute.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));

        assert!(!hnsw_top.is_empty(), "HNSW returned no results");
        assert_eq!(
            hnsw_top[0].1, brute[0].1,
            "HNSW top-1 != brute-force top-1 for {query:?}"
        );
    }
}
7380
7381 #[test]
7382 fn serialize_table_with_rows_round_trips() {
7383 let mut cat = Catalog::new();
7384 cat.create_table(make_users_schema()).unwrap();
7385 let t = cat.get_mut("users").unwrap();
7386 t.insert(Row::new(vec![
7387 Value::Int(1),
7388 Value::Text("alice".into()),
7389 Value::Float(95.5),
7390 ]))
7391 .unwrap();
7392 t.insert(Row::new(vec![
7393 Value::Int(2),
7394 Value::Text("bob".into()),
7395 Value::Null,
7396 ]))
7397 .unwrap();
7398 assert_round_trip(&cat);
7399 }
7400
7401 #[test]
7402 fn serialize_multiple_tables_round_trips() {
7403 let mut cat = Catalog::new();
7404 cat.create_table(make_users_schema()).unwrap();
7405 cat.create_table(TableSchema::new(
7406 "flags",
7407 vec![
7408 ColumnSchema::new("id", DataType::BigInt, false),
7409 ColumnSchema::new("active", DataType::Bool, false),
7410 ],
7411 ))
7412 .unwrap();
7413 cat.get_mut("flags")
7414 .unwrap()
7415 .insert(Row::new(vec![Value::BigInt(7), Value::Bool(true)]))
7416 .unwrap();
7417 assert_round_trip(&cat);
7418 }
7419
7420 #[test]
7421 fn deserialize_rejects_bad_magic() {
7422 let mut buf = b"BADMAGIC".to_vec();
7423 buf.push(FILE_VERSION);
7424 buf.extend_from_slice(&0u32.to_le_bytes());
7425 let err = Catalog::deserialize(&buf).unwrap_err();
7426 assert!(matches!(err, StorageError::Corrupt(_)));
7427 }
7428
7429 #[test]
7430 fn deserialize_rejects_unsupported_version() {
7431 let mut buf = FILE_MAGIC.to_vec();
7432 buf.push(99); buf.extend_from_slice(&0u32.to_le_bytes());
7434 let err = Catalog::deserialize(&buf).unwrap_err();
7435 assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("version")));
7436 }
7437
7438 #[test]
7439 fn deserialize_rejects_truncated_file() {
7440 let mut cat = Catalog::new();
7441 cat.create_table(make_users_schema()).unwrap();
7442 let bytes = cat.serialize();
7443 let truncated = &bytes[..bytes.len() - 1];
7445 assert!(matches!(
7446 Catalog::deserialize(truncated),
7447 Err(StorageError::Corrupt(_))
7448 ));
7449 }
7450
7451 #[test]
7452 fn deserialize_rejects_trailing_garbage() {
7453 let cat = Catalog::new();
7454 let mut bytes = cat.serialize();
7455 bytes.push(0xFF);
7456 assert!(matches!(
7457 Catalog::deserialize(&bytes),
7458 Err(StorageError::Corrupt(ref s)) if s.contains("trailing")
7459 ));
7460 }
7461
7462 fn populated_users() -> Catalog {
7465 let mut cat = Catalog::new();
7466 cat.create_table(make_users_schema()).unwrap();
7467 let t = cat.get_mut("users").unwrap();
7468 for (id, name, score) in [
7469 (1, "alice", Some(90.0)),
7470 (2, "bob", None),
7471 (3, "alice", Some(70.0)), ] {
7473 t.insert(Row::new(vec![
7474 Value::Int(id),
7475 Value::Text(name.into()),
7476 score.map_or(Value::Null, Value::Float),
7477 ]))
7478 .unwrap();
7479 }
7480 cat
7481 }
7482
7483 #[test]
7484 fn add_index_builds_from_existing_rows() {
7485 let mut cat = populated_users();
7486 cat.get_mut("users")
7487 .unwrap()
7488 .add_index("by_id".into(), "id")
7489 .unwrap();
7490 let t = cat.get("users").unwrap();
7491 let idx = t.index_on(0).expect("index_on(0)");
7492 assert_eq!(idx.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
7493 assert_eq!(idx.lookup_eq(&IndexKey::Int(99)), &[] as &[RowLocator]);
7494 }
7495
7496 #[test]
7497 fn add_index_dup_name_rejected() {
7498 let mut cat = populated_users();
7499 let t = cat.get_mut("users").unwrap();
7500 t.add_index("ix".into(), "id").unwrap();
7501 let err = t.add_index("ix".into(), "name").unwrap_err();
7502 assert!(matches!(err, StorageError::DuplicateIndex { ref name } if name == "ix"));
7503 }
7504
7505 #[test]
7506 fn add_index_unknown_column_rejected() {
7507 let mut cat = populated_users();
7508 let err = cat
7509 .get_mut("users")
7510 .unwrap()
7511 .add_index("ix".into(), "ghost")
7512 .unwrap_err();
7513 assert!(matches!(err, StorageError::ColumnNotFound { ref column } if column == "ghost"));
7514 }
7515
7516 #[test]
7517 fn insert_after_create_index_updates_it() {
7518 let mut cat = populated_users();
7519 let t = cat.get_mut("users").unwrap();
7520 t.add_index("by_name".into(), "name").unwrap();
7521 t.insert(Row::new(vec![
7522 Value::Int(4),
7523 Value::Text("dave".into()),
7524 Value::Null,
7525 ]))
7526 .unwrap();
7527 let idx = t.index_on(1).unwrap();
7528 assert_eq!(
7529 idx.lookup_eq(&IndexKey::Text("dave".into())),
7530 &[RowLocator::Hot(3)]
7531 );
7532 assert_eq!(
7534 idx.lookup_eq(&IndexKey::Text("alice".into())),
7535 &[RowLocator::Hot(0), RowLocator::Hot(2)]
7536 );
7537 }
7538
7539 #[test]
7540 fn null_or_float_values_are_not_indexed() {
7541 let mut cat = populated_users();
7542 let t = cat.get_mut("users").unwrap();
7543 t.add_index("by_score".into(), "score").unwrap();
7544 let idx = t.index_on(2).unwrap();
7545 assert_eq!(idx.lookup_eq(&IndexKey::Int(90)), &[] as &[RowLocator]);
7550 }
7551
7552 #[test]
7555 fn vector_value_data_type_carries_dim() {
7556 let v = Value::Vector(vec![1.0, 2.0, 3.0]);
7557 assert_eq!(
7558 v.data_type(),
7559 Some(DataType::Vector {
7560 dim: 3,
7561 encoding: VecEncoding::F32
7562 })
7563 );
7564 }
7565
7566 #[test]
7567 fn vector_column_insert_matching_dim_ok() {
7568 let mut cat = Catalog::new();
7569 cat.create_table(TableSchema::new(
7570 "emb",
7571 vec![ColumnSchema::new(
7572 "v",
7573 DataType::Vector {
7574 dim: 3,
7575 encoding: VecEncoding::F32,
7576 },
7577 false,
7578 )],
7579 ))
7580 .unwrap();
7581 cat.get_mut("emb")
7582 .unwrap()
7583 .insert(Row::new(vec![Value::Vector(vec![1.0, 2.0, 3.0])]))
7584 .unwrap();
7585 }
7586
7587 #[test]
7588 fn vector_column_insert_dim_mismatch_rejected() {
7589 let mut cat = Catalog::new();
7590 cat.create_table(TableSchema::new(
7591 "emb",
7592 vec![ColumnSchema::new(
7593 "v",
7594 DataType::Vector {
7595 dim: 3,
7596 encoding: VecEncoding::F32,
7597 },
7598 false,
7599 )],
7600 ))
7601 .unwrap();
7602 let err = cat
7603 .get_mut("emb")
7604 .unwrap()
7605 .insert(Row::new(vec![Value::Vector(vec![1.0, 2.0])]))
7606 .unwrap_err();
7607 assert!(matches!(err, StorageError::TypeMismatch { .. }));
7608 }
7609
7610 #[test]
7611 fn vector_value_survives_catalog_round_trip() {
7612 let mut cat = Catalog::new();
7613 cat.create_table(TableSchema::new(
7614 "emb",
7615 vec![
7616 ColumnSchema::new("id", DataType::Int, false),
7617 ColumnSchema::new(
7618 "v",
7619 DataType::Vector {
7620 dim: 4,
7621 encoding: VecEncoding::F32,
7622 },
7623 false,
7624 ),
7625 ],
7626 ))
7627 .unwrap();
7628 cat.get_mut("emb")
7629 .unwrap()
7630 .insert(Row::new(vec![
7631 Value::Int(1),
7632 Value::Vector(vec![0.5, -1.25, 3.0, 7.0]),
7633 ]))
7634 .unwrap();
7635 let restored = Catalog::deserialize(&cat.serialize()).expect("round-trip");
7636 let table = restored.get("emb").unwrap();
7637 assert_eq!(
7638 table.schema().columns[1].ty,
7639 DataType::Vector {
7640 dim: 4,
7641 encoding: VecEncoding::F32
7642 }
7643 );
7644 assert_eq!(
7645 table.rows()[0].values[1],
7646 Value::Vector(vec![0.5, -1.25, 3.0, 7.0])
7647 );
7648 }
7649
7650 #[test]
7651 fn index_survives_serialize_deserialize_round_trip() {
7652 let mut cat = populated_users();
7653 cat.get_mut("users")
7654 .unwrap()
7655 .add_index("by_name".into(), "name")
7656 .unwrap();
7657 let restored = Catalog::deserialize(&cat.serialize()).unwrap();
7658 let idx = restored
7659 .get("users")
7660 .unwrap()
7661 .index_on(1)
7662 .expect("index_on(1) after restore");
7663 assert_eq!(idx.name, "by_name");
7664 assert_eq!(
7666 idx.lookup_eq(&IndexKey::Text("alice".into())),
7667 &[RowLocator::Hot(0), RowLocator::Hot(2)]
7668 );
7669 }
7670
7671 fn bigint_pk_users_schema() -> TableSchema {
7676 TableSchema::new(
7677 "users",
7678 vec![
7679 ColumnSchema::new("id", DataType::BigInt, false),
7680 ColumnSchema::new("name", DataType::Text, false),
7681 ],
7682 )
7683 }
7684
7685 fn make_user_row(id: i64, name: &str) -> Row {
7686 Row::new(vec![Value::BigInt(id), Value::Text(name.into())])
7687 }
7688
7689 #[test]
7690 fn lookup_by_pk_finds_row_via_hot_index() {
7691 let mut cat = Catalog::new();
7692 cat.create_table(bigint_pk_users_schema()).unwrap();
7693 let t = cat.get_mut("users").unwrap();
7694 for (id, name) in [(1i64, "alice"), (2, "bob"), (3, "carol")] {
7695 t.insert(make_user_row(id, name)).unwrap();
7696 }
7697 t.add_index("by_id".into(), "id").unwrap();
7698 let got = cat
7700 .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
7701 .unwrap();
7702 assert_eq!(got, make_user_row(2, "bob"));
7703 assert_eq!(cat.cold_segment_count(), 0);
7704 }
7705
7706 #[test]
7707 fn lookup_by_pk_returns_none_when_key_missing() {
7708 let mut cat = Catalog::new();
7709 cat.create_table(bigint_pk_users_schema()).unwrap();
7710 let t = cat.get_mut("users").unwrap();
7711 t.insert(make_user_row(1, "alice")).unwrap();
7712 t.add_index("by_id".into(), "id").unwrap();
7713 assert!(
7714 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(999))
7715 .is_none()
7716 );
7717 assert!(
7719 cat.lookup_by_pk("other_table", "by_id", &IndexKey::Int(1))
7720 .is_none()
7721 );
7722 assert!(
7723 cat.lookup_by_pk("users", "no_such_index", &IndexKey::Int(1))
7724 .is_none()
7725 );
7726 }
7727
7728 #[test]
7729 fn lookup_by_pk_resolves_cold_locator_via_loaded_segment() {
7730 let mut cat = Catalog::new();
7734 cat.create_table(bigint_pk_users_schema()).unwrap();
7735 let t = cat.get_mut("users").unwrap();
7736 t.add_index("by_id".into(), "id").unwrap();
7737 let schema = t.schema.clone();
7738
7739 let cold_rows: Vec<(i64, &str)> =
7740 vec![(100, "ivy"), (200, "joe"), (300, "kim"), (400, "lin")];
7741 let seg_rows: Vec<(u64, Vec<u8>)> = cold_rows
7742 .iter()
7743 .map(|(id, name)| {
7744 let row = make_user_row(*id, name);
7745 ((*id).cast_unsigned(), encode_row_body_dense(&row, &schema))
7746 })
7747 .collect();
7748 let (seg_bytes, _meta) =
7749 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
7750 let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();
7751 assert_eq!(seg_id, 0);
7752 assert_eq!(cat.cold_segment_count(), 1);
7753
7754 let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
7755 .iter()
7756 .map(|(id, _)| {
7757 (
7758 IndexKey::Int(*id),
7759 RowLocator::Cold {
7760 segment_id: seg_id,
7761 page_offset: 0,
7762 },
7763 )
7764 })
7765 .collect();
7766 let registered = cat
7767 .get_mut("users")
7768 .unwrap()
7769 .register_cold_locators("by_id", pairs)
7770 .unwrap();
7771 assert_eq!(registered, 4);
7772
7773 for (id, name) in &cold_rows {
7774 let got = cat
7775 .lookup_by_pk("users", "by_id", &IndexKey::Int(*id))
7776 .unwrap_or_else(|| panic!("cold key {id} not found"));
7777 assert_eq!(got, make_user_row(*id, name));
7778 }
7779 assert!(
7781 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(999))
7782 .is_none()
7783 );
7784 }
7785
7786 #[test]
7787 fn lookup_by_pk_mixes_hot_and_cold_tiers() {
7788 let mut cat = Catalog::new();
7792 cat.create_table(bigint_pk_users_schema()).unwrap();
7793 let t = cat.get_mut("users").unwrap();
7794 for (id, name) in [(1i64, "alice"), (2, "bob")] {
7795 t.insert(make_user_row(id, name)).unwrap();
7796 }
7797 t.add_index("by_id".into(), "id").unwrap();
7798 let schema = t.schema.clone();
7799
7800 let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe")];
7801 let seg_rows: Vec<(u64, Vec<u8>)> = cold_rows
7802 .iter()
7803 .map(|(id, name)| {
7804 let row = make_user_row(*id, name);
7805 ((*id).cast_unsigned(), encode_row_body_dense(&row, &schema))
7806 })
7807 .collect();
7808 let (seg_bytes, _) =
7809 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
7810 let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();
7811 let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
7812 .iter()
7813 .map(|(id, _)| {
7814 (
7815 IndexKey::Int(*id),
7816 RowLocator::Cold {
7817 segment_id: seg_id,
7818 page_offset: 0,
7819 },
7820 )
7821 })
7822 .collect();
7823 cat.get_mut("users")
7824 .unwrap()
7825 .register_cold_locators("by_id", pairs)
7826 .unwrap();
7827
7828 assert_eq!(
7830 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(1))
7831 .unwrap(),
7832 make_user_row(1, "alice")
7833 );
7834 assert_eq!(
7835 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(2))
7836 .unwrap(),
7837 make_user_row(2, "bob")
7838 );
7839 assert_eq!(
7841 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(100))
7842 .unwrap(),
7843 make_user_row(100, "ivy")
7844 );
7845 assert_eq!(
7846 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(200))
7847 .unwrap(),
7848 make_user_row(200, "joe")
7849 );
7850 assert!(
7852 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(50))
7853 .is_none()
7854 );
7855 }
7856
7857 #[test]
7858 fn register_cold_locators_rejects_nsw_index() {
7859 let mut cat = Catalog::new();
7860 cat.create_table(TableSchema::new(
7861 "vecs",
7862 vec![
7863 ColumnSchema::new("id", DataType::Int, false),
7864 ColumnSchema::new(
7865 "v",
7866 DataType::Vector {
7867 dim: 4,
7868 encoding: VecEncoding::F32,
7869 },
7870 false,
7871 ),
7872 ],
7873 ))
7874 .unwrap();
7875 let t = cat.get_mut("vecs").unwrap();
7876 t.insert(Row::new(vec![
7877 Value::Int(1),
7878 Value::Vector(vec![1.0, 0.0, 0.0, 0.0]),
7879 ]))
7880 .unwrap();
7881 t.add_nsw_index("by_v".into(), "v", NSW_DEFAULT_M).unwrap();
7882 let err = t
7883 .register_cold_locators(
7884 "by_v",
7885 vec![(
7886 IndexKey::Int(1),
7887 RowLocator::Cold {
7888 segment_id: 0,
7889 page_offset: 0,
7890 },
7891 )],
7892 )
7893 .unwrap_err();
7894 assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("not BTree")));
7897 }
7898
7899 #[test]
7900 fn load_segment_bytes_rejects_garbage() {
7901 let mut cat = Catalog::new();
7902 let err = cat.load_segment_bytes(vec![0u8; 10]).unwrap_err();
7903 assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("segment")));
7904 assert_eq!(cat.cold_segment_count(), 0);
7906 }
7907
7908 #[test]
7909 fn load_segment_bytes_returns_sequential_ids() {
7910 let mut cat = Catalog::new();
7911 cat.create_table(bigint_pk_users_schema()).unwrap();
7912 let schema = cat.get("users").unwrap().schema.clone();
7913 for batch in 0u32..3 {
7914 let rows: Vec<(u64, Vec<u8>)> = (0u64..4)
7915 .map(|i| {
7916 let id = u64::from(batch) * 100 + i;
7917 let row = make_user_row(id.cast_signed(), "x");
7918 (id, encode_row_body_dense(&row, &schema))
7919 })
7920 .collect();
7921 let (bytes, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
7922 assert_eq!(cat.load_segment_bytes(bytes).unwrap(), batch);
7923 }
7924 assert_eq!(cat.cold_segment_count(), 3);
7925 }
7926
7927 #[test]
7934 fn v8_catalog_decodes_as_all_hot_under_v9_reader() {
7935 let mut cat = populated_users();
7942 cat.get_mut("users")
7943 .unwrap()
7944 .add_index("by_name".into(), "name")
7945 .unwrap();
7946
7947 let v8_bytes = encode_as_v8(&cat);
7952 assert_eq!(v8_bytes[FILE_MAGIC.len()], 8, "version byte must be 8");
7953
7954 let restored = Catalog::deserialize(&v8_bytes).expect("v9 reader accepts v8 stream");
7955 let idx = restored
7956 .get("users")
7957 .unwrap()
7958 .index_on(1)
7959 .expect("index_on(1) after restore");
7960 assert_eq!(
7963 idx.lookup_eq(&IndexKey::Text("alice".into())),
7964 &[RowLocator::Hot(0), RowLocator::Hot(2)]
7965 );
7966 for entry in idx.lookup_eq(&IndexKey::Text("alice".into())) {
7968 assert!(entry.is_hot(), "v8 → v9 read must yield Hot only");
7969 }
7970 }
7971
7972 fn encode_as_v8(cat: &Catalog) -> Vec<u8> {
7977 let mut out = Vec::with_capacity(64);
7978 out.extend_from_slice(FILE_MAGIC);
7979 out.push(8u8);
7980 write_u32(&mut out, u32::try_from(cat.tables.len()).unwrap());
7981 for t in &cat.tables {
7982 write_str(&mut out, &t.schema.name);
7983 write_u16(&mut out, u16::try_from(t.schema.columns.len()).unwrap());
7984 for c in &t.schema.columns {
7985 write_str(&mut out, &c.name);
7986 write_data_type(&mut out, c.ty);
7987 out.push(u8::from(c.nullable));
7988 match &c.default {
7989 None => out.push(0),
7990 Some(v) => {
7991 out.push(1);
7992 write_value(&mut out, v);
7993 }
7994 }
7995 out.push(u8::from(c.auto_increment));
7996 }
7997 write_u32(&mut out, u32::try_from(t.rows.len()).unwrap());
7998 for row in &t.rows {
7999 out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
8000 }
8001 write_u16(&mut out, u16::try_from(t.indices.len()).unwrap());
8002 for idx in &t.indices {
8003 write_str(&mut out, &idx.name);
8004 write_u16(&mut out, u16::try_from(idx.column_position).unwrap());
8005 match &idx.kind {
8006 IndexKind::BTree(_) => out.push(0),
8009 IndexKind::Nsw(g) => {
8010 out.push(1);
8011 write_u16(&mut out, u16::try_from(g.m).unwrap());
8012 write_nsw_graph(&mut out, g);
8013 }
8014 IndexKind::Brin { .. } => panic!(
8017 "v8 catalog writer cannot serialise BRIN — \
8018 tests with BRIN indices must use the current writer"
8019 ),
8020 IndexKind::Gin(_) => panic!(
8021 "v8 catalog writer cannot serialise GIN — \
8022 tests with GIN indices must use the current writer"
8023 ),
8024 }
8025 }
8026 }
8027 out
8028 }
8029
8030 #[test]
8036 fn v9_catalog_round_trip_preserves_cold_locators() {
8037 let mut cat = Catalog::new();
8038 cat.create_table(bigint_pk_users_schema()).unwrap();
8039 let t = cat.get_mut("users").unwrap();
8040 for (id, name) in [(1i64, "alice"), (2, "bob")] {
8042 t.insert(make_user_row(id, name)).unwrap();
8043 }
8044 t.add_index("by_id".into(), "id").unwrap();
8045 let schema = t.schema.clone();
8046
8047 let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe"), (300, "kim")];
8049 let seg_rows: Vec<(u64, Vec<u8>)> = cold_rows
8050 .iter()
8051 .map(|(id, name)| {
8052 let row = make_user_row(*id, name);
8053 ((*id).cast_unsigned(), encode_row_body_dense(&row, &schema))
8054 })
8055 .collect();
8056 let (seg_bytes, _) =
8057 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
8058 let seg_id = cat.load_segment_bytes(seg_bytes.clone()).unwrap();
8059 let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
8060 .iter()
8061 .map(|(id, _)| {
8062 (
8063 IndexKey::Int(*id),
8064 RowLocator::Cold {
8065 segment_id: seg_id,
8066 page_offset: 0,
8067 },
8068 )
8069 })
8070 .collect();
8071 cat.get_mut("users")
8072 .unwrap()
8073 .register_cold_locators("by_id", pairs)
8074 .unwrap();
8075
8076 let bytes = cat.serialize();
8078 assert_eq!(bytes[FILE_MAGIC.len()], FILE_VERSION);
8079 let mut restored = Catalog::deserialize(&bytes).expect("v9 round-trip parses");
8080
8081 let restored_seg_id = restored.load_segment_bytes(seg_bytes).unwrap();
8088 assert_eq!(restored_seg_id, seg_id);
8089
8090 let idx = restored.get("users").unwrap().index_on(0).unwrap();
8091 assert_eq!(idx.lookup_eq(&IndexKey::Int(1)), &[RowLocator::Hot(0)]);
8093 assert_eq!(idx.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
8094 for (id, _) in &cold_rows {
8096 assert_eq!(
8097 idx.lookup_eq(&IndexKey::Int(*id)),
8098 &[RowLocator::Cold {
8099 segment_id: seg_id,
8100 page_offset: 0,
8101 }]
8102 );
8103 }
8104 assert_eq!(
8106 restored
8107 .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
8108 .unwrap(),
8109 make_user_row(2, "bob")
8110 );
8111 for (id, name) in &cold_rows {
8112 assert_eq!(
8113 restored
8114 .lookup_by_pk("users", "by_id", &IndexKey::Int(*id))
8115 .unwrap(),
8116 make_user_row(*id, name)
8117 );
8118 }
8119 }
8120
8121 #[test]
8128 fn row_body_encoded_len_matches_actual_encode_for_all_types() {
8129 let schema = TableSchema::new(
8130 "wide",
8131 vec![
8132 ColumnSchema::new("a", DataType::SmallInt, true),
8133 ColumnSchema::new("b", DataType::Int, false),
8134 ColumnSchema::new("c", DataType::BigInt, false),
8135 ColumnSchema::new("d", DataType::Float, false),
8136 ColumnSchema::new("e", DataType::Bool, false),
8137 ColumnSchema::new("f", DataType::Text, false),
8138 ColumnSchema::new(
8139 "g",
8140 DataType::Vector {
8141 dim: 3,
8142 encoding: VecEncoding::F32,
8143 },
8144 false,
8145 ),
8146 ColumnSchema::new(
8147 "h",
8148 DataType::Numeric {
8149 precision: 18,
8150 scale: 2,
8151 },
8152 false,
8153 ),
8154 ColumnSchema::new("i", DataType::Date, false),
8155 ColumnSchema::new("j", DataType::Timestamp, false),
8156 ],
8157 );
8158 let cases: &[Row] = &[
8159 Row::new(vec![
8160 Value::SmallInt(7),
8161 Value::Int(42),
8162 Value::BigInt(1_000_000),
8163 Value::Float(1.5),
8164 Value::Bool(true),
8165 Value::Text("hello".into()),
8166 Value::Vector(vec![1.0, 2.0, 3.0]),
8167 Value::Numeric {
8168 scaled: 12345,
8169 scale: 2,
8170 },
8171 Value::Date(20_000),
8172 Value::Timestamp(1_700_000_000_000_000),
8173 ]),
8174 Row::new(vec![
8176 Value::Null,
8177 Value::Int(0),
8178 Value::BigInt(0),
8179 Value::Float(0.0),
8180 Value::Bool(false),
8181 Value::Text(String::new()),
8182 Value::Vector(vec![]),
8183 Value::Numeric {
8184 scaled: 0,
8185 scale: 2,
8186 },
8187 Value::Date(0),
8188 Value::Timestamp(0),
8189 ]),
8190 Row::new(vec![
8191 Value::SmallInt(-1),
8192 Value::Int(-1),
8193 Value::BigInt(-1),
8194 Value::Float(-0.5),
8195 Value::Bool(true),
8196 Value::Text("a much longer payload here".into()),
8197 Value::Vector(vec![0.1, 0.2, 0.3]),
8198 Value::Numeric {
8199 scaled: -999_999_999,
8200 scale: 2,
8201 },
8202 Value::Date(-1),
8203 Value::Timestamp(-1),
8204 ]),
8205 ];
8206 for row in cases {
8207 let actual = encode_row_body_dense(row, &schema).len();
8208 let fast = row_body_encoded_len(row, &schema);
8209 assert_eq!(actual, fast, "row {row:?}");
8210 }
8211 }
8212
8213 #[test]
8214 fn hot_bytes_grows_on_insert_and_matches_encoded_sum() {
8215 let mut cat = Catalog::new();
8216 cat.create_table(bigint_pk_users_schema()).unwrap();
8217 let t = cat.get_mut("users").unwrap();
8218 assert_eq!(t.hot_bytes(), 0);
8219 let mut expected: u64 = 0;
8220 for (id, name) in [(1i64, "alice"), (2, "bob"), (3, "carol")] {
8221 let row = make_user_row(id, name);
8222 expected += encode_row_body_dense(&row, &t.schema).len() as u64;
8223 t.insert(row).unwrap();
8224 }
8225 assert_eq!(t.hot_bytes(), expected);
8226 assert_eq!(cat.hot_tier_bytes(), expected);
8227 }
8228
8229 #[test]
8230 fn hot_bytes_shrinks_on_delete() {
8231 let mut cat = Catalog::new();
8232 cat.create_table(bigint_pk_users_schema()).unwrap();
8233 let t = cat.get_mut("users").unwrap();
8234 for (id, name) in [(1i64, "alice"), (2, "bob"), (3, "carol")] {
8235 t.insert(make_user_row(id, name)).unwrap();
8236 }
8237 let before = t.hot_bytes();
8238 let bob_row = make_user_row(2, "bob");
8240 let bob_bytes = encode_row_body_dense(&bob_row, &t.schema).len() as u64;
8241 let removed = t.delete_rows(&[1]);
8242 assert_eq!(removed, 1);
8243 assert_eq!(t.hot_bytes(), before - bob_bytes);
8244 }
8245
8246 #[test]
8247 fn hot_bytes_diffs_on_update_for_variable_width_columns() {
8248 let mut cat = Catalog::new();
8249 cat.create_table(bigint_pk_users_schema()).unwrap();
8250 let t = cat.get_mut("users").unwrap();
8251 t.insert(make_user_row(1, "alice")).unwrap();
8252 let after_insert = t.hot_bytes();
8253 let new_row = make_user_row(1, "alice-the-longer-name");
8256 let old_len = encode_row_body_dense(&make_user_row(1, "alice"), &t.schema).len() as u64;
8257 let new_len = encode_row_body_dense(&new_row, &t.schema).len() as u64;
8258 t.update_row(0, new_row.values).unwrap();
8259 assert_eq!(t.hot_bytes(), after_insert - old_len + new_len);
8260 assert!(t.hot_bytes() > after_insert, "longer text grew the counter");
8261 }
8262
8263 #[test]
8264 fn hot_bytes_round_trips_through_serialize_deserialize() {
8265 let mut cat = Catalog::new();
8266 cat.create_table(bigint_pk_users_schema()).unwrap();
8267 let t = cat.get_mut("users").unwrap();
8268 for i in 0..10 {
8269 t.insert(make_user_row(i, &alloc::format!("name-{i}")))
8270 .unwrap();
8271 }
8272 let pre = cat.hot_tier_bytes();
8273 let restored = Catalog::deserialize(&cat.serialize()).unwrap();
8274 assert_eq!(restored.hot_tier_bytes(), pre);
8275 assert_eq!(restored.get("users").unwrap().hot_bytes(), pre);
8276 }
8277
8278 #[test]
8285 fn freeze_oldest_to_cold_moves_rows_and_keeps_lookups_working() {
8286 let mut cat = Catalog::new();
8287 cat.create_table(bigint_pk_users_schema()).unwrap();
8288 let t = cat.get_mut("users").unwrap();
8289 for id in 0..10i64 {
8290 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8291 .unwrap();
8292 }
8293 t.add_index("by_id".into(), "id").unwrap();
8294 let total_bytes_before = t.hot_bytes();
8295
8296 let report = cat
8297 .freeze_oldest_to_cold("users", "by_id", 6)
8298 .expect("freeze succeeds");
8299 assert_eq!(report.frozen_rows, 6);
8300 assert_eq!(report.segment_id, 0);
8301 assert!(report.bytes_freed > 0);
8302 assert!(!report.segment_bytes.is_empty());
8303
8304 let t = cat.get("users").unwrap();
8305 assert_eq!(t.row_count(), 4, "4 hot rows remain (10 - 6 frozen)");
8306 assert_eq!(cat.cold_segment_count(), 1);
8307 assert_eq!(
8309 t.hot_bytes(),
8310 total_bytes_before - report.bytes_freed,
8311 "hot_bytes accounting matches FreezeReport"
8312 );
8313
8314 for id in 0..10i64 {
8317 let got = cat
8318 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
8319 .unwrap_or_else(|| panic!("PK {id} disappeared after freeze"));
8320 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
8321 }
8322 }
8323
8324 #[test]
8329 fn freeze_twice_preserves_prior_cold_locators() {
8330 let mut cat = Catalog::new();
8331 cat.create_table(bigint_pk_users_schema()).unwrap();
8332 let t = cat.get_mut("users").unwrap();
8333 for id in 0..12i64 {
8334 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8335 .unwrap();
8336 }
8337 t.add_index("by_id".into(), "id").unwrap();
8338
8339 cat.freeze_oldest_to_cold("users", "by_id", 4)
8340 .expect("first freeze ok");
8341 cat.freeze_oldest_to_cold("users", "by_id", 4)
8342 .expect("second freeze ok");
8343
8344 assert_eq!(cat.get("users").unwrap().row_count(), 4);
8345 assert_eq!(cat.cold_segment_count(), 2);
8346 for id in 0..12i64 {
8349 let got = cat
8350 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
8351 .unwrap_or_else(|| panic!("PK {id} not resolvable after two freezes"));
8352 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
8353 }
8354 }
8355
8356 #[test]
8359 fn freeze_oldest_to_cold_rejects_invalid_input() {
8360 let mut cat = Catalog::new();
8361 cat.create_table(bigint_pk_users_schema()).unwrap();
8362 let t = cat.get_mut("users").unwrap();
8363 for id in 0..3i64 {
8364 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8365 .unwrap();
8366 }
8367 t.add_index("by_id".into(), "id").unwrap();
8368
8369 assert!(matches!(
8371 cat.freeze_oldest_to_cold("users", "by_id", 0),
8372 Err(StorageError::Corrupt(_))
8373 ));
8374 assert!(matches!(
8376 cat.freeze_oldest_to_cold("missing", "by_id", 1),
8377 Err(StorageError::Corrupt(_))
8378 ));
8379 assert!(matches!(
8381 cat.freeze_oldest_to_cold("users", "no_such_index", 1),
8382 Err(StorageError::Corrupt(_))
8383 ));
8384 assert!(matches!(
8386 cat.freeze_oldest_to_cold("users", "by_id", 999),
8387 Err(StorageError::Corrupt(_))
8388 ));
8389 assert_eq!(cat.get("users").unwrap().row_count(), 3);
8391 assert_eq!(cat.cold_segment_count(), 0);
8392 }
8393
8394 #[test]
8397 fn freeze_oldest_to_cold_rejects_non_integer_pk() {
8398 let mut cat = Catalog::new();
8399 cat.create_table(TableSchema::new(
8400 "by_name",
8401 vec![
8402 ColumnSchema::new("name", DataType::Text, false),
8403 ColumnSchema::new("payload", DataType::BigInt, false),
8404 ],
8405 ))
8406 .unwrap();
8407 let t = cat.get_mut("by_name").unwrap();
8408 t.insert(Row::new(vec![Value::Text("a".into()), Value::BigInt(1)]))
8409 .unwrap();
8410 t.add_index("by_n".into(), "name").unwrap();
8411 let err = cat
8412 .freeze_oldest_to_cold("by_name", "by_n", 1)
8413 .expect_err("non-integer PK rejected");
8414 match err {
8415 StorageError::Corrupt(s) => assert!(
8416 s.contains("non-integer"),
8417 "error message names the constraint: {s}"
8418 ),
8419 other => panic!("expected Corrupt, got {other:?}"),
8420 }
8421 assert_eq!(cat.get("by_name").unwrap().row_count(), 1);
8423 assert_eq!(cat.cold_segment_count(), 0);
8424 }
8425
8426 #[test]
8431 fn freeze_keeps_remaining_hot_rows_addressable_via_secondary_index() {
8432 let mut cat = Catalog::new();
8433 cat.create_table(bigint_pk_users_schema()).unwrap();
8434 let t = cat.get_mut("users").unwrap();
8435 for id in 0..6i64 {
8436 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8437 .unwrap();
8438 }
8439 t.add_index("by_id".into(), "id").unwrap();
8440 t.add_index("by_name".into(), "name").unwrap();
8441
8442 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
8443
8444 let idx = cat.get("users").unwrap().index_on(1).unwrap();
8448 let got = idx.lookup_eq(&IndexKey::Text("u-4".into()));
8449 assert_eq!(got.len(), 1);
8450 assert!(got[0].is_hot(), "kept-hot rows still surface as Hot");
8451 match got[0] {
8452 RowLocator::Hot(i) => {
8453 assert_eq!(i, 1);
8456 }
8457 RowLocator::Cold { .. } => unreachable!(),
8458 }
8459 }
8460
8461 #[test]
8469 fn promote_cold_row_pulls_frozen_row_back_to_hot_tier() {
8470 let mut cat = Catalog::new();
8471 cat.create_table(bigint_pk_users_schema()).unwrap();
8472 let t = cat.get_mut("users").unwrap();
8473 for id in 0..6i64 {
8474 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8475 .unwrap();
8476 }
8477 t.add_index("by_id".into(), "id").unwrap();
8478 cat.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
8481 let hot_bytes_before = cat.get("users").unwrap().hot_bytes();
8482
8483 let new_idx = cat
8485 .promote_cold_row("users", "by_id", &IndexKey::Int(2))
8486 .expect("promote ok")
8487 .expect("PK 2 was cold");
8488 assert_eq!(
8489 new_idx, 2,
8490 "promoted row appended after the 2 surviving hot rows"
8491 );
8492
8493 let t = cat.get("users").unwrap();
8494 assert_eq!(t.row_count(), 3, "hot tier grew from 2 to 3");
8495 let row = make_user_row(2, "u-2");
8497 let row_len = encode_row_body_dense(&row, &t.schema).len() as u64;
8498 assert_eq!(t.hot_bytes(), hot_bytes_before + row_len);
8499
8500 let entries = t.index_on(0).unwrap().lookup_eq(&IndexKey::Int(2));
8503 assert_eq!(entries.len(), 1, "exactly one locator per key");
8504 assert!(entries[0].is_hot(), "promote retired the Cold locator");
8505 assert_eq!(
8507 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(2))
8508 .unwrap(),
8509 row
8510 );
8511 assert_eq!(
8514 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(0))
8515 .unwrap(),
8516 make_user_row(0, "u-0")
8517 );
8518 }
8519
8520 #[test]
8524 fn promote_cold_row_returns_none_when_key_is_not_cold() {
8525 let mut cat = Catalog::new();
8526 cat.create_table(bigint_pk_users_schema()).unwrap();
8527 let t = cat.get_mut("users").unwrap();
8528 t.insert(make_user_row(7, "alice")).unwrap();
8529 t.add_index("by_id".into(), "id").unwrap();
8530
8531 assert!(
8533 cat.promote_cold_row("users", "by_id", &IndexKey::Int(7))
8534 .unwrap()
8535 .is_none()
8536 );
8537 assert!(
8539 cat.promote_cold_row("users", "by_id", &IndexKey::Int(99))
8540 .unwrap()
8541 .is_none()
8542 );
8543 assert_eq!(cat.get("users").unwrap().row_count(), 1);
8545 assert_eq!(cat.cold_segment_count(), 0);
8546 }
8547
/// Shadowing a frozen key reports exactly one retired locator, makes that
/// key stop resolving through `lookup_by_pk`, and leaves the neighbouring
/// keys 0 and 2 readable with their original content.
#[test]
fn shadow_cold_row_removes_cold_locators_and_drops_lookup() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        (0..5i64).for_each(|id| {
            users
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        });
        users.add_index("by_id".into(), "id").unwrap();
    }
    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    let shadow_key = IndexKey::Int(1);

    // Sanity check: the key is reachable before we shadow it.
    assert!(
        catalog
            .lookup_by_pk("users", "by_id", &shadow_key)
            .is_some(),
        "frozen PK resolves before shadow"
    );

    let retired = catalog
        .shadow_cold_row("users", "by_id", &shadow_key)
        .unwrap();
    assert_eq!(retired, 1, "exactly one cold locator retired");

    assert!(
        catalog
            .lookup_by_pk("users", "by_id", &shadow_key)
            .is_none(),
        "shadowed key no longer resolves"
    );

    // Keys on either side of the shadowed one are unaffected.
    for id in [0i64, 2] {
        let found = catalog
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap();
        assert_eq!(found, make_user_row(id, &alloc::format!("u-{id}")));
    }
}
8594
/// `shadow_cold_row` returns 0 for a key that was never frozen and for a
/// key that does not exist, and it does not disturb the hot row count.
#[test]
fn shadow_cold_row_returns_zero_when_key_is_not_cold() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        users.insert(make_user_row(1, "alice")).unwrap();
        users.add_index("by_id".into(), "id").unwrap();
    }

    // (primary key, failure message) — nothing has been frozen in this catalog.
    let cases = [
        (1, "hot-only key drops no cold locators"),
        (999, "absent key drops no cold locators"),
    ];
    for (pk, reason) in cases {
        let dropped = catalog
            .shadow_cold_row("users", "by_id", &IndexKey::Int(pk))
            .unwrap();
        assert_eq!(dropped, 0, "{reason}");
    }

    assert_eq!(catalog.get("users").unwrap().row_count(), 1);
}
8620
/// Both `promote_cold_row` and `shadow_cold_row` fail with
/// `StorageError::Corrupt` when given an unknown table name or an unknown
/// index name.
#[test]
fn promote_and_shadow_reject_invalid_inputs() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        users.insert(make_user_row(1, "alice")).unwrap();
        users.add_index("by_id".into(), "id").unwrap();
    }

    let key = IndexKey::Int(1);
    // First pair: table does not exist. Second pair: index does not exist.
    let invalid_targets = [("missing", "by_id"), ("users", "no_such_index")];
    for (table, index) in invalid_targets {
        let promoted = catalog.promote_cold_row(table, index, &key);
        assert!(matches!(promoted, Err(StorageError::Corrupt(_))));

        let shadowed = catalog.shadow_cold_row(table, index, &key);
        assert!(matches!(shadowed, Err(StorageError::Corrupt(_))));
    }
}
8649
/// Freezing the first six rows through `prepare_freeze_slice(0..6)` +
/// `commit_freeze_slices` must yield the same report fields (segment id,
/// frozen rows, bytes freed, segment bytes) and the same per-key lookups as
/// `freeze_oldest_to_cold(.., 6)` on an identically seeded catalog.
#[test]
fn commit_freeze_slices_single_slice_matches_freeze_oldest() {
    // Builds a fresh catalog holding PKs 0..10 with the `by_id` index.
    let seeded_catalog = || {
        let mut cat = Catalog::new();
        cat.create_table(bigint_pk_users_schema()).unwrap();
        let users = cat.get_mut("users").unwrap();
        for id in 0..10i64 {
            users
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
        cat
    };
    let mut oldest_path = seeded_catalog();
    let mut slice_path = seeded_catalog();

    let single = oldest_path
        .freeze_oldest_to_cold("users", "by_id", 6)
        .unwrap();

    let prepared = slice_path
        .prepare_freeze_slice("users", "by_id", 0..6)
        .expect("prepare");
    let parallel = slice_path
        .commit_freeze_slices("users", "by_id", alloc::vec![prepared])
        .expect("commit");

    // The two reports must agree field by field.
    assert_eq!(single.segment_id, parallel.segment_id);
    assert_eq!(single.frozen_rows, parallel.frozen_rows);
    assert_eq!(single.bytes_freed, parallel.bytes_freed);
    assert_eq!(single.segment_bytes, parallel.segment_bytes);

    // Every key — frozen or not — must resolve identically in both catalogs.
    for id in 0..10i64 {
        let key = IndexKey::Int(id);
        assert_eq!(
            oldest_path.lookup_by_pk("users", "by_id", &key),
            slice_path.lookup_by_pk("users", "by_id", &key),
            "PK {id} differs after single vs slice freeze"
        );
    }
}
8689
/// Committing two adjacent slices (0..4 and 4..8) must produce the same
/// segment bytes, the same frozen-row count and the same per-key lookups as
/// committing one slice covering 0..8 on an identically seeded catalog.
#[test]
fn commit_freeze_slices_two_slices_match_single_slice() {
    // Builds a fresh catalog whose ten rows are inserted in a shuffled PK
    // order, so insertion position and key order do not coincide.
    let seeded_catalog = || {
        let mut cat = Catalog::new();
        cat.create_table(bigint_pk_users_schema()).unwrap();
        let users = cat.get_mut("users").unwrap();
        for id in [3i64, 7, 1, 9, 5, 0, 8, 4, 2, 6] {
            users
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
        cat
    };
    let mut one_slice_cat = seeded_catalog();
    let mut two_slice_cat = seeded_catalog();

    // Reference path: a single slice spanning 0..8.
    let whole = one_slice_cat
        .prepare_freeze_slice("users", "by_id", 0..8)
        .expect("prepare");
    let one = one_slice_cat
        .commit_freeze_slices("users", "by_id", alloc::vec![whole])
        .expect("commit one");

    // Path under test: the same span split at 4.
    let first_half = two_slice_cat
        .prepare_freeze_slice("users", "by_id", 0..4)
        .expect("prepare s1");
    let second_half = two_slice_cat
        .prepare_freeze_slice("users", "by_id", 4..8)
        .expect("prepare s2");
    let two = two_slice_cat
        .commit_freeze_slices("users", "by_id", alloc::vec![first_half, second_half])
        .expect("commit two");

    assert_eq!(one.segment_bytes, two.segment_bytes);
    assert_eq!(one.frozen_rows, two.frozen_rows);

    for id in 0..10i64 {
        let key = IndexKey::Int(id);
        assert_eq!(
            one_slice_cat.lookup_by_pk("users", "by_id", &key),
            two_slice_cat.lookup_by_pk("users", "by_id", &key),
            "PK {id} differs after one-slice vs two-slice freeze"
        );
    }
}
8736
/// Slices that leave a hole between them (0..2 followed by 3..5) are
/// rejected with `StorageError::Corrupt`, and the failed commit leaves no
/// cold segment behind and keeps all six hot rows.
#[test]
fn commit_freeze_slices_rejects_gap() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        (0..6i64).for_each(|id| {
            users
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        });
        users.add_index("by_id".into(), "id").unwrap();
    }

    // Position 2 is covered by neither slice.
    let before_gap = catalog
        .prepare_freeze_slice("users", "by_id", 0..2)
        .unwrap();
    let after_gap = catalog
        .prepare_freeze_slice("users", "by_id", 3..5)
        .unwrap();

    let outcome =
        catalog.commit_freeze_slices("users", "by_id", alloc::vec![before_gap, after_gap]);
    assert!(matches!(outcome, Err(StorageError::Corrupt(_))));

    // The rejected commit must not have mutated the catalog.
    assert_eq!(catalog.cold_segment_count(), 0);
    assert_eq!(catalog.get("users").unwrap().row_count(), 6);
}
8758
/// Committing an empty slice list succeeds, reports zero frozen rows, and
/// leaves both the cold segment count and the hot row count unchanged.
#[test]
fn commit_freeze_slices_empty_is_noop() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        (0..3i64).for_each(|id| {
            users
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        });
        users.add_index("by_id".into(), "id").unwrap();
    }

    let no_slices = Vec::new();
    let report = catalog
        .commit_freeze_slices("users", "by_id", no_slices)
        .unwrap();

    assert_eq!(report.frozen_rows, 0);
    assert_eq!(catalog.cold_segment_count(), 0);
    assert_eq!(catalog.get("users").unwrap().row_count(), 3);
}
8777
/// Two three-row cold segments, both smaller than the compaction target,
/// are merged into one: the report lists two sources and six merged rows
/// with nothing pruned, the live segment count drops to one while the slot
/// count grows to three, and all eight keys still resolve to their rows.
#[test]
fn compact_merges_small_segments_storage_unit() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        (0..8i64).for_each(|id| {
            users
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        });
        users.add_index("by_id".into(), "id").unwrap();
    }

    // Two back-to-back freezes of three rows each give two cold segments.
    for _ in 0..2 {
        catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    }
    assert_eq!(catalog.cold_segment_count(), 2);
    assert_eq!(catalog.cold_segment_slot_count(), 2);

    // Pick a target one byte above the largest segment so that every
    // existing segment is strictly smaller than the target.
    let largest_segment = catalog
        .cold_segment_ids_global()
        .iter()
        .map(|seg| catalog.cold_segment(*seg).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let target = largest_segment + 1;

    let report = catalog
        .compact_cold_segments("users", "by_id", target)
        .expect("compact succeeds");
    assert_eq!(report.sources.len(), 2);
    let merged_id = report.merged_segment_id.expect("merge happened");
    assert_eq!(report.merged_rows, 6);
    assert_eq!(report.deleted_rows_pruned, 0);
    assert!(!report.merged_segment_bytes.is_empty());

    // One live segment remains, but a third slot now exists for it.
    assert_eq!(catalog.cold_segment_count(), 1);
    assert_eq!(catalog.cold_segment_slot_count(), 3);
    assert_eq!(catalog.cold_segment_ids_global(), alloc::vec![merged_id]);

    // Every key must still resolve to its original row.
    for id in 0..8i64 {
        match catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(id)) {
            Some(row) => assert_eq!(row, make_user_row(id, &alloc::format!("u-{id}"))),
            None => panic!("PK {id} lost after compaction"),
        }
    }
}
8834
/// Compaction must prune rows whose cold locators were shadowed: with six
/// frozen rows and two of them shadowed, the merge reports four live rows
/// and two pruned ones, the shadowed keys stay invisible, and every
/// surviving key still resolves to its original row content.
#[test]
fn compact_drops_shadowed_cold_rows() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let t = cat.get_mut("users").unwrap();
    for id in 0..6i64 {
        t.insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    t.add_index("by_id".into(), "id").unwrap();
    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    // Shadow one key out of each freeze batch; each call retires one locator.
    assert_eq!(
        cat.shadow_cold_row("users", "by_id", &IndexKey::Int(1))
            .unwrap(),
        1
    );
    assert_eq!(
        cat.shadow_cold_row("users", "by_id", &IndexKey::Int(4))
            .unwrap(),
        1
    );

    // Target one byte above the largest segment so both segments are
    // strictly smaller than it.
    let max_seg_bytes = cat
        .cold_segment_ids_global()
        .iter()
        .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let report = cat
        .compact_cold_segments("users", "by_id", max_seg_bytes + 1)
        .expect("compact succeeds");
    assert_eq!(report.sources.len(), 2);
    assert_eq!(report.merged_rows, 4, "6 frozen − 2 shadowed = 4 live");
    assert_eq!(report.deleted_rows_pruned, 2);

    for shadowed in [1i64, 4i64] {
        assert!(
            cat.lookup_by_pk("users", "by_id", &IndexKey::Int(shadowed))
                .is_none(),
            "shadowed PK {shadowed} must remain invisible after compact"
        );
    }
    for live in [0i64, 2, 3, 5] {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(live))
            .unwrap_or_else(|| panic!("live PK {live} lost after compact"));
        // Presence alone is not enough: pruning shifts row positions inside
        // the merged segment, so also verify the key maps to the right row.
        assert_eq!(
            got,
            make_user_row(live, &alloc::format!("u-{live}")),
            "live PK {live} resolves to the wrong row after compact"
        );
    }
}
8889
/// `compact_cold_segments` succeeds without merging anything in three
/// situations exercised here: no cold segment exists yet, exactly one cold
/// segment exists, and one segment exists with a target of a single byte.
#[test]
fn compact_is_noop_below_two_candidates() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        (0..6i64).for_each(|id| {
            users
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        });
        users.add_index("by_id".into(), "id").unwrap();
    }

    // Case 1: nothing has been frozen, so there is nothing to merge.
    let nothing_frozen = catalog
        .compact_cold_segments("users", "by_id", 1 << 30)
        .expect("noop ok");
    assert!(nothing_frozen.merged_segment_id.is_none());
    assert!(nothing_frozen.sources.is_empty());

    // Case 2: a single segment, even under a very generous target.
    catalog.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
    let lone_segment = catalog
        .compact_cold_segments("users", "by_id", 1 << 30)
        .expect("noop ok");
    assert!(lone_segment.merged_segment_id.is_none());
    assert_eq!(catalog.cold_segment_count(), 1);

    // Case 3: same single segment with a 1-byte target.
    let tiny_target = catalog
        .compact_cold_segments("users", "by_id", 1)
        .expect("noop ok");
    assert!(tiny_target.merged_segment_id.is_none());
    assert_eq!(catalog.cold_segment_count(), 1);
}
8925
/// After compaction, a catalog rebuilt from `serialize()` output plus the
/// merged segment's bytes (re-attached at the merged id via
/// `load_segment_bytes_at`) resolves all six keys to their original rows
/// and reports exactly one cold segment.
#[test]
fn compact_swap_survives_catalog_roundtrip_via_load_at() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        (0..6i64).for_each(|id| {
            users
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        });
        users.add_index("by_id".into(), "id").unwrap();
    }

    // Freeze all six rows as two three-row segments, then merge them.
    for _ in 0..2 {
        catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    }
    let largest_segment = catalog
        .cold_segment_ids_global()
        .iter()
        .map(|seg| catalog.cold_segment(*seg).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let report = catalog
        .compact_cold_segments("users", "by_id", largest_segment + 1)
        .expect("compact ok");
    let merged_id = report.merged_segment_id.unwrap();

    // Capture the catalog bytes and the merged segment payload separately;
    // the restore below has to supply the segment bytes by hand.
    let snapshot = catalog.serialize();
    let merged_payload = report.merged_segment_bytes.clone();

    let mut restored = Catalog::deserialize(&snapshot).expect("deserialize ok");
    restored
        .load_segment_bytes_at(merged_id, merged_payload)
        .expect("reload merged ok");

    for id in 0..6i64 {
        match restored.lookup_by_pk("users", "by_id", &IndexKey::Int(id)) {
            Some(row) => assert_eq!(row, make_user_row(id, &alloc::format!("u-{id}"))),
            None => panic!("PK {id} lost across roundtrip"),
        }
    }
    assert_eq!(restored.cold_segment_count(), 1);
}
8979
/// Loading segment bytes at id 5 when only one segment exists grows the
/// slot count to six while the live segment count becomes two; loading
/// again at an id that is already occupied (5 or 0) fails with
/// `StorageError::Corrupt`.
#[test]
fn load_segment_bytes_at_pads_and_rejects_collision() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        (0..4i64).for_each(|id| {
            users
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        });
        users.add_index("by_id".into(), "id").unwrap();
    }

    // Reuse the bytes of the first frozen segment as a valid payload.
    let frozen = catalog.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
    let payload = frozen.segment_bytes.clone();

    // Loading past the current end succeeds and extends the slot table.
    catalog
        .load_segment_bytes_at(5, payload.clone())
        .expect("pad + load ok");
    assert_eq!(catalog.cold_segment_slot_count(), 6);
    assert_eq!(catalog.cold_segment_count(), 2);

    // Slot 5 was just filled.
    let reload_at_five = catalog.load_segment_bytes_at(5, payload.clone());
    assert!(matches!(reload_at_five, Err(StorageError::Corrupt(_))));

    // Slot 0 is rejected as well; presumably it holds the segment produced
    // by the freeze above.
    let reload_at_zero = catalog.load_segment_bytes_at(0, payload);
    assert!(matches!(reload_at_zero, Err(StorageError::Corrupt(_))));
}
9014
/// After freezing two rows and promoting PK 0 back to the hot tier, the
/// index holds exactly one locator for that key and it is a hot one; the
/// never-frozen keys 2 and 3 still resolve to their original rows.
///
/// NOTE(review): despite the name, this test does not issue a second freeze
/// after the promote — confirm whether a re-freeze step is intentionally
/// omitted.
#[test]
fn promote_then_refreeze_does_not_leave_orphan_locators() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        (0..4i64).for_each(|id| {
            users
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        });
        users.add_index("by_id".into(), "id").unwrap();
    }

    catalog.freeze_oldest_to_cold("users", "by_id", 2).unwrap();

    let promoted = catalog
        .promote_cold_row("users", "by_id", &IndexKey::Int(0))
        .unwrap();
    assert!(promoted.is_some());

    // The promoted key must be reachable through a single hot locator only.
    let users = catalog.get("users").unwrap();
    let locators = users.index_on(0).unwrap().lookup_eq(&IndexKey::Int(0));
    assert_eq!(locators.len(), 1);
    assert!(locators[0].is_hot());

    for id in 2..=3i64 {
        let found = catalog
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap();
        assert_eq!(found, make_user_row(id, &alloc::format!("u-{id}")));
    }
}
9059}