1#![no_std]
7#![cfg_attr(target_arch = "aarch64", allow(unsafe_code))]
11
12extern crate alloc;
13
14pub mod bloom;
15pub mod halfvec;
16pub mod persistent;
17pub mod persistent_btree;
18pub mod quantize;
19pub mod row_locator;
20pub mod segment;
21
22pub use self::bloom::{BloomError, BloomFilter};
23pub use self::row_locator::{RowLocator, RowLocatorError};
24pub use self::segment::{
25 BRIN_SIDECAR_MAGIC, BrinSummary, OwnedSegment, SEGMENT_COMPRESS_ALGO_LZSS,
26 SEGMENT_COMPRESS_ALGO_NONE, SEGMENT_MAGIC, SEGMENT_MAGIC_V2, SEGMENT_PAGE_BYTES, SegmentError,
27 SegmentMeta, SegmentReader, derive_brin_summaries, encode_segment, wrap_v2_envelope,
28 wrap_v2_envelope_with_brin,
29};
30
31use alloc::boxed::Box;
32use alloc::collections::{BTreeMap, BTreeSet};
33use alloc::format;
34use alloc::string::String;
35use alloc::sync::Arc;
36use alloc::vec::Vec;
37use core::fmt;
38
39use self::persistent::PersistentVec;
40use self::persistent_btree::PersistentBTreeMap;
41
/// Physical element encoding of a `VECTOR(n)` column.
///
/// The encoding is a field of [`DataType::Vector`], and `DataType` derives
/// `PartialEq`. Two vector columns with the same dimension but different
/// encodings are therefore different types. `F32` is the `#[default]` and is
/// the only encoding rendered without a `USING` clause.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum VecEncoding {
    /// Full-precision `f32` elements; the matching value is `Value::Vector`.
    #[default]
    F32,
    /// Quantized elements held as `Value::Sq8Vector` (see the `quantize`
    /// module). NOTE(review): presumably 8-bit scalar quantization — confirm there.
    Sq8,
    /// Half-precision elements held as `Value::HalfVector` (see `halfvec`);
    /// displayed as `HALF`.
    F16,
}
60
61impl fmt::Display for VecEncoding {
62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63 match self {
64 Self::F32 => f.write_str("F32"),
65 Self::Sq8 => f.write_str("SQ8"),
66 Self::F16 => f.write_str("HALF"),
67 }
68 }
69}
70
/// Declared SQL type of a column.
///
/// `Display` renders the SQL spelling (e.g. `VARCHAR(20)`, `VECTOR(3) USING SQ8`,
/// `BYTEA`). `Table::insert` accepts a value whose [`Value::data_type`] equals
/// the column type, plus a fixed set of implicit coercions listed there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataType {
    /// 16-bit signed integer (`Value::SmallInt`).
    SmallInt,
    /// 32-bit signed integer (`Value::Int`).
    Int,
    /// 64-bit signed integer (`Value::BigInt`).
    BigInt,
    /// Floating point, held as `Value::Float(f64)`.
    Float,
    /// Unbounded text (`Value::Text`).
    Text,
    /// `VARCHAR(n)`, filled by `Value::Text`. The `u32` is presumably the
    /// declared maximum length; `Table::insert` does not check it.
    Varchar(u32),
    /// `CHAR(n)`, filled by `Value::Text`. As with `Varchar`, the length is not
    /// checked by `Table::insert`.
    Char(u32),
    /// Boolean (`Value::Bool`).
    Bool,
    /// Fixed-dimension vector.
    Vector {
        /// Element count; `Value::data_type` reports the vector's length here.
        dim: u32,
        /// Element encoding. Because it is part of the type, encodings never
        /// coerce into one another on insert.
        encoding: VecEncoding,
    },
    /// Fixed-point decimal.
    Numeric {
        /// Declared precision. `Value::data_type` always reports 0 here and the
        /// insert compatibility check ignores it.
        precision: u8,
        /// Digits after the decimal point; must match exactly on insert.
        scale: u8,
    },
    /// Calendar date (`Value::Date`).
    Date,
    /// Timestamp (`Value::Timestamp`).
    Timestamp,
    /// Timestamp with time zone. There is no dedicated value variant; columns
    /// of this type accept `Value::Timestamp`.
    Timestamptz,
    /// Time interval (`Value::Interval`).
    Interval,
    /// JSON document text (`Value::Json`).
    Json,
    /// Binary JSON. Columns of this type accept `Value::Json` (and `Value::Text`).
    Jsonb,
    /// Raw bytes (`Value::Bytes`), displayed as `BYTEA`.
    Bytes,
    /// `TEXT[]` (`Value::TextArray`).
    TextArray,
    /// `INT[]` (`Value::IntArray`).
    IntArray,
    /// `BIGINT[]` (`Value::BigIntArray`).
    BigIntArray,
    /// Full-text document (`Value::TsVector`); the only column type a GIN index
    /// may be created on.
    TsVector,
    /// Full-text query (`Value::TsQuery`).
    TsQuery,
}
176
177impl fmt::Display for DataType {
178 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
179 match self {
180 Self::SmallInt => f.write_str("SMALLINT"),
181 Self::Int => f.write_str("INT"),
182 Self::BigInt => f.write_str("BIGINT"),
183 Self::Float => f.write_str("FLOAT"),
184 Self::Text => f.write_str("TEXT"),
185 Self::Varchar(n) => write!(f, "VARCHAR({n})"),
186 Self::Char(n) => write!(f, "CHAR({n})"),
187 Self::Bool => f.write_str("BOOL"),
188 Self::Vector { dim, encoding } => match encoding {
189 VecEncoding::F32 => write!(f, "VECTOR({dim})"),
190 VecEncoding::Sq8 => write!(f, "VECTOR({dim}) USING SQ8"),
191 VecEncoding::F16 => write!(f, "VECTOR({dim}) USING HALF"),
192 },
193 Self::Numeric { precision, scale } => {
194 if *scale == 0 {
195 write!(f, "NUMERIC({precision})")
196 } else {
197 write!(f, "NUMERIC({precision}, {scale})")
198 }
199 }
200 Self::Date => f.write_str("DATE"),
201 Self::Timestamp => f.write_str("TIMESTAMP"),
202 Self::Timestamptz => f.write_str("TIMESTAMPTZ"),
203 Self::Interval => f.write_str("INTERVAL"),
204 Self::Json => f.write_str("JSON"),
205 Self::Jsonb => f.write_str("JSONB"),
206 Self::Bytes => f.write_str("BYTEA"),
207 Self::TextArray => f.write_str("TEXT[]"),
208 Self::IntArray => f.write_str("INT[]"),
209 Self::BigIntArray => f.write_str("BIGINT[]"),
210 Self::TsVector => f.write_str("TSVECTOR"),
211 Self::TsQuery => f.write_str("TSQUERY"),
212 }
213 }
214}
215
/// One lexeme of a `TSVECTOR` value.
///
/// GIN indexes key on `word` (see `Table::insert` / `Table::add_gin_index`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TsLexeme {
    /// Normalised word; the GIN index key.
    pub word: String,
    /// NOTE(review): presumably the word's positions within the source
    /// document — confirm against the tsvector parser.
    pub positions: Vec<u16>,
    /// NOTE(review): presumably a small-integer weight class (PostgreSQL-style
    /// A–D) — confirm the encoding.
    pub weight: u8,
}
227
/// Parsed `TSQUERY` expression tree.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TsQueryAst {
    /// A single search term.
    Term {
        /// Word to look for.
        word: String,
        /// NOTE(review): presumably a bit mask of acceptable lexeme weights —
        /// confirm how 0 / "any weight" is encoded.
        weight_mask: u8,
    },
    /// Both sub-queries must match.
    And(Box<TsQueryAst>, Box<TsQueryAst>),
    /// Either sub-query may match.
    Or(Box<TsQueryAst>, Box<TsQueryAst>),
    /// The sub-query must not match.
    Not(Box<TsQueryAst>),
    /// `right` must follow `left`.
    Phrase {
        /// Earlier operand.
        left: Box<TsQueryAst>,
        /// Later operand.
        right: Box<TsQueryAst>,
        /// NOTE(review): presumably the required position gap (`<N>` operator).
        distance: u16,
    },
}
251
/// A single cell value.
///
/// [`Value::data_type`] reports the [`DataType`] a value carries (`None` for
/// `Null`). Some column types have no variant of their own and are filled by
/// another one: `VARCHAR`/`CHAR` columns accept `Text`, `JSONB` accepts
/// `Json`, and `TIMESTAMPTZ` accepts `Timestamp` (see `Table::insert`).
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum Value {
    /// `SMALLINT`.
    SmallInt(i16),
    /// `INT`.
    Int(i32),
    /// `BIGINT`.
    BigInt(i64),
    /// `FLOAT`.
    Float(f64),
    /// `TEXT`; also accepted by `VARCHAR`, `CHAR`, `JSON` and `JSONB` columns.
    Text(String),
    /// `BOOL`.
    Bool(bool),
    /// `VECTOR(n)` with `F32` encoding; `n` is the vector length.
    Vector(Vec<f32>),
    /// `VECTOR(n) USING SQ8`; `n` is the length of the quantized byte payload.
    Sq8Vector(crate::quantize::Sq8Vector),
    /// `VECTOR(n) USING HALF`.
    HalfVector(crate::halfvec::HalfVector),
    /// `NUMERIC` as an integer mantissa plus a scale.
    Numeric {
        /// NOTE(review): presumably the value multiplied by `10^scale` — confirm.
        scaled: i128,
        /// Digits after the decimal point.
        scale: u8,
    },
    /// `DATE`. NOTE(review): presumably days from an epoch — confirm.
    Date(i32),
    /// `TIMESTAMP` (also accepted by `TIMESTAMPTZ` columns).
    /// NOTE(review): presumably microseconds from an epoch — confirm.
    Timestamp(i64),
    /// `INTERVAL`, split into a month count and a microsecond count.
    Interval {
        months: i32,
        micros: i64,
    },
    /// `JSON` document text (also accepted by `JSONB` and `TEXT` columns).
    Json(String),
    /// `BYTEA`.
    Bytes(Vec<u8>),
    /// `TEXT[]`; `None` elements are presumably SQL NULL elements.
    TextArray(Vec<Option<String>>),
    /// `INT[]`; `None` elements are presumably SQL NULL elements.
    IntArray(Vec<Option<i32>>),
    /// `BIGINT[]`; `None` elements are presumably SQL NULL elements.
    BigIntArray(Vec<Option<i64>>),
    /// `TSVECTOR`; GIN indexes key on each lexeme's `word`.
    TsVector(Vec<TsLexeme>),
    /// `TSQUERY`: a parsed full-text query.
    TsQuery(TsQueryAst),
    /// SQL NULL; rejected by `Table::insert` for non-nullable columns.
    Null,
}
328
329impl Value {
330 pub fn data_type(&self) -> Option<DataType> {
332 match self {
333 Self::SmallInt(_) => Some(DataType::SmallInt),
334 Self::Int(_) => Some(DataType::Int),
335 Self::BigInt(_) => Some(DataType::BigInt),
336 Self::Float(_) => Some(DataType::Float),
337 Self::Text(_) => Some(DataType::Text),
340 Self::Bool(_) => Some(DataType::Bool),
341 Self::Vector(v) => Some(DataType::Vector {
342 dim: u32::try_from(v.len()).expect("vector dim ≤ u32"),
343 encoding: VecEncoding::F32,
344 }),
345 Self::Sq8Vector(q) => Some(DataType::Vector {
346 dim: u32::try_from(q.bytes.len()).expect("vector dim ≤ u32"),
347 encoding: VecEncoding::Sq8,
348 }),
349 Self::HalfVector(h) => Some(DataType::Vector {
350 dim: u32::try_from(h.dim()).expect("vector dim ≤ u32"),
351 encoding: VecEncoding::F16,
352 }),
353 Self::Numeric { scale, .. } => Some(DataType::Numeric {
358 precision: 0,
359 scale: *scale,
360 }),
361 Self::Date(_) => Some(DataType::Date),
362 Self::Timestamp(_) => Some(DataType::Timestamp),
363 Self::Interval { .. } => Some(DataType::Interval),
364 Self::Json(_) => Some(DataType::Json),
365 Self::Bytes(_) => Some(DataType::Bytes),
366 Self::TextArray(_) => Some(DataType::TextArray),
367 Self::IntArray(_) => Some(DataType::IntArray),
368 Self::BigIntArray(_) => Some(DataType::BigIntArray),
369 Self::TsVector(_) => Some(DataType::TsVector),
370 Self::TsQuery(_) => Some(DataType::TsQuery),
371 Self::Null => None,
372 }
373 }
374
375 pub const fn is_null(&self) -> bool {
376 matches!(self, Self::Null)
377 }
378}
379
/// One table row: one [`Value`] per schema column, in column order
/// (`Table::insert` rejects rows whose length differs from the column count).
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    /// Cell values, positionally aligned with `TableSchema::columns`.
    pub values: Vec<Value>,
}
386
387impl Row {
388 pub const fn new(values: Vec<Value>) -> Self {
389 Self { values }
390 }
391
392 pub fn len(&self) -> usize {
393 self.values.len()
394 }
395
396 pub fn is_empty(&self) -> bool {
397 self.values.is_empty()
398 }
399}
400
/// Definition of one table column.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnSchema {
    /// Column name, matched exactly by `TableSchema::column_position`.
    pub name: String,
    /// Declared type.
    pub ty: DataType,
    /// When `false`, `Table::insert` / `Table::update_row` reject `Value::Null`.
    pub nullable: bool,
    /// Constant default value, if any.
    pub default: Option<Value>,
    /// NOTE(review): presumably a default expression evaluated per insert
    /// (e.g. `now()`), kept as source text — confirm with the executor.
    pub runtime_default: Option<String>,
    /// NOTE(review): presumably selects `Table::next_auto_value` for omitted
    /// values — confirm with the caller.
    pub auto_increment: bool,
}
424
/// Definition of a table: name, ordered columns and table-level constraints.
#[derive(Debug, Clone, PartialEq)]
pub struct TableSchema {
    /// Table name.
    pub name: String,
    /// Columns in storage order; row cells align with this vector.
    pub columns: Vec<ColumnSchema>,
    /// NOTE(review): presumably a per-table budget compared against
    /// `Table::hot_bytes` to trigger freezing — confirm.
    pub hot_tier_bytes: Option<u64>,
    /// Foreign keys declared on this table.
    pub foreign_keys: Vec<ForeignKeyConstraint>,
    /// PRIMARY KEY / UNIQUE constraints declared on this table.
    pub uniqueness_constraints: Vec<UniquenessConstraint>,
}
450
/// A PRIMARY KEY or UNIQUE constraint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UniquenessConstraint {
    /// `true` for PRIMARY KEY, `false` for a plain UNIQUE constraint.
    pub is_primary_key: bool,
    /// NOTE(review): presumably positions into `TableSchema::columns`, in key
    /// order — confirm.
    pub columns: Vec<usize>,
}
467
/// A FOREIGN KEY constraint from this table to `parent_table`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForeignKeyConstraint {
    /// Constraint name, when one was given.
    pub name: Option<String>,
    /// NOTE(review): presumably column positions in this table — confirm.
    pub local_columns: Vec<usize>,
    /// Referenced table's name.
    pub parent_table: String,
    /// NOTE(review): presumably column positions in the parent table, paired
    /// element-wise with `local_columns` — confirm.
    pub parent_columns: Vec<usize>,
    /// Action for `ON DELETE`.
    pub on_delete: FkAction,
    /// Action for `ON UPDATE`.
    pub on_update: FkAction,
}
494
/// Referential action of a foreign key. Its on-disk tag is given by
/// `FkAction::tag` / `FkAction::from_tag` (0 = `Restrict` … 4 = `NoAction`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FkAction {
    /// `RESTRICT`.
    Restrict,
    /// `CASCADE`.
    Cascade,
    /// `SET NULL`.
    SetNull,
    /// `SET DEFAULT`.
    SetDefault,
    /// `NO ACTION`.
    NoAction,
}
504
505impl FkAction {
506 pub const fn tag(self) -> u8 {
508 match self {
509 Self::Restrict => 0,
510 Self::Cascade => 1,
511 Self::SetNull => 2,
512 Self::SetDefault => 3,
513 Self::NoAction => 4,
514 }
515 }
516 pub const fn from_tag(b: u8) -> Option<Self> {
517 Some(match b {
518 0 => Self::Restrict,
519 1 => Self::Cascade,
520 2 => Self::SetNull,
521 3 => Self::SetDefault,
522 4 => Self::NoAction,
523 _ => return None,
524 })
525 }
526}
527
528impl TableSchema {
529 pub fn column_position(&self, name: &str) -> Option<usize> {
530 self.columns.iter().position(|c| c.name == name)
531 }
532}
533
/// Key stored in a BTree index, produced by [`IndexKey::from_value`].
///
/// All integer-like values (`SMALLINT`, `INT`, `BIGINT`, `DATE`, `TIMESTAMP`)
/// widen to `Int(i64)`. The derived `Ord` compares the variant first, in
/// declaration order (`Int` < `Text` < `Bool`), then the payload. Do not
/// reorder the variants.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum IndexKey {
    /// Any integer-like value, widened to `i64`.
    Int(i64),
    /// Text value (cloned from `Value::Text`).
    Text(String),
    /// Boolean value.
    Bool(bool),
}
544
545impl IndexKey {
546 pub fn from_value(v: &Value) -> Option<Self> {
547 match v {
548 Value::SmallInt(n) => Some(Self::Int(i64::from(*n))),
549 Value::Int(n) => Some(Self::Int(i64::from(*n))),
550 Value::BigInt(n) => Some(Self::Int(*n)),
551 Value::Text(s) => Some(Self::Text(s.clone())),
552 Value::Bool(b) => Some(Self::Bool(*b)),
553 Value::Date(d) => Some(Self::Int(i64::from(*d))),
556 Value::Timestamp(t) => Some(Self::Int(*t)),
557 Value::Null
562 | Value::Float(_)
563 | Value::Vector(_)
564 | Value::Sq8Vector(_)
565 | Value::HalfVector(_)
566 | Value::Numeric { .. }
567 | Value::Interval { .. }
568 | Value::Json(_)
569 | Value::Bytes(_)
570 | Value::TextArray(_)
571 | Value::IntArray(_)
572 | Value::BigIntArray(_)
573 | Value::TsVector(_)
574 | Value::TsQuery(_) => None,
575 }
576 }
577}
578
/// A named secondary index over one column of a `Table`.
#[derive(Debug, Clone)]
pub struct Index {
    /// Index name; unique within its table (the `add_*` / `restore_*`
    /// methods reject duplicates).
    pub name: String,
    /// Position of the indexed column in `TableSchema::columns`.
    pub column_position: usize,
    /// Backing structure (BTree, NSW graph, BRIN, or GIN).
    pub kind: IndexKind,
    /// NOTE(review): presumably INCLUDE (covering) column positions — confirm.
    pub included_columns: Vec<usize>,
    /// NOTE(review): presumably source text of a partial-index WHERE predicate.
    pub partial_predicate: Option<String>,
    /// NOTE(review): presumably source text of an expression index's key.
    pub expression: Option<String>,
    /// Whether the index enforces uniqueness; the constructors in this file
    /// set it to `false`.
    pub is_unique: bool,
    /// NOTE(review): presumably additional key columns of a multi-column
    /// index, as positions — confirm.
    pub extra_column_positions: Vec<usize>,
}
627
/// Default value of the NSW `m` parameter, the per-node neighbour cap above
/// layer 0. `NswGraph::new` derives the layer-0 cap as `2 * m`.
pub const NSW_DEFAULT_M: usize = 16;
631
/// Result of moving hot rows into a cold segment.
///
/// NOTE(review): field meanings below are inferred from names; confirm
/// against the freeze implementation.
#[derive(Debug, Clone)]
pub struct FreezeReport {
    /// Identifier assigned to the new segment.
    pub segment_id: u32,
    /// Number of rows moved out of the hot tier.
    pub frozen_rows: usize,
    /// Presumably the hot-tier bytes released by the freeze.
    pub bytes_freed: u64,
    /// Encoded segment image (presumably produced by `encode_segment`).
    pub segment_bytes: Vec<u8>,
}
657
/// A batch of hot rows selected for freezing.
///
/// NOTE(review): tuple meaning is inferred; confirm against its producer.
#[derive(Debug, Clone)]
pub struct FreezeSlice {
    /// Range of hot-row positions covered by this slice.
    pub row_range: core::ops::Range<usize>,
    /// Presumably `(row id, encoded row body, index key)` per frozen row.
    pub rows: Vec<(u64, Vec<u8>, IndexKey)>,
}
680
/// Result of merging cold segments.
///
/// NOTE(review): field meanings are inferred from names; confirm against the
/// compaction implementation.
#[derive(Debug, Clone)]
pub struct CompactReport {
    /// Identifiers of the segments that were merged.
    pub sources: Vec<u32>,
    /// Identifier of the merged output segment; presumably `None` when the
    /// merge produced no rows.
    pub merged_segment_id: Option<u32>,
    /// Encoded image of the merged segment.
    pub merged_segment_bytes: Vec<u8>,
    /// Rows written to the merged segment.
    pub merged_rows: usize,
    /// Deleted rows dropped during the merge.
    pub deleted_rows_pruned: usize,
    /// Estimated bytes reclaimed (an estimate, per the field name).
    pub bytes_reclaimed_estimate: u64,
}
716
/// The structure backing an [`Index`].
#[derive(Debug, Clone)]
pub enum IndexKind {
    /// Ordered map from key to every locator holding that key. It is updated
    /// by `Table::insert` and rebuilt after deletes and updates.
    BTree(PersistentBTreeMap<IndexKey, Vec<RowLocator>>),
    /// Navigable-small-world graph over a vector column.
    Nsw(NswGraph),
    /// Block-range index. Only the column type is kept here, and
    /// `Table::insert` does not touch it. NOTE(review): summaries presumably
    /// live with frozen segments (`BrinSummary`) — confirm.
    Brin {
        /// Type of the indexed column when the index was created.
        column_type: DataType,
    },
    /// Inverted index over a `TSVECTOR` column: lexeme word → locators of the
    /// rows containing it.
    Gin(PersistentBTreeMap<alloc::string::String, Vec<RowLocator>>),
}
767
/// Layered navigable-small-world graph used by NSW vector indexes.
#[derive(Debug, Clone)]
pub struct NswGraph {
    /// Neighbour cap for layers above 0 (see `cap_for_layer`).
    pub m: usize,
    /// Neighbour cap for layer 0; `NswGraph::new` sets it to `2 * m` (saturating).
    pub m_max_0: usize,
    /// Entry node for searches, `None` while the graph is empty.
    /// NOTE(review): presumably a hot-row index — confirm.
    pub entry: Option<usize>,
    /// Level of the entry node.
    pub entry_level: u8,
    /// NOTE(review): presumably the level assigned to each node
    /// (`nsw_assign_level`), indexed by row — confirm.
    pub levels: PersistentVec<u8>,
    /// Per-layer adjacency lists; `NswGraph::new` starts with a single layer.
    /// NOTE(review): presumably `layers[l][node]` lists `node`'s neighbours on
    /// layer `l` — confirm.
    pub layers: Vec<PersistentVec<Vec<u32>>>,
}
812
813impl NswGraph {
814 fn new(m: usize) -> Self {
815 Self {
816 m,
817 m_max_0: m.saturating_mul(2),
818 entry: None,
819 entry_level: 0,
820 levels: PersistentVec::new(),
821 layers: alloc::vec![PersistentVec::new()],
822 }
823 }
824
825 pub const fn cap_for_layer(&self, layer: u8) -> usize {
827 if layer == 0 { self.m_max_0 } else { self.m }
828 }
829}
830
/// Deterministically assign an NSW layer level to `row_idx`.
///
/// The index is scrambled by a multiply with `0x9E37_79B9_7F4A_7C15`
/// followed by the SplitMix64 finaliser. The level is the number of all-zero
/// low nibbles of the result, capped at 7. If the scrambled value is uniformly
/// distributed, each extra level is 16x less likely than the one below it.
///
/// `row_idx == 0` scrambles to 0 and therefore always receives the maximum level.
pub fn nsw_assign_level(row_idx: usize) -> u8 {
    const MAX_LEVEL: u8 = 7;
    let mixed = {
        let mut z = (row_idx as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        z ^ (z >> 31)
    };
    // Each trailing zero nibble is one more level. `trailing_zeros(0) == 64`
    // gives 16 nibbles, which the cap reduces to MAX_LEVEL.
    let zero_nibbles = mixed.trailing_zeros() / 4;
    u8::try_from(zero_nibbles).map_or(MAX_LEVEL, |n| n.min(MAX_LEVEL))
}
857
858impl Index {
859 fn new_btree(name: String, column_position: usize) -> Self {
860 Self {
861 name,
862 column_position,
863 kind: IndexKind::BTree(PersistentBTreeMap::new()),
864 included_columns: Vec::new(),
865 partial_predicate: None,
866 expression: None,
867 is_unique: false,
868 extra_column_positions: Vec::new(),
869 }
870 }
871
872 fn new_nsw(name: String, column_position: usize, m: usize) -> Self {
873 Self {
874 name,
875 column_position,
876 kind: IndexKind::Nsw(NswGraph::new(m)),
877 included_columns: Vec::new(),
878 partial_predicate: None,
879 expression: None,
880 is_unique: false,
881 extra_column_positions: Vec::new(),
882 }
883 }
884
885 fn new_brin(name: String, column_position: usize, column_type: DataType) -> Self {
889 Self {
890 name,
891 column_position,
892 kind: IndexKind::Brin { column_type },
893 included_columns: Vec::new(),
894 partial_predicate: None,
895 expression: None,
896 is_unique: false,
897 extra_column_positions: Vec::new(),
898 }
899 }
900
901 fn new_gin(name: String, column_position: usize) -> Self {
906 Self {
907 name,
908 column_position,
909 kind: IndexKind::Gin(PersistentBTreeMap::new()),
910 included_columns: Vec::new(),
911 partial_predicate: None,
912 expression: None,
913 is_unique: false,
914 extra_column_positions: Vec::new(),
915 }
916 }
917
918 pub fn lookup_eq(&self, key: &IndexKey) -> &[RowLocator] {
927 match &self.kind {
928 IndexKind::BTree(m) => m.get(key).map_or(&[][..], Vec::as_slice),
929 IndexKind::Nsw(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => &[][..],
932 }
933 }
934
935 pub fn gin_lookup_word(&self, word: &str) -> &[RowLocator] {
939 match &self.kind {
940 IndexKind::Gin(m) => m.get(&String::from(word)).map_or(&[][..], Vec::as_slice),
941 IndexKind::BTree(_) | IndexKind::Nsw(_) | IndexKind::Brin { .. } => &[][..],
942 }
943 }
944
945 pub const fn nsw(&self) -> Option<&NswGraph> {
948 match &self.kind {
949 IndexKind::Nsw(g) => Some(g),
950 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => None,
951 }
952 }
953
954 pub const fn is_brin(&self) -> bool {
959 matches!(self.kind, IndexKind::Brin { .. })
960 }
961
962 pub const fn is_gin(&self) -> bool {
966 matches!(self.kind, IndexKind::Gin(_))
967 }
968}
969
/// An in-memory table: its schema, the hot-tier rows, and the secondary
/// indexes over them.
///
/// `RowLocator::Hot(i)` entries in the indexes refer to `rows[i]`. Cold
/// locators (registered via `register_cold_locators` /
/// `register_gin_cold_locators`) presumably point into frozen segments and
/// are preserved across index rebuilds.
#[derive(Debug, Clone)]
pub struct Table {
    /// Column definitions and table-level constraints.
    schema: TableSchema,
    /// Hot-tier rows, addressed by position.
    rows: PersistentVec<Row>,
    /// Secondary indexes; names are unique within the table.
    indices: Vec<Index>,
    /// Running sum of `row_body_encoded_len` over `rows`, maintained with
    /// saturating arithmetic by `insert`, `delete_rows` and `update_row`.
    hot_bytes: u64,
    /// Cached number of cold rows, written by `set_cold_row_count`.
    cold_row_count: u64,
    /// Set by `mark_cold_row_count_stale`, cleared by `set_cold_row_count`.
    cold_row_count_stale: bool,
}
1011
1012impl Table {
1013 pub fn new(schema: TableSchema) -> Self {
1014 Self {
1015 schema,
1016 rows: PersistentVec::new(),
1017 indices: Vec::new(),
1018 hot_bytes: 0,
1019 cold_row_count: 0,
1020 cold_row_count_stale: false,
1021 }
1022 }
1023
    /// Running total of encoded row sizes for the hot-tier rows, as
    /// maintained by `insert`, `delete_rows` and `update_row`.
    #[must_use]
    pub const fn hot_bytes(&self) -> u64 {
        self.hot_bytes
    }
1031
    /// Last cold row count stored with `set_cold_row_count`; check
    /// `cold_row_count_stale` to see whether it may be out of date.
    #[must_use]
    pub const fn cold_row_count(&self) -> u64 {
        self.cold_row_count
    }
1038
1039 pub fn set_cold_row_count(&mut self, n: u64) {
1042 self.cold_row_count = n;
1043 self.cold_row_count_stale = false;
1044 }
1045
    /// Flag the cached cold row count as possibly out of date. The value is
    /// kept; the flag is cleared by the next `set_cold_row_count`.
    pub fn mark_cold_row_count_stale(&mut self) {
        self.cold_row_count_stale = true;
    }
1053
    /// `true` if `mark_cold_row_count_stale` was called after the last
    /// `set_cold_row_count`.
    #[must_use]
    pub const fn cold_row_count_stale(&self) -> bool {
        self.cold_row_count_stale
    }
1061
1062 #[must_use]
1073 pub fn count_cold_locators(&self) -> u64 {
1074 let mut best: u64 = 0;
1075 for idx in &self.indices {
1076 if let IndexKind::BTree(map) = &idx.kind {
1077 let n: u64 = map
1078 .iter()
1079 .map(|(_, locs)| locs.iter().filter(|l| l.is_cold()).count() as u64)
1080 .sum();
1081 if n > best {
1082 best = n;
1083 }
1084 }
1085 }
1086 best
1087 }
1088
    /// The table's schema.
    pub const fn schema(&self) -> &TableSchema {
        &self.schema
    }
1092
    /// Mutable access to the schema.
    ///
    /// NOTE(review): changes made here are not propagated to rows or indexes
    /// (e.g. `Index::column_position`); callers must keep them consistent.
    pub const fn schema_mut(&mut self) -> &mut TableSchema {
        &mut self.schema
    }
1099
    /// The hot-tier rows; `RowLocator::Hot(i)` refers to position `i` here.
    pub const fn rows(&self) -> &PersistentVec<Row> {
        &self.rows
    }
1106
1107 pub const fn row_count(&self) -> usize {
1108 self.rows.len()
1109 }
1110
1111 pub fn indices_mut(&mut self) -> &mut [Index] {
1116 &mut self.indices
1117 }
1118
1119 pub fn indices(&self) -> &[Index] {
1120 &self.indices
1121 }
1122
1123 pub fn next_auto_value(&self, col_pos: usize) -> Option<i64> {
1129 let ty = self.schema.columns.get(col_pos)?.ty;
1130 if !matches!(ty, DataType::SmallInt | DataType::Int | DataType::BigInt) {
1131 return None;
1132 }
1133 let mut max: Option<i64> = None;
1134 for row in &self.rows {
1135 match row.values.get(col_pos) {
1136 Some(Value::SmallInt(n)) => {
1137 let v = i64::from(*n);
1138 max = Some(max.map_or(v, |m| m.max(v)));
1139 }
1140 Some(Value::Int(n)) => {
1141 let v = i64::from(*n);
1142 max = Some(max.map_or(v, |m| m.max(v)));
1143 }
1144 Some(Value::BigInt(n)) => {
1145 max = Some(max.map_or(*n, |m| m.max(*n)));
1146 }
1147 _ => {}
1148 }
1149 }
1150 Some(max.map_or(1, |m| m + 1))
1151 }
1152
1153 pub fn index_on(&self, column_position: usize) -> Option<&Index> {
1157 self.indices
1164 .iter()
1165 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::BTree(_)))
1166 .or_else(|| {
1167 self.indices.iter().find(|i| {
1168 i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_))
1169 })
1170 })
1171 }
1172
1173 pub fn insert(&mut self, row: Row) -> Result<(), StorageError> {
1177 if row.len() != self.schema.columns.len() {
1178 return Err(StorageError::ArityMismatch {
1179 expected: self.schema.columns.len(),
1180 actual: row.len(),
1181 });
1182 }
1183 for (i, (val, col)) in row.values.iter().zip(&self.schema.columns).enumerate() {
1184 if val.is_null() {
1185 if !col.nullable {
1186 return Err(StorageError::NullInNotNull {
1187 column: col.name.clone(),
1188 });
1189 }
1190 continue;
1191 }
1192 let actual = val.data_type().expect("non-null");
1193 let compatible = actual == col.ty
1207 || matches!(
1208 (actual, col.ty),
1209 (
1210 DataType::Text,
1211 DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
1212 ) | (DataType::Json | DataType::Jsonb, DataType::Text)
1213 | (DataType::Json, DataType::Jsonb)
1214 | (DataType::Jsonb, DataType::Json)
1215 | (DataType::Timestamp, DataType::Timestamptz)
1216 | (DataType::Timestamptz, DataType::Timestamp)
1217 )
1218 || matches!(
1219 (actual, col.ty),
1220 (
1221 DataType::Numeric { scale: a, .. },
1222 DataType::Numeric { scale: b, .. },
1223 ) if a == b
1224 );
1225 if !compatible {
1226 return Err(StorageError::TypeMismatch {
1227 column: col.name.clone(),
1228 expected: col.ty,
1229 actual,
1230 position: i,
1231 });
1232 }
1233 }
1234 let new_row_idx = self.rows.len();
1235 for idx in &mut self.indices {
1239 match &mut idx.kind {
1240 IndexKind::BTree(map) => {
1241 if let Some(key) = IndexKey::from_value(&row.values[idx.column_position]) {
1242 let mut entries = map.get(&key).cloned().unwrap_or_default();
1248 entries.push(RowLocator::Hot(new_row_idx));
1249 map.insert_mut(key, entries);
1250 }
1251 }
1252 IndexKind::Gin(map) => {
1253 if let Value::TsVector(lexemes) = &row.values[idx.column_position] {
1257 for lex in lexemes {
1258 let mut entries = map.get(&lex.word).cloned().unwrap_or_default();
1259 entries.push(RowLocator::Hot(new_row_idx));
1260 map.insert_mut(lex.word.clone(), entries);
1261 }
1262 }
1263 }
1264 IndexKind::Nsw(_) | IndexKind::Brin { .. } => {}
1268 }
1269 }
1270 self.hot_bytes = self
1273 .hot_bytes
1274 .saturating_add(row_body_encoded_len(&row, &self.schema) as u64);
1275 self.rows.push_mut(row);
1280 let new_row_idx = self.rows.len() - 1;
1283 let nsw_targets: Vec<usize> = self
1284 .indices
1285 .iter()
1286 .enumerate()
1287 .filter_map(|(i, idx)| {
1288 if matches!(idx.kind, IndexKind::Nsw(_)) {
1289 Some(i)
1290 } else {
1291 None
1292 }
1293 })
1294 .collect();
1295 for idx_pos in nsw_targets {
1296 nsw_insert_at(self, idx_pos, new_row_idx);
1297 }
1298 Ok(())
1299 }
1300
1301 pub fn add_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1305 if self.indices.iter().any(|i| i.name == name) {
1306 return Err(StorageError::DuplicateIndex { name });
1307 }
1308 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1309 StorageError::ColumnNotFound {
1310 column: column_name.into(),
1311 }
1312 })?;
1313 let mut idx = Index::new_btree(name, column_position);
1314 if let IndexKind::BTree(map) = &mut idx.kind {
1315 for (i, row) in self.rows.iter().enumerate() {
1316 if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
1317 let mut entries = map.get(&key).cloned().unwrap_or_default();
1318 entries.push(RowLocator::Hot(i));
1319 map.insert_mut(key, entries);
1320 }
1321 }
1322 }
1323 self.indices.push(idx);
1324 Ok(())
1325 }
1326
    /// Create an NSW vector index `name` on `column_name` with neighbour cap `m`.
    ///
    /// Delegates to `add_nsw_index_inner` with no pre-built graph; the errors
    /// returned are that function's.
    pub fn add_nsw_index(
        &mut self,
        name: String,
        column_name: &str,
        m: usize,
    ) -> Result<(), StorageError> {
        self.add_nsw_index_inner(name, column_name, m, None)
    }
1339
1340 pub fn rebuild_nsw_index(
1352 &mut self,
1353 name: &str,
1354 new_encoding: Option<VecEncoding>,
1355 ) -> Result<(), StorageError> {
1356 let idx_pos = self
1357 .indices
1358 .iter()
1359 .position(|i| i.name == name)
1360 .ok_or_else(|| StorageError::IndexNotFound {
1361 name: String::from(name),
1362 })?;
1363 let col_pos = self.indices[idx_pos].column_position;
1364 let m = match &self.indices[idx_pos].kind {
1365 IndexKind::Nsw(g) => g.m,
1366 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
1367 return Err(StorageError::Unsupported(format!(
1368 "ALTER INDEX REBUILD on non-NSW index {name:?} — only NSW indexes can rebuild"
1369 )));
1370 }
1371 };
1372 let col_name = self.schema.columns[col_pos].name.clone();
1373 if let Some(target) = new_encoding {
1376 let current = match self.schema.columns[col_pos].ty {
1377 DataType::Vector { encoding, .. } => encoding,
1378 ref other => {
1379 return Err(StorageError::Unsupported(format!(
1380 "ALTER INDEX REBUILD WITH (encoding=…) on non-vector column type {other:?}"
1381 )));
1382 }
1383 };
1384 if target != current {
1385 let DataType::Vector { dim, .. } = self.schema.columns[col_pos].ty else {
1386 unreachable!("checked above")
1387 };
1388 let n = self.rows.len();
1389 for i in 0..n {
1390 let row = self
1391 .rows
1392 .get_mut(i)
1393 .expect("row index in bounds (we iterated up to len())");
1394 let cell = core::mem::replace(&mut row.values[col_pos], Value::Null);
1395 let recoded = recode_vector_cell(cell, target)?;
1396 row.values[col_pos] = recoded;
1397 }
1398 self.schema.columns[col_pos].ty = DataType::Vector {
1399 dim,
1400 encoding: target,
1401 };
1402 }
1403 }
1404 self.indices.remove(idx_pos);
1406 self.add_nsw_index_inner(String::from(name), &col_name, m, None)?;
1407 Ok(())
1408 }
1409
1410 pub fn restore_nsw_index(
1415 &mut self,
1416 name: String,
1417 column_name: &str,
1418 graph: NswGraph,
1419 ) -> Result<(), StorageError> {
1420 self.add_nsw_index_inner(name, column_name, graph.m, Some(graph))
1421 }
1422
1423 pub fn restore_btree_index(
1430 &mut self,
1431 name: String,
1432 column_name: &str,
1433 map: PersistentBTreeMap<IndexKey, Vec<RowLocator>>,
1434 ) -> Result<(), StorageError> {
1435 if self.indices.iter().any(|i| i.name == name) {
1436 return Err(StorageError::DuplicateIndex { name });
1437 }
1438 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1439 StorageError::ColumnNotFound {
1440 column: column_name.into(),
1441 }
1442 })?;
1443 self.indices.push(Index {
1444 name,
1445 column_position,
1446 kind: IndexKind::BTree(map),
1447 included_columns: Vec::new(),
1448 partial_predicate: None,
1449 expression: None,
1450 is_unique: false,
1451 extra_column_positions: Vec::new(),
1452 });
1453 Ok(())
1454 }
1455
1456 pub fn restore_brin_index(
1461 &mut self,
1462 name: String,
1463 column_name: &str,
1464 column_type: DataType,
1465 ) -> Result<(), StorageError> {
1466 if self.indices.iter().any(|i| i.name == name) {
1467 return Err(StorageError::DuplicateIndex { name });
1468 }
1469 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1470 StorageError::ColumnNotFound {
1471 column: column_name.into(),
1472 }
1473 })?;
1474 self.indices
1475 .push(Index::new_brin(name, column_position, column_type));
1476 Ok(())
1477 }
1478
1479 pub fn add_brin_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1483 if self.indices.iter().any(|i| i.name == name) {
1484 return Err(StorageError::DuplicateIndex { name });
1485 }
1486 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1487 StorageError::ColumnNotFound {
1488 column: column_name.into(),
1489 }
1490 })?;
1491 let column_type = self.schema.columns[column_position].ty;
1492 self.indices
1493 .push(Index::new_brin(name, column_position, column_type));
1494 Ok(())
1495 }
1496
1497 pub fn add_gin_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1502 if self.indices.iter().any(|i| i.name == name) {
1503 return Err(StorageError::DuplicateIndex { name });
1504 }
1505 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1506 StorageError::ColumnNotFound {
1507 column: column_name.into(),
1508 }
1509 })?;
1510 if self.schema.columns[column_position].ty != DataType::TsVector {
1511 return Err(StorageError::Corrupt(format!(
1512 "GIN index {name:?} requires a tsvector column; \
1513 {column_name:?} is {:?}",
1514 self.schema.columns[column_position].ty
1515 )));
1516 }
1517 let mut idx = Index::new_gin(name, column_position);
1518 if let IndexKind::Gin(map) = &mut idx.kind {
1519 for (i, row) in self.rows.iter().enumerate() {
1520 if let Value::TsVector(lexemes) = &row.values[column_position] {
1521 for lex in lexemes {
1522 let mut entries = map.get(&lex.word).cloned().unwrap_or_default();
1523 entries.push(RowLocator::Hot(i));
1524 map.insert_mut(lex.word.clone(), entries);
1525 }
1526 }
1527 }
1528 }
1529 self.indices.push(idx);
1530 Ok(())
1531 }
1532
1533 pub fn restore_gin_index(
1538 &mut self,
1539 name: String,
1540 column_name: &str,
1541 map: PersistentBTreeMap<String, Vec<RowLocator>>,
1542 ) -> Result<(), StorageError> {
1543 if self.indices.iter().any(|i| i.name == name) {
1544 return Err(StorageError::DuplicateIndex { name });
1545 }
1546 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1547 StorageError::ColumnNotFound {
1548 column: column_name.into(),
1549 }
1550 })?;
1551 let mut idx = Index::new_gin(name, column_position);
1552 idx.kind = IndexKind::Gin(map);
1553 self.indices.push(idx);
1554 Ok(())
1555 }
1556
1557 pub fn register_cold_locators<I>(
1574 &mut self,
1575 index_name: &str,
1576 locators: I,
1577 ) -> Result<usize, StorageError>
1578 where
1579 I: IntoIterator<Item = (IndexKey, RowLocator)>,
1580 {
1581 let idx = self
1582 .indices
1583 .iter_mut()
1584 .find(|i| i.name == index_name)
1585 .ok_or_else(|| StorageError::Corrupt(format!("index {index_name:?} not found")))?;
1586 let map = match &mut idx.kind {
1587 IndexKind::BTree(map) => map,
1588 IndexKind::Nsw(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
1589 return Err(StorageError::Corrupt(format!(
1590 "index {index_name:?} is not BTree; cold locators apply only to BTree indices"
1591 )));
1592 }
1593 };
1594 let mut count = 0usize;
1595 for (key, locator) in locators {
1596 let mut entries = map.get(&key).cloned().unwrap_or_default();
1597 entries.push(locator);
1598 map.insert_mut(key, entries);
1599 count += 1;
1600 }
1601 Ok(count)
1602 }
1603
1604 pub fn register_gin_cold_locators<I>(
1609 &mut self,
1610 index_name: &str,
1611 locators: I,
1612 ) -> Result<usize, StorageError>
1613 where
1614 I: IntoIterator<Item = (String, RowLocator)>,
1615 {
1616 let idx = self
1617 .indices
1618 .iter_mut()
1619 .find(|i| i.name == index_name)
1620 .ok_or_else(|| StorageError::Corrupt(format!("index {index_name:?} not found")))?;
1621 let map = match &mut idx.kind {
1622 IndexKind::Gin(map) => map,
1623 IndexKind::BTree(_) | IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
1624 return Err(StorageError::Corrupt(format!(
1625 "register_gin_cold_locators: index {index_name:?} is not GIN"
1626 )));
1627 }
1628 };
1629 let mut count = 0usize;
1630 for (word, locator) in locators {
1631 let mut entries = map.get(&word).cloned().unwrap_or_default();
1632 entries.push(locator);
1633 map.insert_mut(word, entries);
1634 count += 1;
1635 }
1636 Ok(count)
1637 }
1638
1639 pub fn remove_cold_locators_for_key(
1649 &mut self,
1650 index_name: &str,
1651 key: &IndexKey,
1652 ) -> Result<usize, StorageError> {
1653 let idx = self
1654 .indices
1655 .iter_mut()
1656 .find(|i| i.name == index_name)
1657 .ok_or_else(|| {
1658 StorageError::Corrupt(format!(
1659 "remove_cold_locators_for_key: index {index_name:?} not found"
1660 ))
1661 })?;
1662 let map = match &mut idx.kind {
1663 IndexKind::BTree(map) => map,
1664 IndexKind::Nsw(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
1665 return Err(StorageError::Corrupt(format!(
1666 "remove_cold_locators_for_key: index {index_name:?} is not BTree; \
1667 cold locators apply only to BTree indices"
1668 )));
1669 }
1670 };
1671 let Some(entries) = map.get(key) else {
1672 return Ok(0);
1673 };
1674 let mut kept: Vec<RowLocator> =
1675 entries.iter().copied().filter(RowLocator::is_hot).collect();
1676 let removed = entries.len() - kept.len();
1677 if removed == 0 {
1678 return Ok(0);
1679 }
1680 kept.shrink_to_fit();
1681 map.insert_mut(key.clone(), kept);
1689 Ok(removed)
1690 }
1691
1692 pub fn delete_rows(&mut self, positions: &[usize]) -> usize {
1698 if positions.is_empty() {
1699 return 0;
1700 }
1701 let mut to_remove = alloc::vec![false; self.rows.len()];
1705 let mut removed = 0;
1706 for &p in positions {
1707 if p < to_remove.len() && !to_remove[p] {
1708 to_remove[p] = true;
1709 removed += 1;
1710 }
1711 }
1712 let mut new_rows: PersistentVec<Row> = PersistentVec::new();
1713 let mut removed_bytes: u64 = 0;
1714 for (i, row) in self.rows.iter().enumerate() {
1715 if to_remove[i] {
1716 removed_bytes =
1717 removed_bytes.saturating_add(row_body_encoded_len(row, &self.schema) as u64);
1718 } else {
1719 new_rows.push_mut(row.clone());
1720 }
1721 }
1722 self.rows = new_rows;
1723 self.hot_bytes = self.hot_bytes.saturating_sub(removed_bytes);
1724 self.rebuild_indices();
1725 removed
1726 }
1727
1728 pub fn update_row(
1734 &mut self,
1735 position: usize,
1736 new_values: Vec<Value>,
1737 ) -> Result<(), StorageError> {
1738 if position >= self.rows.len() {
1739 return Err(StorageError::Corrupt(alloc::format!(
1740 "update_row: position {position} out of bounds (rows={})",
1741 self.rows.len()
1742 )));
1743 }
1744 if new_values.len() != self.schema.columns.len() {
1745 return Err(StorageError::ArityMismatch {
1746 expected: self.schema.columns.len(),
1747 actual: new_values.len(),
1748 });
1749 }
1750 for (i, (val, col)) in new_values.iter().zip(&self.schema.columns).enumerate() {
1754 if val.is_null() {
1755 if !col.nullable {
1756 return Err(StorageError::NullInNotNull {
1757 column: col.name.clone(),
1758 });
1759 }
1760 continue;
1761 }
1762 let actual = val.data_type().expect("non-null");
1763 let compatible = actual == col.ty
1764 || matches!(
1765 (actual, col.ty),
1766 (
1767 DataType::Text,
1768 DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
1769 ) | (DataType::Json | DataType::Jsonb, DataType::Text)
1770 | (DataType::Json, DataType::Jsonb)
1771 | (DataType::Jsonb, DataType::Json)
1772 | (DataType::Timestamp, DataType::Timestamptz)
1773 | (DataType::Timestamptz, DataType::Timestamp)
1774 )
1775 || matches!(
1776 (actual, col.ty),
1777 (
1778 DataType::Numeric { scale: a, .. },
1779 DataType::Numeric { scale: b, .. },
1780 ) if a == b
1781 );
1782 if !compatible {
1783 return Err(StorageError::TypeMismatch {
1784 column: col.name.clone(),
1785 expected: col.ty,
1786 actual,
1787 position: i,
1788 });
1789 }
1790 }
1791 let old_row = self
1792 .rows
1793 .get(position)
1794 .expect("position bounds-checked above");
1795 let old_bytes = row_body_encoded_len(old_row, &self.schema) as u64;
1796 let new_row = Row::new(new_values);
1797 let new_bytes = row_body_encoded_len(&new_row, &self.schema) as u64;
1798 self.rows = self
1799 .rows
1800 .set(position, new_row)
1801 .expect("position bounds-checked above");
1802 self.hot_bytes = self
1803 .hot_bytes
1804 .saturating_sub(old_bytes)
1805 .saturating_add(new_bytes);
1806 self.rebuild_indices();
1807 Ok(())
1808 }
1809
    /// Rebuilds every index on this table from the current hot rows.
    ///
    /// Each index's descriptor (name, column position, kind, plus NSW `m` or
    /// BRIN column type) is captured, `self.indices` is cleared, and every
    /// index is recreated through `Index::new_*`:
    /// - BTree: one locator `RowLocator::Hot(row_position)` per row whose cell
    ///   yields `Some` from `IndexKey::from_value(&row.values[column_position])`.
    /// - GIN: one posting per lexeme of every `Value::TsVector` cell.
    /// - NSW: rebuilt by inserting every row in order via `nsw_insert_at`.
    /// - BRIN: recreated with `Index::new_brin`; this method does no per-row work for it.
    ///
    /// Cold-tier locators held by BTree and GIN indexes are snapshotted before
    /// the rebuild and re-registered afterwards, because the rebuild only walks
    /// hot rows.
    ///
    /// Row updates call this after writing the new row, so every update pays
    /// a full rebuild of all indexes (NSW graphs included).
    ///
    /// NOTE(review): only name / column position / kind survive the rebuild.
    /// `Index` also carries `included_columns`, `partial_predicate`,
    /// `expression`, `is_unique` and `extra_column_positions`. Whether these
    /// are preserved depends on what `Index::new_*` sets; confirm they are not
    /// silently reset. Likewise, BTree keys are built from the single cell at
    /// `column_position`, not from any index expression or extra columns.
    fn rebuild_indices(&mut self) {
        // Snapshot cold BTree locators per index name; only indexes that
        // actually hold cold locators are kept.
        let preserved_cold: Vec<(String, Vec<(IndexKey, RowLocator)>)> = self
            .indices
            .iter()
            .filter_map(|idx| match &idx.kind {
                IndexKind::BTree(map) => {
                    let cold: Vec<(IndexKey, RowLocator)> = map
                        .iter()
                        .flat_map(|(k, locs)| {
                            locs.iter()
                                .filter(|l| l.is_cold())
                                .copied()
                                .map(move |l| (k.clone(), l))
                        })
                        .collect();
                    if cold.is_empty() {
                        None
                    } else {
                        Some((idx.name.clone(), cold))
                    }
                }
                IndexKind::Nsw(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => None,
            })
            .collect();

        // Same snapshot for GIN indexes, keyed by lexeme.
        let preserved_gin_cold: Vec<(String, Vec<(String, RowLocator)>)> = self
            .indices
            .iter()
            .filter_map(|idx| match &idx.kind {
                IndexKind::Gin(map) => {
                    let cold: Vec<(String, RowLocator)> = map
                        .iter()
                        .flat_map(|(w, locs)| {
                            locs.iter()
                                .filter(|l| l.is_cold())
                                .copied()
                                .map(move |l| (w.clone(), l))
                        })
                        .collect();
                    if cold.is_empty() {
                        None
                    } else {
                        Some((idx.name.clone(), cold))
                    }
                }
                IndexKind::BTree(_) | IndexKind::Nsw(_) | IndexKind::Brin { .. } => None,
            })
            .collect();

        // Owned descriptor of each index, so the old indexes can be dropped
        // (and `self` borrowed mutably) before rebuilding.
        #[derive(Clone)]
        enum RebuildKind {
            BTree,
            Nsw(usize),
            Brin(DataType),
            Gin,
        }
        let descriptors: Vec<(String, usize, RebuildKind)> = self
            .indices
            .iter()
            .map(|idx| {
                let kind = match &idx.kind {
                    IndexKind::Nsw(g) => RebuildKind::Nsw(g.m),
                    IndexKind::Brin { column_type } => RebuildKind::Brin(*column_type),
                    IndexKind::BTree(_) => RebuildKind::BTree,
                    IndexKind::Gin(_) => RebuildKind::Gin,
                };
                (idx.name.clone(), idx.column_position, kind)
            })
            .collect();
        self.indices.clear();
        for (name, column_position, rebuild_kind) in descriptors {
            match rebuild_kind {
                RebuildKind::Nsw(m) => {
                    // The index must be in `self.indices` before insertion,
                    // because `nsw_insert_at` addresses it by position.
                    let idx = Index::new_nsw(name, column_position, m);
                    self.indices.push(idx);
                    let idx_pos = self.indices.len() - 1;
                    let row_indices: Vec<usize> = (0..self.rows.len()).collect();
                    for row_idx in row_indices {
                        nsw_insert_at(self, idx_pos, row_idx);
                    }
                }
                RebuildKind::Brin(column_type) => {
                    self.indices
                        .push(Index::new_brin(name, column_position, column_type));
                }
                RebuildKind::BTree => {
                    let mut idx = Index::new_btree(name, column_position);
                    if let IndexKind::BTree(map) = &mut idx.kind {
                        for (i, row) in self.rows.iter().enumerate() {
                            // Rows whose cell has no key (`None`) are not indexed.
                            if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
                                let mut entries = map.get(&key).cloned().unwrap_or_default();
                                entries.push(RowLocator::Hot(i));
                                map.insert_mut(key, entries);
                            }
                        }
                    }
                    self.indices.push(idx);
                }
                RebuildKind::Gin => {
                    let mut idx = Index::new_gin(name, column_position);
                    if let IndexKind::Gin(map) = &mut idx.kind {
                        for (i, row) in self.rows.iter().enumerate() {
                            // Only tsvector cells contribute postings.
                            if let Value::TsVector(lexemes) = &row.values[column_position] {
                                for lex in lexemes {
                                    let mut entries =
                                        map.get(&lex.word).cloned().unwrap_or_default();
                                    entries.push(RowLocator::Hot(i));
                                    map.insert_mut(lex.word.clone(), entries);
                                }
                            }
                        }
                    }
                    self.indices.push(idx);
                }
            }
        }

        // Re-attach the cold-tier locators snapshotted above.
        // NOTE(review): errors are discarded; presumably safe because every
        // index name was just re-pushed, so lookups cannot miss. Confirm.
        for (idx_name, locators) in preserved_cold {
            let _ = self.register_cold_locators(&idx_name, locators);
        }
        for (idx_name, locators) in preserved_gin_cold {
            let _ = self.register_gin_cold_locators(&idx_name, locators);
        }
    }
1968
1969 fn add_nsw_index_inner(
1970 &mut self,
1971 name: String,
1972 column_name: &str,
1973 m: usize,
1974 restore: Option<NswGraph>,
1975 ) -> Result<(), StorageError> {
1976 if self.indices.iter().any(|i| i.name == name) {
1977 return Err(StorageError::DuplicateIndex { name });
1978 }
1979 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1980 StorageError::ColumnNotFound {
1981 column: column_name.into(),
1982 }
1983 })?;
1984 if !matches!(
1985 self.schema.columns[column_position].ty,
1986 DataType::Vector { .. }
1987 ) {
1988 return Err(StorageError::TypeMismatch {
1989 column: column_name.into(),
1990 expected: DataType::Vector {
1991 dim: 0,
1992 encoding: VecEncoding::F32,
1993 },
1994 actual: self.schema.columns[column_position].ty,
1995 position: column_position,
1996 });
1997 }
1998 if let Some(graph) = restore {
1999 self.indices.push(Index {
2000 name,
2001 column_position,
2002 kind: IndexKind::Nsw(graph),
2003 included_columns: Vec::new(),
2004 partial_predicate: None,
2005 expression: None,
2006 is_unique: false,
2007 extra_column_positions: Vec::new(),
2008 });
2009 return Ok(());
2010 }
2011 let idx = Index::new_nsw(name, column_position, m);
2012 self.indices.push(idx);
2013 let idx_pos = self.indices.len() - 1;
2014 let row_indices: Vec<usize> = (0..self.rows.len()).collect();
2017 for row_idx in row_indices {
2018 nsw_insert_at(self, idx_pos, row_idx);
2019 }
2020 Ok(())
2021 }
2022}
2023
2024fn recode_vector_cell(cell: Value, target: VecEncoding) -> Result<Value, StorageError> {
2031 if matches!(cell, Value::Null) {
2032 return Ok(cell);
2033 }
2034 let as_f32: Vec<f32> = match &cell {
2036 Value::Vector(v) => v.clone(),
2037 Value::Sq8Vector(q) => quantize::dequantize(q),
2038 Value::HalfVector(h) => h.to_f32_vec(),
2039 other => {
2040 return Err(StorageError::Unsupported(format!(
2041 "ALTER INDEX REBUILD: cannot recode non-vector cell {:?}",
2042 other.data_type()
2043 )));
2044 }
2045 };
2046 Ok(match target {
2051 VecEncoding::F32 => Value::Vector(as_f32),
2052 VecEncoding::Sq8 => Value::Sq8Vector(quantize::quantize(&as_f32)),
2053 VecEncoding::F16 => Value::HalfVector(halfvec::HalfVector::from_f32_slice(&as_f32)),
2054 })
2055}
2056
/// Inserts hot row `new_row_idx` into the NSW graph at `table.indices[idx_pos]`.
///
/// If the row's indexed cell is not a vector, or is a zero-length vector, the
/// row only gets an (empty) adjacency slot at level 0, which keeps graph slots
/// aligned with row positions. It is not linked by this call.
///
/// Otherwise the row gets a level from `nsw_assign_level`:
/// - If the graph has no entry point yet, the row becomes the entry point.
/// - Otherwise the row is routed greedily from the entry point through every
///   layer above its own level. It is then beam-searched and linked on each
///   layer from `min(level, entry_level)` down to 0.
/// - If its level exceeds the current entry level, it becomes the new entry point.
///
/// Construction always uses the L2 metric, whatever metric queries use later.
///
/// # Panics
/// Panics (`unreachable!`) if `idx_pos` does not refer to an NSW index.
fn nsw_insert_at(table: &mut Table, idx_pos: usize, new_row_idx: usize) {
    let col_pos = table.indices[idx_pos].column_position;
    let cell_dim: Option<usize> = match &table.rows[new_row_idx].values[col_pos] {
        Value::Vector(v) => Some(v.len()),
        Value::Sq8Vector(q) => Some(q.bytes.len()),
        Value::HalfVector(h) => Some(h.dim()),
        _ => None,
    };
    // Non-vector cell: reserve the slot so later rows keep aligned positions.
    let Some(dim) = cell_dim else {
        ensure_node_slot(table, idx_pos, new_row_idx, 0);
        return;
    };
    if dim == 0 {
        ensure_node_slot(table, idx_pos, new_row_idx, 0);
        return;
    }
    let level = nsw_assign_level(new_row_idx);
    ensure_node_slot(table, idx_pos, new_row_idx, level);
    let (entry, entry_level, m) = match &table.indices[idx_pos].kind {
        IndexKind::Nsw(g) => (g.entry, g.entry_level, g.m),
        IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
            unreachable!("nsw_insert_at on a non-NSW index")
        }
    };
    // First linkable node: it becomes the entry point; there is nothing to connect to.
    if entry.is_none() {
        if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
            g.entry = Some(new_row_idx);
            g.entry_level = level;
            *g.levels
                .get_mut(new_row_idx)
                .expect("levels slot padded by ensure_node_slot") = level;
        }
        return;
    }
    if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
        *g.levels
            .get_mut(new_row_idx)
            .expect("levels slot padded by ensure_node_slot") = level;
    }
    // f32 query vector. The `_` arm cannot be hit in practice: `cell_dim`
    // above already established that this is a vector cell.
    let query = match &table.rows[new_row_idx].values[col_pos] {
        Value::Vector(v) => v.clone(),
        Value::Sq8Vector(q) => quantize::dequantize(q),
        Value::HalfVector(h) => h.to_f32_vec(),
        _ => return,
    };
    let mut current = entry.expect("entry was Some above");
    let mut current_d = vec_l2_sq(table, col_pos, current, &query);
    // Greedy descent through the layers that only the existing entry reaches.
    if entry_level > level {
        for layer in (level + 1..=entry_level).rev() {
            (current, current_d) =
                greedy_layer_walk(table, idx_pos, layer, current, current_d, &query);
        }
    }
    let top = level.min(entry_level);
    // Construction beam width: twice the connectivity, with a floor of 8.
    let ef = (m * 2).max(8);
    for layer in (0..=top).rev() {
        // Layer 0 allows twice as many neighbours as the upper layers.
        // NOTE(review): confirm this matches `cap_for_layer`, used by `connect_at_layer`.
        let cap = if layer == 0 { m * 2 } else { m };
        let mut candidates = layer_beam_search(
            table,
            idx_pos,
            layer,
            current,
            current_d,
            &query,
            ef,
            NswMetric::L2,
        );
        // Defensive: never select the node itself as its own neighbour.
        candidates.retain(|&(_, n)| n != new_row_idx);
        // The closest candidate on this layer seeds the search one layer down.
        if let Some(&(d, n)) = candidates.first() {
            current = n;
            current_d = d;
        }
        let peers = select_neighbours_heuristic(&candidates, cap, table, col_pos);
        connect_at_layer(table, idx_pos, layer, new_row_idx, &peers);
    }
    // A node taller than the current entry point becomes the new entry point.
    if level > entry_level
        && let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
    {
        g.entry = Some(new_row_idx);
        g.entry_level = level;
    }
}
2165
2166fn ensure_node_slot(table: &mut Table, idx_pos: usize, new_row_idx: usize, level: u8) {
2170 let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind else {
2171 unreachable!("ensure_node_slot on a BTree index");
2172 };
2173 while g.layers.len() <= level as usize {
2174 g.layers.push(PersistentVec::new());
2175 }
2176 while g.levels.len() <= new_row_idx {
2177 g.levels.push_mut(0);
2178 }
2179 for layer_vec in &mut g.layers {
2180 while layer_vec.len() <= new_row_idx {
2181 layer_vec.push_mut(Vec::new());
2182 }
2183 }
2184}
2185
2186fn greedy_layer_walk(
2192 table: &Table,
2193 idx_pos: usize,
2194 layer: u8,
2195 mut current: usize,
2196 mut current_d: f32,
2197 query: &[f32],
2198) -> (usize, f32) {
2199 let g = match &table.indices[idx_pos].kind {
2200 IndexKind::Nsw(g) => g,
2201 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
2202 return (current, current_d);
2203 }
2204 };
2205 let col_pos = table.indices[idx_pos].column_position;
2206 loop {
2207 let neighbours: &[u32] = g
2208 .layers
2209 .get(layer as usize)
2210 .and_then(|layer_v| layer_v.get(current))
2211 .map_or(&[][..], Vec::as_slice);
2212 let mut best = current;
2213 let mut best_d = current_d;
2214 for &n in neighbours {
2215 let n = n as usize;
2216 let d = vec_l2_sq(table, col_pos, n, query);
2217 if d < best_d {
2218 best = n;
2219 best_d = d;
2220 }
2221 }
2222 if best == current {
2223 return (current, current_d);
2224 }
2225 current = best;
2226 current_d = best_d;
2227 }
2228}
2229
2230#[allow(clippy::too_many_arguments)] fn layer_beam_search(
2243 table: &Table,
2244 idx_pos: usize,
2245 layer: u8,
2246 entry_node: usize,
2247 entry_d: f32,
2248 query: &[f32],
2249 ef: usize,
2250 metric: NswMetric,
2251) -> Vec<(f32, usize)> {
2252 let g = match &table.indices[idx_pos].kind {
2253 IndexKind::Nsw(g) => g,
2254 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => return Vec::new(),
2255 };
2256 let col_pos = table.indices[idx_pos].column_position;
2257 let d0 = if matches!(metric, NswMetric::L2) {
2258 entry_d
2259 } else {
2260 cell_to_query_metric_distance(table, col_pos, entry_node, query, metric)
2261 };
2262 let row_count = table.rows.len();
2263 let mut visited: Vec<bool> = alloc::vec![false; row_count];
2264 if entry_node < row_count {
2265 visited[entry_node] = true;
2266 }
2267 let mut candidates: alloc::collections::BinaryHeap<NodeClosest> =
2270 alloc::collections::BinaryHeap::with_capacity(ef);
2271 let mut results: alloc::collections::BinaryHeap<NodeFurthest> =
2272 alloc::collections::BinaryHeap::with_capacity(ef);
2273 candidates.push(NodeClosest {
2274 dist: d0,
2275 node: entry_node,
2276 });
2277 results.push(NodeFurthest {
2278 dist: d0,
2279 node: entry_node,
2280 });
2281 while let Some(cur) = candidates.pop() {
2282 let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
2283 if cur.dist > worst && results.len() >= ef {
2284 break;
2285 }
2286 let neighbours: &[u32] = g
2287 .layers
2288 .get(layer as usize)
2289 .and_then(|layer_v| layer_v.get(cur.node))
2290 .map_or(&[][..], Vec::as_slice);
2291 for &n in neighbours {
2292 let n = n as usize;
2293 if n >= row_count || visited[n] {
2294 continue;
2295 }
2296 visited[n] = true;
2297 let dn = cell_to_query_metric_distance(table, col_pos, n, query, metric);
2301 if !dn.is_finite() {
2302 continue;
2303 }
2304 let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
2305 if results.len() < ef || dn < worst {
2306 results.push(NodeFurthest { dist: dn, node: n });
2307 if results.len() > ef {
2308 results.pop();
2309 }
2310 candidates.push(NodeClosest { dist: dn, node: n });
2311 }
2312 }
2313 }
2314 let mut out: Vec<(f32, usize)> = results.into_iter().map(|c| (c.dist, c.node)).collect();
2317 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2318 out
2319}
2320
/// Beam-search candidate ordered so that `BinaryHeap` (a max-heap) pops the
/// *closest* node first: `Ord` is reversed on `dist`.
///
/// Ordering uses `f32::total_cmp`, so NaN still gets a consistent total
/// order. Distance ties are broken on `node` (smaller id ranks first). As a
/// result, `cmp` returns `Equal` exactly when `eq` returns `true`, which is
/// the consistency `Ord` requires.
#[derive(Debug, Clone, Copy)]
struct NodeClosest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeClosest {
    fn eq(&self, other: &Self) -> bool {
        // Defined via `cmp` so `Eq` and `Ord` can never disagree (and NaN == NaN here).
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeClosest {}
impl PartialOrd for NodeClosest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeClosest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        // Reversed: a smaller distance (then a smaller node id) compares as greater.
        other
            .dist
            .total_cmp(&self.dist)
            .then_with(|| other.node.cmp(&self.node))
    }
}
2349
/// Beam-search result ordered so that `BinaryHeap` (a max-heap) keeps the
/// *farthest* node on top, ready for eviction.
///
/// Ordering uses `f32::total_cmp`, so NaN still gets a consistent total
/// order. Distance ties are broken on `node` (larger id counts as farther).
/// As a result, `cmp` returns `Equal` exactly when `eq` returns `true`, which
/// is the consistency `Ord` requires.
#[derive(Debug, Clone, Copy)]
struct NodeFurthest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeFurthest {
    fn eq(&self, other: &Self) -> bool {
        // Defined via `cmp` so `Eq` and `Ord` can never disagree (and NaN == NaN here).
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeFurthest {}
impl PartialOrd for NodeFurthest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeFurthest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        self.dist
            .total_cmp(&other.dist)
            .then_with(|| self.node.cmp(&other.node))
    }
}
2375
2376fn select_neighbours_heuristic(
2385 candidates: &[(f32, usize)],
2386 m: usize,
2387 table: &Table,
2388 col_pos: usize,
2389) -> Vec<usize> {
2390 let mut chosen: Vec<usize> = Vec::with_capacity(m);
2391 for &(d_q, e) in candidates {
2392 if chosen.len() >= m {
2393 break;
2394 }
2395 if !matches!(
2400 table.rows.get(e).and_then(|r| r.values.get(col_pos)),
2401 Some(Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_))
2402 ) {
2403 continue;
2404 }
2405 let mut covered = false;
2406 for &r in &chosen {
2407 if cell_l2_sq(table, col_pos, e, r) < d_q {
2411 covered = true;
2412 break;
2413 }
2414 }
2415 if !covered {
2416 chosen.push(e);
2417 }
2418 }
2419 chosen
2420}
2421
/// Links `new_row_idx` to `peers` on `layer` of the NSW graph at
/// `table.indices[idx_pos]`, and adds the reverse edge on each peer.
///
/// - The new node's adjacency list is replaced wholesale by `peers`. No cap
///   is applied here; callers pass an already-selected set.
/// - Peers whose cell is not a vector are skipped when adding reverse edges.
/// - A peer whose list then exceeds `cap_for_layer(layer)` is re-pruned. The
///   pruning runs `select_neighbours_heuristic` over the peer's neighbours,
///   sorted by L2 distance to that peer, so the new node is not guaranteed
///   to survive in the peer's list.
///
/// Returns without changes if the index is not NSW.
///
/// # Panics
/// - If a row index does not fit in `u32`.
/// - If `layer` has no layer vector.
/// - If a peer is out of bounds of `table.rows`.
fn connect_at_layer(
    table: &mut Table,
    idx_pos: usize,
    layer: u8,
    new_row_idx: usize,
    peers: &[usize],
) {
    let col_pos = table.indices[idx_pos].column_position;
    // NOTE(review): confirm `cap_for_layer` agrees with the per-layer cap that
    // `nsw_insert_at` passes to the selection heuristic.
    let cap = match &table.indices[idx_pos].kind {
        IndexKind::Nsw(g) => g.cap_for_layer(layer),
        IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => return,
    };
    // Adjacency lists store row ids as u32.
    let new_row_u32 = u32::try_from(new_row_idx).expect("row index fits in u32");
    // Forward edges: the new node's list becomes exactly `peers`.
    if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
        let layer_v = &mut g.layers[layer as usize];
        if let Some(slot) = layer_v.get_mut(new_row_idx) {
            *slot = peers
                .iter()
                .map(|&p| u32::try_from(p).expect("row index fits in u32"))
                .collect();
        }
    }
    for &peer in peers {
        if !matches!(
            &table.rows[peer].values[col_pos],
            Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_)
        ) {
            continue;
        }
        // Reverse edge, added at most once.
        if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
            let layer_v = &mut g.layers[layer as usize];
            if let Some(slot) = layer_v.get_mut(peer)
                && !slot.contains(&new_row_u32)
            {
                slot.push(new_row_u32);
            }
        }
        // The index is re-borrowed in separate scopes because the distance
        // helpers below need `&Table` while the edits need `&mut`.
        let needs_trim = match &table.indices[idx_pos].kind {
            IndexKind::Nsw(g) => g.layers[layer as usize][peer].len() > cap,
            IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => false,
        };
        if needs_trim {
            let current_peers: Vec<usize> = match &table.indices[idx_pos].kind {
                IndexKind::Nsw(g) => g.layers[layer as usize][peer]
                    .iter()
                    .map(|&n| n as usize)
                    .collect(),
                IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => continue,
            };
            // Distances are measured from the peer (not the query), ascending.
            let mut tagged: Vec<(f32, usize)> = current_peers
                .iter()
                .map(|&p| (cell_l2_sq(table, col_pos, peer, p), p))
                .collect();
            tagged.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
            let kept = select_neighbours_heuristic(&tagged, cap, table, col_pos);
            if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
                && let Some(slot) = g.layers[layer as usize].get_mut(peer)
            {
                *slot = kept
                    .into_iter()
                    .map(|p| u32::try_from(p).expect("row index fits in u32"))
                    .collect();
            }
        }
    }
}
2506
2507fn vec_l2_sq(table: &Table, col_pos: usize, row: usize, query: &[f32]) -> f32 {
2514 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2515 Some(Value::Vector(v)) if v.len() == query.len() => l2_distance_sq(v, query),
2516 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => {
2517 quantize::sq8_l2_distance_sq_asymmetric(q, query)
2518 }
2519 Some(Value::HalfVector(h)) if h.dim() == query.len() => {
2523 halfvec::half_l2_distance_sq_asymmetric(h, query)
2524 }
2525 _ => f32::INFINITY,
2526 }
2527}
2528
2529fn cell_l2_sq(table: &Table, col_pos: usize, row_a: usize, row_b: usize) -> f32 {
2536 let Some(cell_a) = table.rows.get(row_a).and_then(|r| r.values.get(col_pos)) else {
2537 return f32::INFINITY;
2538 };
2539 let Some(cell_b) = table.rows.get(row_b).and_then(|r| r.values.get(col_pos)) else {
2540 return f32::INFINITY;
2541 };
2542 match (cell_a, cell_b) {
2543 (Value::Vector(a), Value::Vector(b)) if a.len() == b.len() => l2_distance_sq(a, b),
2544 (Value::Sq8Vector(a), Value::Sq8Vector(b)) if a.bytes.len() == b.bytes.len() => {
2545 quantize::sq8_l2_distance_sq(a, b)
2546 }
2547 (Value::HalfVector(a), Value::HalfVector(b)) if a.dim() == b.dim() => {
2552 halfvec::half_l2_distance_sq(a, b)
2553 }
2554 _ => f32::INFINITY,
2555 }
2556}
2557
2558fn cell_to_query_metric_distance(
2563 table: &Table,
2564 col_pos: usize,
2565 row: usize,
2566 query: &[f32],
2567 metric: NswMetric,
2568) -> f32 {
2569 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2570 Some(Value::Vector(v)) if v.len() == query.len() => metric_distance(metric, v, query),
2571 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => match metric {
2572 NswMetric::L2 => quantize::sq8_l2_distance_sq_asymmetric(q, query),
2573 NswMetric::InnerProduct => quantize::sq8_inner_product_asymmetric(q, query),
2574 NswMetric::Cosine => quantize::sq8_cosine_distance_asymmetric(q, query),
2575 },
2576 Some(Value::HalfVector(h)) if h.dim() == query.len() => match metric {
2579 NswMetric::L2 => halfvec::half_l2_distance_sq_asymmetric(h, query),
2580 NswMetric::InnerProduct => halfvec::half_inner_product_asymmetric(h, query),
2581 NswMetric::Cosine => halfvec::half_cosine_distance_asymmetric(h, query),
2582 },
2583 _ => f32::INFINITY,
2584 }
2585}
2586
/// Distance metric used when querying an NSW index. For every variant, a
/// smaller value means closer.
///
/// Graph construction always uses L2 (`nsw_insert_at` searches with
/// `NswMetric::L2`), and `nsw_search` descends the upper layers by L2 as well.
/// The metric only affects the layer-0 beam search and SQ8 re-ranking.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NswMetric {
    /// Squared Euclidean distance.
    L2,
    /// For f32 cells, the negated dot product, so a larger inner product ranks as closer.
    InnerProduct,
    /// For f32 cells, `1 - cos(a, b)`; infinite when either vector has zero norm.
    Cosine,
}
2604
2605fn nsw_search(
2611 table: &Table,
2612 idx_pos: usize,
2613 query: &[f32],
2614 k: usize,
2615 ef: usize,
2616 metric: NswMetric,
2617) -> Vec<(f32, usize)> {
2618 let (entry, entry_level) = match &table.indices[idx_pos].kind {
2619 IndexKind::Nsw(g) => (g.entry, g.entry_level),
2620 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => return Vec::new(),
2621 };
2622 let Some(entry) = entry else {
2623 return Vec::new();
2624 };
2625 let col_pos = table.indices[idx_pos].column_position;
2626 let sq8 = matches!(
2633 table.schema.columns.get(col_pos).map(|c| c.ty),
2634 Some(DataType::Vector {
2635 encoding: VecEncoding::Sq8,
2636 ..
2637 })
2638 );
2639 let ef = if sq8 {
2640 ef.max(k).max(k * SQ8_RERANK_OVER_FETCH)
2641 } else {
2642 ef.max(k)
2643 };
2644 let entry_d = vec_l2_sq(table, col_pos, entry, query);
2646 let mut current = entry;
2647 let mut current_d = entry_d;
2648 for layer in (1..=entry_level).rev() {
2649 (current, current_d) = greedy_layer_walk(table, idx_pos, layer, current, current_d, query);
2650 }
2651 let mut results = layer_beam_search(table, idx_pos, 0, current, current_d, query, ef, metric);
2653 if sq8 {
2654 results = sq8_rerank(table, col_pos, &results, query, metric);
2655 }
2656 results.truncate(k);
2657 results
2658}
2659
2660fn sq8_rerank(
2667 table: &Table,
2668 col_pos: usize,
2669 candidates: &[(f32, usize)],
2670 query: &[f32],
2671 metric: NswMetric,
2672) -> Vec<(f32, usize)> {
2673 let mut out: Vec<(f32, usize)> = candidates
2674 .iter()
2675 .filter_map(|&(adc_d, row)| {
2676 let cell = table.rows.get(row).and_then(|r| r.values.get(col_pos))?;
2677 let Value::Sq8Vector(q) = cell else {
2678 return Some((adc_d, row));
2682 };
2683 let deq = quantize::dequantize(q);
2684 if deq.len() != query.len() {
2685 return None;
2686 }
2687 Some((metric_distance(metric, &deq, query), row))
2688 })
2689 .collect();
2690 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2691 out
2692}
2693
/// Beam over-fetch factor for SQ8 columns. `nsw_search` widens its layer-0
/// beam to at least `k * SQ8_RERANK_OVER_FETCH` candidates before
/// `sq8_rerank` re-scores them on dequantized vectors. Presumably this
/// compensates for quantization error in the approximate distances.
const SQ8_RERANK_OVER_FETCH: usize = 3;
2698
2699fn metric_distance(metric: NswMetric, a: &[f32], b: &[f32]) -> f32 {
2700 match metric {
2701 NswMetric::L2 => l2_distance_sq(a, b),
2702 NswMetric::InnerProduct => -inner_product_f32(a, b),
2703 NswMetric::Cosine => {
2704 let (dot, na, nb) = cosine_dot_norms_f32(a, b);
2705 if na == 0.0 || nb == 0.0 {
2706 return f32::INFINITY;
2707 }
2708 let denom = sqrt_newton_f32(na) * sqrt_newton_f32(nb);
2711 1.0 - dot / denom
2712 }
2713 }
2714}
2715
/// Dot product of two `f32` slices.
///
/// On aarch64, equal-length slices whose length is a non-zero multiple of 4
/// use a NEON kernel. All other inputs use the scalar loop, which only
/// consumes the common prefix when the lengths differ. The two paths sum in
/// different orders, so their results may differ in the last bits.
#[doc(hidden)]
#[inline]
pub fn inner_product_f32(a: &[f32], b: &[f32]) -> f32 {
    #[cfg(target_arch = "aarch64")]
    {
        let neon_eligible = a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4);
        if neon_eligible {
            // SAFETY: the lengths are equal and a multiple of 4, which is the
            // slice contract of `inner_product_neon`. The NEON feature is assumed
            // present on the aarch64 targets this crate builds for.
            // NOTE(review): confirm no no-NEON (softfloat) aarch64 target is built.
            return unsafe { inner_product_neon(a, b) };
        }
    }
    inner_product_scalar(a, b)
}

/// Portable dot product over the common prefix of `a` and `b`, summed left to right.
fn inner_product_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).fold(0.0_f32, |acc, (x, y)| acc + x * y)
}

/// NEON dot product with two independent 4-lane FMA accumulators.
///
/// # Safety
/// The caller must guarantee that `a.len() == b.len()`, that the length is a
/// multiple of 4, and that the CPU supports NEON. Elements past the last full
/// group of 4 would be ignored.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
// Short names mirror the math (a, b).
#[allow(clippy::many_single_char_names)]
unsafe fn inner_product_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32,
    };
    let len = a.len();
    let (pa, pb) = (a.as_ptr(), b.as_ptr());
    // SAFETY: each load reads 4 lanes at `off` with `off + 4 <= len`, which the
    // loop conditions enforce; the caller guarantees `b` is as long as `a`.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut acc0 = zero;
        let mut acc1 = zero;
        let mut off = 0usize;
        // Main loop: 8 lanes per iteration across two accumulators.
        while off + 8 <= len {
            acc0 = vfmaq_f32(acc0, vld1q_f32(pa.add(off)), vld1q_f32(pb.add(off)));
            acc1 = vfmaq_f32(acc1, vld1q_f32(pa.add(off + 4)), vld1q_f32(pb.add(off + 4)));
            off += 8;
        }
        // At most one trailing group of 4.
        while off + 4 <= len {
            acc0 = vfmaq_f32(acc0, vld1q_f32(pa.add(off)), vld1q_f32(pb.add(off)));
            off += 4;
        }
        vaddvq_f32(vaddq_f32(acc0, acc1))
    }
}
2779
/// Returns `(a·b, |a|², |b|²)` in a single pass, for cosine distance.
///
/// On aarch64, equal-length slices whose length is a non-zero multiple of 4
/// use a NEON kernel. All other inputs use the scalar loop, which only
/// consumes the common prefix when the lengths differ.
#[doc(hidden)]
#[inline]
pub fn cosine_dot_norms_f32(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    #[cfg(target_arch = "aarch64")]
    {
        let neon_eligible = a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4);
        if neon_eligible {
            // SAFETY: the lengths are equal and a multiple of 4, which is the
            // slice contract of `cosine_dot_norms_neon`. The NEON feature is
            // assumed present on the aarch64 targets this crate builds for.
            // NOTE(review): confirm no no-NEON (softfloat) aarch64 target is built.
            return unsafe { cosine_dot_norms_neon(a, b) };
        }
    }
    cosine_dot_norms_scalar(a, b)
}

/// Portable single-pass dot product and squared norms over the common prefix.
fn cosine_dot_norms_scalar(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    a.iter()
        .zip(b)
        .fold((0.0_f32, 0.0_f32, 0.0_f32), |(dot, na, nb), (x, y)| {
            (dot + x * y, na + x * x, nb + y * y)
        })
}

/// NEON dot product and squared norms with one 4-lane FMA accumulator each.
///
/// # Safety
/// The caller must guarantee that `a.len() == b.len()`, that the length is a
/// multiple of 4, and that the CPU supports NEON.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
// Short names mirror the math; `acc_na` / `acc_nb` are intentionally parallel.
#[allow(clippy::many_single_char_names, clippy::similar_names)]
unsafe fn cosine_dot_norms_neon(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    use core::arch::aarch64::{float32x4_t, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32};
    let len = a.len();
    let (pa, pb) = (a.as_ptr(), b.as_ptr());
    // SAFETY: each load reads 4 lanes at `off` with `off + 4 <= len`, which the
    // loop condition enforces; the caller guarantees `b` is as long as `a`.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let (mut acc_dot, mut acc_na, mut acc_nb) = (zero, zero, zero);
        let mut off = 0usize;
        while off + 4 <= len {
            let av = vld1q_f32(pa.add(off));
            let bv = vld1q_f32(pb.add(off));
            acc_dot = vfmaq_f32(acc_dot, av, bv);
            acc_na = vfmaq_f32(acc_na, av, av);
            acc_nb = vfmaq_f32(acc_nb, bv, bv);
            off += 4;
        }
        (vaddvq_f32(acc_dot), vaddvq_f32(acc_na), vaddvq_f32(acc_nb))
    }
}
2834
/// Square root for `no_std` builds (`f32::sqrt` lives in `std`, not `core`).
///
/// Returns `0.0` for `x <= 0.0` (including `-0.0` and `-inf`). A NaN input
/// returns NaN, and `+inf` returns `+inf`.
///
/// The seed halves the exponent by shifting the IEEE-754 bit pattern, which
/// lands within a few percent of the true root for every positive normal
/// input. Newton's step `g = (g + x/g) / 2` converges quadratically from
/// there, so four steps reach full `f32` precision. Seeding with `g = x`
/// instead needs about `log2(x)/2` halving steps before convergence starts.
/// A fixed iteration budget is then badly wrong for large or tiny inputs:
/// ten steps return ≈1296 for `x = 1e6`.
fn sqrt_newton_f32(x: f32) -> f32 {
    if x <= 0.0 {
        return 0.0;
    }
    if !x.is_finite() {
        // NaN or +inf: each is its own square root.
        return x;
    }
    if x < f32::MIN_POSITIVE {
        // Subnormal: sqrt(x) = sqrt(x * 2^24) * 2^-12. Both factors are exact
        // powers of two, and x * 2^24 is normal for every positive subnormal.
        const TWO_POW_24: f32 = 16_777_216.0;
        const TWO_POW_NEG_12: f32 = 1.0 / 4096.0;
        return sqrt_newton_f32(x * TWO_POW_24) * TWO_POW_NEG_12;
    }
    // Halve the biased exponent; the constant re-biases it and minimises the
    // worst-case relative error of the seed.
    let mut g = f32::from_bits((x.to_bits() >> 1) + 0x1fbd_1df5);
    for _ in 0..4 {
        g = 0.5 * (g + x / g);
    }
    g
}
2845
/// Squared Euclidean distance between two `f32` slices.
///
/// On aarch64, equal-length slices whose length is a non-zero multiple of 4
/// use a NEON kernel. All other inputs use the scalar loop, which only
/// consumes the common prefix when the lengths differ.
#[inline]
fn l2_distance_sq(a: &[f32], b: &[f32]) -> f32 {
    #[cfg(target_arch = "aarch64")]
    {
        let neon_eligible = a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4);
        if neon_eligible {
            // SAFETY: the lengths are equal and a multiple of 4, which is the
            // slice contract of `l2_distance_sq_neon`. The NEON feature is
            // assumed present on the aarch64 targets this crate builds for.
            // NOTE(review): confirm no no-NEON (softfloat) aarch64 target is built.
            return unsafe { l2_distance_sq_neon(a, b) };
        }
    }
    l2_distance_sq_scalar(a, b)
}

/// Portable squared L2 distance over the common prefix, summed left to right.
fn l2_distance_sq_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).fold(0.0_f32, |acc, (x, y)| {
        let diff = *x - *y;
        acc + diff * diff
    })
}

/// NEON squared L2 distance with two independent 4-lane FMA accumulators.
///
/// # Safety
/// The caller must guarantee that `a.len() == b.len()`, that the length is a
/// multiple of 4, and that the CPU supports NEON.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
// Short names mirror the math (a, b).
#[allow(clippy::many_single_char_names)]
unsafe fn l2_distance_sq_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32, vsubq_f32,
    };
    let len = a.len();
    let (pa, pb) = (a.as_ptr(), b.as_ptr());
    // SAFETY: each load reads 4 lanes at `off` with `off + 4 <= len`, which the
    // loop conditions enforce; the caller guarantees `b` is as long as `a`.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut acc0 = zero;
        let mut acc1 = zero;
        let mut off = 0usize;
        // Main loop: 8 lanes per iteration across two accumulators.
        while off + 8 <= len {
            let lo = vsubq_f32(vld1q_f32(pa.add(off)), vld1q_f32(pb.add(off)));
            acc0 = vfmaq_f32(acc0, lo, lo);
            let hi = vsubq_f32(vld1q_f32(pa.add(off + 4)), vld1q_f32(pb.add(off + 4)));
            acc1 = vfmaq_f32(acc1, hi, hi);
            off += 8;
        }
        // At most one trailing group of 4.
        while off + 4 <= len {
            let diff = vsubq_f32(vld1q_f32(pa.add(off)), vld1q_f32(pb.add(off)));
            acc0 = vfmaq_f32(acc0, diff, diff);
            off += 4;
        }
        vaddvq_f32(vaddq_f32(acc0, acc1))
    }
}
2913
2914pub fn nsw_query(
2917 table: &Table,
2918 idx_name: &str,
2919 query: &[f32],
2920 k: usize,
2921 metric: NswMetric,
2922) -> Vec<usize> {
2923 let Some(idx_pos) = table.indices.iter().position(|i| i.name == idx_name) else {
2924 return Vec::new();
2925 };
2926 let ef = (k * 2).max(NSW_DEFAULT_M);
2927 let mut hits = nsw_search(table, idx_pos, query, k, ef, metric);
2928 hits.truncate(k);
2929 hits.into_iter().map(|(_, idx)| idx).collect()
2930}
2931
2932pub fn nsw_index_on(table: &Table, column_position: usize) -> Option<&Index> {
2936 table
2937 .indices
2938 .iter()
2939 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_)))
2940}
2941
/// Top-level registry of tables, cold-tier segments, user functions and triggers.
#[derive(Debug, Clone, Default)]
pub struct Catalog {
    /// Table storage; `by_name` maps names to positions in this vector.
    tables: Vec<Table>,
    /// Table name → position in `tables`.
    by_name: BTreeMap<String, usize>,
    /// Cold segments indexed by segment id. `None` marks a tombstoned or
    /// not-yet-loaded slot, so the ids of other segments stay stable.
    cold_segments: Vec<Option<Arc<OwnedSegment>>>,
    /// User-defined functions keyed by function name.
    functions: BTreeMap<String, FunctionDef>,
    /// Triggers in creation order; each is identified by (`name`, `table`).
    triggers: Vec<TriggerDef>,
}
2994
/// A user-defined function stored by `Catalog::create_function`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FunctionDef {
    /// Function name; the catalog stores the definition under this key.
    pub name: String,
    /// Argument list as text. NOTE(review): presumably the raw parameter-list
    /// source from CREATE FUNCTION; confirm.
    pub args_repr: String,
    /// Declared return type as text (presumably unparsed; confirm).
    pub returns: String,
    /// Implementation language name, presumably as written in the definition.
    pub language: String,
    /// Function body source text.
    pub body: String,
}
3020
/// A trigger stored by `Catalog::create_trigger`, identified by the
/// (`name`, `table`) pair.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TriggerDef {
    /// Trigger name; unique per table.
    pub name: String,
    /// Table the trigger is attached to; must exist when the trigger is created.
    pub table: String,
    /// Firing time relative to the event, as text (presumably e.g. "BEFORE" / "AFTER"; confirm).
    pub timing: String,
    /// Events that fire the trigger (presumably e.g. "INSERT", "UPDATE"; confirm).
    pub events: Vec<String>,
    /// Row- or statement-level granularity as text (presumably "ROW" / "STATEMENT"; confirm).
    pub for_each: String,
    /// Name of the function to run; must be registered when the trigger is created.
    pub function: String,
}
3043
3044impl Catalog {
3045 pub const fn new() -> Self {
3046 Self {
3047 tables: Vec::new(),
3048 by_name: BTreeMap::new(),
3049 cold_segments: Vec::new(),
3050 functions: BTreeMap::new(),
3051 triggers: Vec::new(),
3052 }
3053 }
3054
3055 pub const fn functions(&self) -> &BTreeMap<String, FunctionDef> {
3059 &self.functions
3060 }
3061
3062 pub fn create_function(
3066 &mut self,
3067 def: FunctionDef,
3068 or_replace: bool,
3069 ) -> Result<(), StorageError> {
3070 if !or_replace && self.functions.contains_key(&def.name) {
3071 return Err(StorageError::Corrupt(format!(
3072 "function {:?} already exists (drop or use CREATE OR REPLACE)",
3073 def.name
3074 )));
3075 }
3076 self.functions.insert(def.name.clone(), def);
3077 Ok(())
3078 }
3079
3080 pub fn drop_function(&mut self, name: &str) -> bool {
3084 self.functions.remove(name).is_some()
3085 }
3086
3087 pub fn triggers(&self) -> &[TriggerDef] {
3091 &self.triggers
3092 }
3093
3094 pub fn create_trigger(
3100 &mut self,
3101 def: TriggerDef,
3102 or_replace: bool,
3103 ) -> Result<(), StorageError> {
3104 if !self.by_name.contains_key(&def.table) {
3105 return Err(StorageError::TableNotFound {
3106 name: def.table.clone(),
3107 });
3108 }
3109 if !self.functions.contains_key(&def.function) {
3110 return Err(StorageError::Corrupt(format!(
3111 "trigger {:?} references unknown function {:?}",
3112 def.name, def.function
3113 )));
3114 }
3115 let dup = self
3116 .triggers
3117 .iter()
3118 .position(|t| t.name == def.name && t.table == def.table);
3119 match (dup, or_replace) {
3120 (Some(_), false) => Err(StorageError::Corrupt(format!(
3121 "trigger {:?} already exists on table {:?}",
3122 def.name, def.table
3123 ))),
3124 (Some(i), true) => {
3125 self.triggers[i] = def;
3126 Ok(())
3127 }
3128 (None, _) => {
3129 self.triggers.push(def);
3130 Ok(())
3131 }
3132 }
3133 }
3134
3135 pub fn drop_trigger(&mut self, name: &str, table: &str) -> bool {
3138 let before = self.triggers.len();
3139 self.triggers
3140 .retain(|t| !(t.name == name && t.table == table));
3141 before != self.triggers.len()
3142 }
3143
3144 pub fn create_table(&mut self, schema: TableSchema) -> Result<(), StorageError> {
3145 if self.by_name.contains_key(&schema.name) {
3146 return Err(StorageError::DuplicateTable {
3147 name: schema.name.clone(),
3148 });
3149 }
3150 let idx = self.tables.len();
3151 let name = schema.name.clone();
3152 self.tables.push(Table::new(schema));
3153 self.by_name.insert(name, idx);
3154 Ok(())
3155 }
3156
3157 pub fn get(&self, name: &str) -> Option<&Table> {
3158 let idx = *self.by_name.get(name)?;
3159 self.tables.get(idx)
3160 }
3161
3162 pub fn get_mut(&mut self, name: &str) -> Option<&mut Table> {
3163 let idx = *self.by_name.get(name)?;
3164 self.tables.get_mut(idx)
3165 }
3166
3167 pub fn table_count(&self) -> usize {
3168 self.tables.len()
3169 }
3170
3171 pub fn table_names(&self) -> Vec<String> {
3174 self.tables.iter().map(|t| t.schema.name.clone()).collect()
3175 }
3176
3177 pub fn load_segment_bytes(&mut self, bytes: Vec<u8>) -> Result<u32, StorageError> {
3188 let id = u32::try_from(self.cold_segments.len()).map_err(|_| {
3189 StorageError::Corrupt("cold segment count would exceed u32::MAX".into())
3190 })?;
3191 let seg = OwnedSegment::from_bytes(bytes)
3192 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
3193 self.cold_segments.push(Some(Arc::new(seg)));
3194 Ok(id)
3195 }
3196
3197 pub fn load_segment_bytes_at(
3210 &mut self,
3211 target_id: u32,
3212 bytes: Vec<u8>,
3213 ) -> Result<(), StorageError> {
3214 let seg = OwnedSegment::from_bytes(bytes)
3215 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
3216 let idx = target_id as usize;
3217 while self.cold_segments.len() <= idx {
3218 self.cold_segments.push(None);
3219 }
3220 if self.cold_segments[idx].is_some() {
3221 return Err(StorageError::Corrupt(format!(
3222 "load_segment_bytes_at: segment_id {target_id} already occupied"
3223 )));
3224 }
3225 self.cold_segments[idx] = Some(Arc::new(seg));
3226 Ok(())
3227 }
3228
3229 pub fn tombstone_segment(&mut self, segment_id: u32) -> Result<(), StorageError> {
3239 let idx = segment_id as usize;
3240 if idx >= self.cold_segments.len() {
3241 return Err(StorageError::Corrupt(format!(
3242 "tombstone_segment: segment_id {segment_id} out of bounds (len={})",
3243 self.cold_segments.len()
3244 )));
3245 }
3246 self.cold_segments[idx] = None;
3247 Ok(())
3248 }
3249
3250 #[must_use]
3252 pub fn cold_segment_count(&self) -> usize {
3253 self.cold_segments.iter().filter(|s| s.is_some()).count()
3254 }
3255
3256 #[must_use]
3259 pub fn cold_segment_slot_count(&self) -> usize {
3260 self.cold_segments.len()
3261 }
3262
3263 #[must_use]
3268 pub fn cold_segment_ids_global(&self) -> Vec<u32> {
3269 self.cold_segments
3270 .iter()
3271 .enumerate()
3272 .filter_map(|(i, s)| s.as_ref().map(|_| i as u32))
3273 .collect()
3274 }
3275
3276 #[must_use]
3283 pub fn hot_tier_bytes(&self) -> u64 {
3284 self.tables
3285 .iter()
3286 .map(Table::hot_bytes)
3287 .fold(0u64, u64::saturating_add)
3288 }
3289
3290 pub fn freeze_oldest_to_cold(
3335 &mut self,
3336 table_name: &str,
3337 index_name: &str,
3338 max_rows: usize,
3339 ) -> Result<FreezeReport, StorageError> {
3340 if max_rows == 0 {
3342 return Err(StorageError::Corrupt(
3343 "freeze_oldest_to_cold: max_rows must be > 0".into(),
3344 ));
3345 }
3346 let table = self.get(table_name).ok_or_else(|| {
3347 StorageError::Corrupt(format!(
3348 "freeze_oldest_to_cold: table {table_name:?} not found"
3349 ))
3350 })?;
3351 if max_rows > table.rows.len() {
3352 return Err(StorageError::Corrupt(format!(
3353 "freeze_oldest_to_cold: max_rows {max_rows} > row_count {}",
3354 table.rows.len()
3355 )));
3356 }
3357 let idx = table
3358 .indices
3359 .iter()
3360 .find(|i| i.name == index_name)
3361 .ok_or_else(|| {
3362 StorageError::Corrupt(format!(
3363 "freeze_oldest_to_cold: index {index_name:?} not found on {table_name:?}"
3364 ))
3365 })?;
3366 if !matches!(idx.kind, IndexKind::BTree(_)) {
3367 return Err(StorageError::Corrupt(format!(
3368 "freeze_oldest_to_cold: index {index_name:?} is NSW; only BTree indices may freeze"
3369 )));
3370 }
3371 let column_position = idx.column_position;
3372
3373 let schema = table.schema.clone();
3375 let mut to_freeze: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(max_rows);
3376 for row_idx in 0..max_rows {
3377 let row = table.rows.get(row_idx).expect("bounds-checked above");
3378 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
3379 StorageError::Corrupt(format!(
3380 "freeze_oldest_to_cold: row {row_idx} has NULL / non-key value in index column"
3381 ))
3382 })?;
3383 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
3384 StorageError::Corrupt(format!(
3385 "freeze_oldest_to_cold: index {index_name:?} column type is non-integer; \
3386 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
3387 ))
3388 })?;
3389 to_freeze.push((pk_u64, encode_row_body_dense(row, &schema), key));
3390 }
3391 to_freeze.sort_by_key(|(k, _, _)| *k);
3396 for w in to_freeze.windows(2) {
3400 if w[0].0 == w[1].0 {
3401 return Err(StorageError::Corrupt(format!(
3402 "freeze_oldest_to_cold: duplicate PK {} in freeze batch",
3403 w[0].0
3404 )));
3405 }
3406 }
3407 let post_swap_keys: Vec<IndexKey> = to_freeze.iter().map(|(_, _, k)| k.clone()).collect();
3411 let seg_rows: Vec<(u64, Vec<u8>)> = to_freeze
3415 .into_iter()
3416 .map(|(k, body, _)| (k, body))
3417 .collect();
3418 let frozen_rows = seg_rows.len();
3419 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
3420 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: encode: {e}")))?;
3421
3422 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
3431 let positions: Vec<usize> = (0..max_rows).collect();
3432 let t_mut = self
3433 .get_mut(table_name)
3434 .expect("just validated; still present");
3435 let removed = t_mut.delete_rows(&positions);
3436 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
3437 let bytes_after = t_mut.hot_bytes();
3438 let bytes_freed = bytes_before.saturating_sub(bytes_after);
3439
3440 let segment_id = self
3441 .load_segment_bytes(seg_bytes.clone())
3442 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: load: {e}")))?;
3443 let new_cold = post_swap_keys.into_iter().map(|k| {
3444 (
3445 k,
3446 RowLocator::Cold {
3447 segment_id,
3448 page_offset: 0,
3449 },
3450 )
3451 });
3452 let t_mut = self.get_mut(table_name).expect("still present");
3453 t_mut.register_cold_locators(index_name, new_cold)?;
3454
3455 Ok(FreezeReport {
3456 segment_id,
3457 frozen_rows,
3458 bytes_freed,
3459 segment_bytes: seg_bytes,
3460 })
3461 }
3462
3463 #[must_use]
3469 pub fn cold_segment(&self, segment_id: u32) -> Option<&OwnedSegment> {
3470 self.cold_segments
3471 .get(segment_id as usize)
3472 .and_then(|s| s.as_deref())
3473 }
3474
3475 pub fn resolve_cold_locator(
3484 &self,
3485 table_name: &str,
3486 segment_id: u32,
3487 key: &IndexKey,
3488 ) -> Option<Row> {
3489 let t = self.get(table_name)?;
3490 let u64_key = index_key_as_u64(key)?;
3491 let seg = self.cold_segments.get(segment_id as usize)?.as_ref()?;
3492 let payload = seg.lookup(u64_key)?;
3493 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
3494 Some(row)
3495 }
3496
3497 pub fn lookup_by_pk(&self, table: &str, index_name: &str, key: &IndexKey) -> Option<Row> {
3515 let t = self.get(table)?;
3516 let idx = t.indices.iter().find(|i| i.name == index_name)?;
3517 let locators = idx.lookup_eq(key);
3518 let cold_u64_key = index_key_as_u64(key);
3519 for loc in locators {
3520 match *loc {
3521 RowLocator::Hot(i) => {
3522 if let Some(row) = t.rows.get(i) {
3523 return Some(row.clone());
3524 }
3525 }
3526 RowLocator::Cold {
3527 segment_id,
3528 page_offset: _,
3529 } => {
3530 let Some(u64_key) = cold_u64_key else {
3531 continue;
3534 };
3535 let Some(seg) = self
3536 .cold_segments
3537 .get(segment_id as usize)
3538 .and_then(|s| s.as_deref())
3539 else {
3540 continue;
3551 };
3552 let Some(payload) = seg.lookup(u64_key) else {
3553 continue;
3554 };
3555 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
3556 return Some(row);
3557 }
3558 }
3559 }
3560 None
3561 }
3562
3563 pub fn promote_cold_row(
3585 &mut self,
3586 table_name: &str,
3587 index_name: &str,
3588 key: &IndexKey,
3589 ) -> Result<Option<usize>, StorageError> {
3590 let cold_loc = self.find_cold_locator(table_name, index_name, key)?;
3591 let Some((segment_id, _page_offset)) = cold_loc else {
3592 return Ok(None);
3593 };
3594 let u64_key = index_key_as_u64(key).ok_or_else(|| {
3595 StorageError::Corrupt(
3596 "promote_cold_row: key type not coercible to u64 (cold tier requires integer PK)"
3597 .into(),
3598 )
3599 })?;
3600 let schema = self
3604 .get(table_name)
3605 .ok_or_else(|| {
3606 StorageError::Corrupt(format!("promote_cold_row: table {table_name:?} not found"))
3607 })?
3608 .schema
3609 .clone();
3610 let seg = self
3611 .cold_segments
3612 .get(segment_id as usize)
3613 .and_then(|s| s.as_ref())
3614 .ok_or_else(|| {
3615 StorageError::Corrupt(format!(
3616 "promote_cold_row: segment {segment_id} not registered on catalog"
3617 ))
3618 })?;
3619 let payload = seg.lookup(u64_key).ok_or_else(|| {
3620 StorageError::Corrupt(format!(
3621 "promote_cold_row: key {u64_key} resolves to segment {segment_id} \
3622 but the segment's bloom/page lookup didn't return a row"
3623 ))
3624 })?;
3625 let (row, _consumed) = decode_row_body_dense(&payload, &schema)?;
3626 let t = self
3631 .get_mut(table_name)
3632 .expect("table existed at lookup time");
3633 t.insert(row)?;
3634 let new_hot_idx =
3635 t.rows.len().checked_sub(1).ok_or_else(|| {
3636 StorageError::Corrupt("promote_cold_row: empty after insert".into())
3637 })?;
3638 t.remove_cold_locators_for_key(index_name, key)?;
3642 Ok(Some(new_hot_idx))
3643 }
3644
3645 pub fn shadow_cold_row(
3663 &mut self,
3664 table_name: &str,
3665 index_name: &str,
3666 key: &IndexKey,
3667 ) -> Result<usize, StorageError> {
3668 let t = self.get_mut(table_name).ok_or_else(|| {
3669 StorageError::Corrupt(format!("shadow_cold_row: table {table_name:?} not found"))
3670 })?;
3671 t.remove_cold_locators_for_key(index_name, key)
3672 }
3673
3674 pub fn prepare_freeze_slice(
3692 &self,
3693 table_name: &str,
3694 index_name: &str,
3695 row_range: core::ops::Range<usize>,
3696 ) -> Result<FreezeSlice, StorageError> {
3697 let table = self.get(table_name).ok_or_else(|| {
3698 StorageError::Corrupt(format!(
3699 "prepare_freeze_slice: table {table_name:?} not found"
3700 ))
3701 })?;
3702 let idx = table
3703 .indices
3704 .iter()
3705 .find(|i| i.name == index_name)
3706 .ok_or_else(|| {
3707 StorageError::Corrupt(format!(
3708 "prepare_freeze_slice: index {index_name:?} not found on {table_name:?}"
3709 ))
3710 })?;
3711 if !matches!(idx.kind, IndexKind::BTree(_)) {
3712 return Err(StorageError::Corrupt(format!(
3713 "prepare_freeze_slice: index {index_name:?} is NSW; only BTree indices may freeze"
3714 )));
3715 }
3716 if row_range.end > table.rows.len() {
3717 return Err(StorageError::Corrupt(format!(
3718 "prepare_freeze_slice: row_range end {} > row_count {}",
3719 row_range.end,
3720 table.rows.len()
3721 )));
3722 }
3723 let column_position = idx.column_position;
3724 let schema = table.schema.clone();
3725 let mut rows: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(row_range.len());
3726 for row_idx in row_range.clone() {
3727 let row = table.rows.get(row_idx).expect("bounds-checked above");
3728 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
3729 StorageError::Corrupt(format!(
3730 "prepare_freeze_slice: row {row_idx} has NULL / non-key value in index column"
3731 ))
3732 })?;
3733 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
3734 StorageError::Corrupt(format!(
3735 "prepare_freeze_slice: index {index_name:?} column type is non-integer; \
3736 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
3737 ))
3738 })?;
3739 rows.push((pk_u64, encode_row_body_dense(row, &schema), key));
3740 }
3741 rows.sort_by_key(|(k, _, _)| *k);
3742 Ok(FreezeSlice { row_range, rows })
3743 }
3744
3745 pub fn commit_freeze_slices(
3759 &mut self,
3760 table_name: &str,
3761 index_name: &str,
3762 slices: Vec<FreezeSlice>,
3763 ) -> Result<FreezeReport, StorageError> {
3764 let table = self.get(table_name).ok_or_else(|| {
3766 StorageError::Corrupt(format!(
3767 "commit_freeze_slices: table {table_name:?} not found"
3768 ))
3769 })?;
3770 let idx = table
3771 .indices
3772 .iter()
3773 .find(|i| i.name == index_name)
3774 .ok_or_else(|| {
3775 StorageError::Corrupt(format!(
3776 "commit_freeze_slices: index {index_name:?} not found on {table_name:?}"
3777 ))
3778 })?;
3779 if !matches!(idx.kind, IndexKind::BTree(_)) {
3780 return Err(StorageError::Corrupt(format!(
3781 "commit_freeze_slices: index {index_name:?} is NSW; only BTree indices may freeze"
3782 )));
3783 }
3784 let mut ordered = slices;
3788 ordered.sort_by_key(|s| s.row_range.start);
3789 let mut expected_start = 0usize;
3793 for s in &ordered {
3794 if s.row_range.start != expected_start {
3795 return Err(StorageError::Corrupt(format!(
3796 "commit_freeze_slices: gap/overlap at row {}; expected start {}",
3797 s.row_range.start, expected_start
3798 )));
3799 }
3800 expected_start = s.row_range.end;
3801 }
3802 let max_rows = expected_start;
3803 if max_rows > table.rows.len() {
3804 return Err(StorageError::Corrupt(format!(
3805 "commit_freeze_slices: total row range {} exceeds row_count {}",
3806 max_rows,
3807 table.rows.len()
3808 )));
3809 }
3810 if max_rows == 0 {
3811 return Ok(FreezeReport {
3812 segment_id: u32::MAX,
3813 frozen_rows: 0,
3814 bytes_freed: 0,
3815 segment_bytes: Vec::new(),
3816 });
3817 }
3818
3819 let total_rows: usize = ordered.iter().map(|s| s.rows.len()).sum();
3824 if total_rows != max_rows {
3825 return Err(StorageError::Corrupt(format!(
3826 "commit_freeze_slices: total slice rows {total_rows} ≠ row_range coverage {max_rows}"
3827 )));
3828 }
3829 let mut cursors: Vec<usize> = alloc::vec![0; ordered.len()];
3830 let mut merged: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(total_rows);
3831 loop {
3832 let mut pick: Option<usize> = None;
3835 for (i, c) in cursors.iter().enumerate() {
3836 let slice = &ordered[i];
3837 if *c >= slice.rows.len() {
3838 continue;
3839 }
3840 match pick {
3841 None => pick = Some(i),
3842 Some(j) => {
3843 if slice.rows[*c].0 < ordered[j].rows[cursors[j]].0 {
3844 pick = Some(i);
3845 }
3846 }
3847 }
3848 }
3849 let Some(i) = pick else { break };
3850 let row = ordered[i].rows[cursors[i]].clone();
3851 cursors[i] += 1;
3852 merged.push(row);
3853 }
3854 for w in merged.windows(2) {
3857 if w[0].0 == w[1].0 {
3858 return Err(StorageError::Corrupt(format!(
3859 "commit_freeze_slices: duplicate PK {} across slices",
3860 w[0].0
3861 )));
3862 }
3863 }
3864 let post_swap_keys: Vec<IndexKey> = merged.iter().map(|(_, _, k)| k.clone()).collect();
3865 let seg_rows: Vec<(u64, Vec<u8>)> =
3866 merged.into_iter().map(|(k, body, _)| (k, body)).collect();
3867 let frozen_rows = seg_rows.len();
3868 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
3869 .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: encode: {e}")))?;
3870
3871 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
3873 let positions: Vec<usize> = (0..max_rows).collect();
3874 let t_mut = self
3875 .get_mut(table_name)
3876 .expect("just validated; still present");
3877 let removed = t_mut.delete_rows(&positions);
3878 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
3879 let bytes_after = t_mut.hot_bytes();
3880 let bytes_freed = bytes_before.saturating_sub(bytes_after);
3881
3882 let segment_id = self
3883 .load_segment_bytes(seg_bytes.clone())
3884 .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: load: {e}")))?;
3885 let new_cold = post_swap_keys.into_iter().map(|k| {
3886 (
3887 k,
3888 RowLocator::Cold {
3889 segment_id,
3890 page_offset: 0,
3891 },
3892 )
3893 });
3894 let t_mut = self.get_mut(table_name).expect("still present");
3895 t_mut.register_cold_locators(index_name, new_cold)?;
3896
3897 Ok(FreezeReport {
3898 segment_id,
3899 frozen_rows,
3900 bytes_freed,
3901 segment_bytes: seg_bytes,
3902 })
3903 }
3904
3905 pub fn compact_cold_segments(
3948 &mut self,
3949 table_name: &str,
3950 index_name: &str,
3951 target_segment_bytes: u64,
3952 ) -> Result<CompactReport, StorageError> {
3953 let t = self.get(table_name).ok_or_else(|| {
3955 StorageError::Corrupt(format!(
3956 "compact_cold_segments: table {table_name:?} not found"
3957 ))
3958 })?;
3959 let idx = t
3960 .indices
3961 .iter()
3962 .find(|i| i.name == index_name)
3963 .ok_or_else(|| {
3964 StorageError::Corrupt(format!(
3965 "compact_cold_segments: index {index_name:?} not found on {table_name:?}"
3966 ))
3967 })?;
3968 let map = match &idx.kind {
3969 IndexKind::BTree(m) => m,
3970 IndexKind::Nsw(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
3971 return Err(StorageError::Corrupt(format!(
3972 "compact_cold_segments: index {index_name:?} is not BTree; \
3973 compaction applies only to BTree cold-tier indices"
3974 )));
3975 }
3976 };
3977
3978 let mut referenced_ids: BTreeSet<u32> = BTreeSet::new();
3981 for (_key, locators) in map.iter() {
3982 for loc in locators {
3983 if let RowLocator::Cold { segment_id, .. } = loc {
3984 referenced_ids.insert(*segment_id);
3985 }
3986 }
3987 }
3988 let candidate_set: BTreeSet<u32> = referenced_ids
3990 .into_iter()
3991 .filter(|id| {
3992 self.cold_segments
3993 .get(*id as usize)
3994 .and_then(|s| s.as_deref())
3995 .is_some_and(|s| (s.bytes().len() as u64) < target_segment_bytes)
3996 })
3997 .collect();
3998 if candidate_set.len() < 2 {
3999 return Ok(CompactReport {
4000 sources: Vec::new(),
4001 merged_segment_id: None,
4002 merged_segment_bytes: Vec::new(),
4003 merged_rows: 0,
4004 deleted_rows_pruned: 0,
4005 bytes_reclaimed_estimate: 0,
4006 });
4007 }
4008 let mut source_row_count: usize = 0;
4010 let mut source_byte_total: u64 = 0;
4011 for &id in &candidate_set {
4012 let seg = self.cold_segments[id as usize]
4013 .as_ref()
4014 .expect("candidate selected only when slot is Some");
4015 source_row_count = source_row_count.saturating_add(seg.meta().num_rows as usize);
4016 source_byte_total = source_byte_total.saturating_add(seg.bytes().len() as u64);
4017 }
4018 let mut collected: BTreeMap<u64, (Vec<u8>, IndexKey)> = BTreeMap::new();
4024 for (key, locators) in map.iter() {
4025 for loc in locators {
4026 let RowLocator::Cold { segment_id, .. } = loc else {
4027 continue;
4028 };
4029 if !candidate_set.contains(segment_id) {
4030 continue;
4031 }
4032 let u64_key = index_key_as_u64(key).ok_or_else(|| {
4033 StorageError::Corrupt(format!(
4034 "compact_cold_segments: index {index_name:?} has non-integer Cold key; \
4035 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
4036 ))
4037 })?;
4038 let seg = self.cold_segments[*segment_id as usize]
4039 .as_ref()
4040 .expect("candidate slot guaranteed Some above");
4041 let payload = seg.lookup(u64_key).ok_or_else(|| {
4042 StorageError::Corrupt(format!(
4043 "compact_cold_segments: BTree {index_name:?} points key={u64_key} \
4044 at segment {segment_id} but the segment lookup missed"
4045 ))
4046 })?;
4047 collected.insert(u64_key, (payload, key.clone()));
4048 break;
4049 }
4050 }
4051 let merged_rows = collected.len();
4052 let deleted_rows_pruned = source_row_count.saturating_sub(merged_rows);
4053
4054 let seg_rows: Vec<(u64, Vec<u8>)> = collected
4058 .iter()
4059 .map(|(k, (body, _))| (*k, body.clone()))
4060 .collect();
4061 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
4062 .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: encode: {e}")))?;
4063 let merged_bytes_len = seg_bytes.len() as u64;
4064
4065 let merged_segment_id = self
4067 .load_segment_bytes(seg_bytes.clone())
4068 .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: load: {e}")))?;
4069
4070 let entries: Vec<(IndexKey, Vec<RowLocator>)> = {
4076 let t = self
4077 .get(table_name)
4078 .expect("table existed at the start of this fn");
4079 let idx = t
4080 .indices
4081 .iter()
4082 .find(|i| i.name == index_name)
4083 .expect("index existed at the start of this fn");
4084 let IndexKind::BTree(map) = &idx.kind else {
4085 unreachable!("validated above");
4086 };
4087 map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
4088 };
4089 let t_mut = self
4090 .get_mut(table_name)
4091 .expect("table existed at the start of this fn");
4092 let idx_mut = t_mut
4093 .indices
4094 .iter_mut()
4095 .find(|i| i.name == index_name)
4096 .expect("index existed at the start of this fn");
4097 let IndexKind::BTree(map_mut) = &mut idx_mut.kind else {
4098 unreachable!("validated above");
4099 };
4100 for (key, locators) in entries {
4101 let mut new_locs: Vec<RowLocator> = Vec::with_capacity(locators.len());
4102 let mut changed = false;
4103 for loc in &locators {
4104 match *loc {
4105 RowLocator::Cold {
4106 segment_id,
4107 page_offset: _,
4108 } if candidate_set.contains(&segment_id) => {
4109 let replacement = RowLocator::Cold {
4110 segment_id: merged_segment_id,
4111 page_offset: 0,
4112 };
4113 if !new_locs.contains(&replacement) {
4114 new_locs.push(replacement);
4115 }
4116 changed = true;
4117 }
4118 other => new_locs.push(other),
4119 }
4120 }
4121 if changed {
4122 map_mut.insert_mut(key, new_locs);
4123 }
4124 }
4125
4126 for &id in &candidate_set {
4131 self.tombstone_segment(id)?;
4132 }
4133
4134 let bytes_reclaimed_estimate = source_byte_total.saturating_sub(merged_bytes_len);
4135 Ok(CompactReport {
4136 sources: candidate_set.into_iter().collect(),
4137 merged_segment_id: Some(merged_segment_id),
4138 merged_segment_bytes: seg_bytes,
4139 merged_rows,
4140 deleted_rows_pruned,
4141 bytes_reclaimed_estimate,
4142 })
4143 }
4144
4145 fn find_cold_locator(
4151 &self,
4152 table_name: &str,
4153 index_name: &str,
4154 key: &IndexKey,
4155 ) -> Result<Option<(u32, u32)>, StorageError> {
4156 let t = self.get(table_name).ok_or_else(|| {
4157 StorageError::Corrupt(format!("find_cold_locator: table {table_name:?} not found"))
4158 })?;
4159 let idx = t
4160 .indices
4161 .iter()
4162 .find(|i| i.name == index_name)
4163 .ok_or_else(|| {
4164 StorageError::Corrupt(format!(
4165 "find_cold_locator: index {index_name:?} not found on {table_name:?}"
4166 ))
4167 })?;
4168 if !matches!(idx.kind, IndexKind::BTree(_)) {
4169 return Err(StorageError::Corrupt(format!(
4170 "find_cold_locator: index {index_name:?} is NSW; promote-on-write only applies to BTree indices"
4171 )));
4172 }
4173 for loc in idx.lookup_eq(key) {
4174 if let RowLocator::Cold {
4175 segment_id,
4176 page_offset,
4177 } = *loc
4178 {
4179 return Ok(Some((segment_id, page_offset)));
4180 }
4181 }
4182 Ok(None)
4183 }
4184}
4185
4186fn index_key_as_u64(key: &IndexKey) -> Option<u64> {
4192 match key {
4193 IndexKey::Int(n) => Some(n.cast_unsigned()),
4199 IndexKey::Text(_) | IndexKey::Bool(_) => None,
4200 }
4201}
4202
/// Errors produced by the catalog / storage layer.
///
/// `#[non_exhaustive]`: downstream `match`es need a wildcard arm so that new
/// variants can be added without breaking them.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum StorageError {
    /// A table with this name already exists.
    DuplicateTable {
        name: String,
    },
    /// No table with this name is registered.
    TableNotFound {
        name: String,
    },
    /// A row supplied `actual` values where the schema declares `expected` columns.
    ArityMismatch {
        expected: usize,
        actual: usize,
    },
    /// The value at column `position` (named `column`) had type `actual`
    /// instead of the declared `expected`.
    TypeMismatch {
        column: String,
        expected: DataType,
        actual: DataType,
        position: usize,
    },
    /// A NULL was supplied for a column declared NOT NULL.
    NullInNotNull {
        column: String,
    },
    /// An index with this name already exists.
    DuplicateIndex {
        name: String,
    },
    /// A referenced column does not exist.
    ColumnNotFound {
        column: String,
    },
    /// Malformed persisted data or a failed internal / argument check; the
    /// payload is a human-readable detail.
    ///
    /// NOTE(review): this file also uses it for caller-argument validation
    /// (e.g. `freeze_oldest_to_cold` with `max_rows == 0`), so the Display
    /// prefix "corrupt on-disk format" can be misleading in those cases.
    Corrupt(String),
    /// No index with this name exists.
    IndexNotFound {
        name: String,
    },
    /// The requested operation or feature is not supported; the payload is a
    /// human-readable detail.
    Unsupported(String),
}
4246
4247impl fmt::Display for StorageError {
4248 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4249 match self {
4250 Self::DuplicateTable { name } => write!(f, "table already exists: {name}"),
4251 Self::TableNotFound { name } => write!(f, "table not found: {name}"),
4252 Self::ArityMismatch { expected, actual } => write!(
4253 f,
4254 "row arity mismatch: expected {expected} columns, got {actual}"
4255 ),
4256 Self::TypeMismatch {
4257 column,
4258 expected,
4259 actual,
4260 position,
4261 } => write!(
4262 f,
4263 "type mismatch in column {column:?} (position {position}): expected {expected}, got {actual}"
4264 ),
4265 Self::NullInNotNull { column } => {
4266 write!(f, "NULL value in NOT NULL column {column:?}")
4267 }
4268 Self::DuplicateIndex { name } => write!(f, "index already exists: {name}"),
4269 Self::ColumnNotFound { column } => write!(f, "column not found: {column}"),
4270 Self::Corrupt(detail) => write!(f, "corrupt on-disk format: {detail}"),
4271 Self::IndexNotFound { name } => write!(f, "index not found: {name}"),
4272 Self::Unsupported(detail) => write!(f, "unsupported: {detail}"),
4273 }
4274 }
4275}
4276
4277impl ColumnSchema {
4278 pub fn new(name: impl Into<String>, ty: DataType, nullable: bool) -> Self {
4279 Self {
4280 name: name.into(),
4281 ty,
4282 nullable,
4283 default: None,
4284 runtime_default: None,
4285 auto_increment: false,
4286 }
4287 }
4288
4289 #[must_use]
4293 pub fn with_default(mut self, default: Value) -> Self {
4294 self.default = Some(default);
4295 self
4296 }
4297
4298 #[must_use]
4303 pub fn with_runtime_default(mut self, expr: impl Into<String>) -> Self {
4304 self.runtime_default = Some(expr.into());
4305 self
4306 }
4307
4308 #[must_use]
4310 pub const fn with_auto_increment(mut self) -> Self {
4311 self.auto_increment = true;
4312 self
4313 }
4314}
4315
4316impl TableSchema {
4317 pub fn new(name: impl Into<String>, columns: Vec<ColumnSchema>) -> Self {
4318 Self {
4319 name: name.into(),
4320 columns,
4321 hot_tier_bytes: None,
4322 foreign_keys: Vec::new(),
4323 uniqueness_constraints: Vec::new(),
4324 }
4325 }
4326}
4327
/// Magic bytes at the start of every serialized catalog.
///
/// `Catalog::deserialize` checks these first and rejects any other prefix.
const FILE_MAGIC: &[u8; 8] = b"SPGDB001";
/// Format version byte written by `Catalog::serialize`.
///
/// `deserialize` gates optional sections on it. For example, the
/// function/trigger appendix is read only for versions >= 22, and
/// `deserialize_table` checks versions 11 and 13 for its appendices.
const FILE_VERSION: u8 = 22;
/// Oldest format version `Catalog::deserialize` still accepts.
const MIN_SUPPORTED_FILE_VERSION: u8 = 8;

// Tag bytes for serialized `IndexKey` values.
// NOTE(review): the reader/writer using these tags is outside this view;
// presumably one tag per `IndexKey` variant (Int / Text / Bool) — confirm.
// They are part of the on-disk format, so treat the values as fixed.
const INDEX_KEY_TAG_INT: u8 = 0;
const INDEX_KEY_TAG_TEXT: u8 = 1;
const INDEX_KEY_TAG_BOOL: u8 = 2;
4413
/// Binary (de)serialization of the whole catalog.
impl Catalog {
    /// Encodes the catalog in the `SPGDB001` format at version `FILE_VERSION`.
    ///
    /// Write order:
    /// 1. `FILE_MAGIC`, then the version byte.
    /// 2. A u32 table count.
    /// 3. For each table: name, columns, rows (dense row bodies), indices, then
    ///    the hot-tier, foreign-key, uniqueness, and runtime-default appendices.
    /// 4. A u32-counted function list.
    /// 5. A u32-counted trigger list.
    ///
    /// The byte order of the `write_u16` / `write_u32` helpers is defined
    /// outside this view.
    ///
    /// # Panics
    ///
    /// Panics if a count or position does not fit its on-disk width, for example
    /// more than `u16::MAX` columns in a table or more than `u32::MAX` rows. The
    /// `expect` messages name each limit.
    pub fn serialize(&self) -> Vec<u8> {
        let mut out = Vec::with_capacity(64);
        out.extend_from_slice(FILE_MAGIC);
        out.push(FILE_VERSION);
        write_u32(
            &mut out,
            u32::try_from(self.tables.len()).expect("≤ 4G tables"),
        );
        for t in &self.tables {
            write_str(&mut out, &t.schema.name);
            write_u16(
                &mut out,
                u16::try_from(t.schema.columns.len()).expect("≤ 65k columns/table"),
            );
            // Per column: name, type, nullable flag, 0/1-tagged static default,
            // auto-increment flag.
            for c in &t.schema.columns {
                write_str(&mut out, &c.name);
                write_data_type(&mut out, c.ty);
                out.push(u8::from(c.nullable));
                match &c.default {
                    None => out.push(0),
                    Some(v) => {
                        out.push(1);
                        write_value(&mut out, v);
                    }
                }
                out.push(u8::from(c.auto_increment));
            }
            // Only hot rows are written here; cold-tier rows live in segments.
            // NOTE(review): segments are not embedded in this buffer — confirm
            // they are persisted elsewhere.
            write_u32(
                &mut out,
                u32::try_from(t.rows.len()).expect("≤ 4G rows/table"),
            );
            for row in &t.rows {
                out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
            }
            write_u16(
                &mut out,
                u16::try_from(t.indices.len()).expect("≤ 65k indices/table"),
            );
            for idx in &t.indices {
                write_str(&mut out, &idx.name);
                write_u16(
                    &mut out,
                    u16::try_from(idx.column_position).expect("≤ 65k columns/table"),
                );
                // Index-kind tag: 0 = BTree, 1 = NSW, 2 = BRIN, 3 = GIN.
                match &idx.kind {
                    IndexKind::BTree(map) => {
                        out.push(0);
                        write_u32(
                            &mut out,
                            u32::try_from(map.len()).expect("≤ 4G index entries/index"),
                        );
                        for (key, locators) in map {
                            write_index_key(&mut out, key);
                            write_u32(
                                &mut out,
                                u32::try_from(locators.len()).expect("≤ 4G locators/key"),
                            );
                            for loc in locators {
                                loc.write_le(&mut out);
                            }
                        }
                    }
                    IndexKind::Nsw(g) => {
                        out.push(1);
                        write_u16(&mut out, u16::try_from(g.m).expect("≤ 65k NSW neighbours"));
                        write_nsw_graph(&mut out, g);
                    }
                    IndexKind::Brin { column_type } => {
                        // Only the column type is persisted for BRIN.
                        out.push(2);
                        write_data_type(&mut out, *column_type);
                    }
                    IndexKind::Gin(map) => {
                        out.push(3);
                        write_u32(
                            &mut out,
                            u32::try_from(map.len()).expect("≤ 4G GIN posting lists"),
                        );
                        for (word, locators) in map {
                            write_str(&mut out, word);
                            write_u32(
                                &mut out,
                                u32::try_from(locators.len()).expect("≤ 4G locators/posting list"),
                            );
                            for loc in locators {
                                loc.write_le(&mut out);
                            }
                        }
                    }
                }
                // Per-index trailer: INCLUDE columns, optional partial predicate,
                // optional expression, unique flag, extra key-column positions.
                write_u16(
                    &mut out,
                    u16::try_from(idx.included_columns.len()).expect("≤ 65k INCLUDE columns/index"),
                );
                for col_pos in &idx.included_columns {
                    write_u16(
                        &mut out,
                        u16::try_from(*col_pos).expect("≤ 65k columns/table"),
                    );
                }
                match &idx.partial_predicate {
                    None => out.push(0),
                    Some(pred) => {
                        out.push(1);
                        write_str(&mut out, pred);
                    }
                }
                match &idx.expression {
                    None => out.push(0),
                    Some(expr) => {
                        out.push(1);
                        write_str(&mut out, expr);
                    }
                }
                out.push(u8::from(idx.is_unique));
                write_u16(
                    &mut out,
                    u16::try_from(idx.extra_column_positions.len())
                        .expect("≤ 65k extra cols / index"),
                );
                for cp in &idx.extra_column_positions {
                    write_u16(&mut out, u16::try_from(*cp).expect("≤ 65k columns/table"));
                }
            }
            // hot_tier_bytes: a 0/1 presence tag, then the value as
            // little-endian bytes.
            match t.schema.hot_tier_bytes {
                None => out.push(0),
                Some(n) => {
                    out.push(1);
                    out.extend_from_slice(&n.to_le_bytes());
                }
            }
            // Foreign keys: optional name, local column positions, parent
            // table, parent column positions, ON DELETE / ON UPDATE tags.
            write_u16(
                &mut out,
                u16::try_from(t.schema.foreign_keys.len()).expect("≤ 65k FKs/table"),
            );
            for fk in &t.schema.foreign_keys {
                match &fk.name {
                    None => out.push(0),
                    Some(n) => {
                        out.push(1);
                        write_str(&mut out, n);
                    }
                }
                write_u16(
                    &mut out,
                    u16::try_from(fk.local_columns.len()).expect("≤ 65k FK columns"),
                );
                for &p in &fk.local_columns {
                    write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
                }
                write_str(&mut out, &fk.parent_table);
                write_u16(
                    &mut out,
                    u16::try_from(fk.parent_columns.len()).expect("≤ 65k FK parent columns"),
                );
                for &p in &fk.parent_columns {
                    write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
                }
                out.push(fk.on_delete.tag());
                out.push(fk.on_update.tag());
            }
            // Uniqueness constraints: primary-key flag, then column positions.
            write_u16(
                &mut out,
                u16::try_from(t.schema.uniqueness_constraints.len())
                    .expect("≤ 65k uniqueness constraints/table"),
            );
            for uc in &t.schema.uniqueness_constraints {
                out.push(u8::from(uc.is_primary_key));
                write_u16(
                    &mut out,
                    u16::try_from(uc.columns.len()).expect("≤ 65k cols in uniqueness constraint"),
                );
                for &p in &uc.columns {
                    write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
                }
            }
            // Runtime defaults are stored sparsely as (column position, expression)
            // pairs, separately from the per-column records above.
            let mut rt_defaults: Vec<(usize, &str)> = Vec::new();
            for (i, c) in t.schema.columns.iter().enumerate() {
                if let Some(e) = &c.runtime_default {
                    rt_defaults.push((i, e.as_str()));
                }
            }
            write_u16(
                &mut out,
                u16::try_from(rt_defaults.len()).expect("≤ 65k runtime defaults/table"),
            );
            for (pos, expr) in rt_defaults {
                write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
                write_str(&mut out, expr);
            }
        }
        // Function appendix (read back only for version >= 22).
        write_u32(
            &mut out,
            u32::try_from(self.functions.len()).expect("≤ 4G functions"),
        );
        for fd in self.functions.values() {
            write_str(&mut out, &fd.name);
            write_str(&mut out, &fd.args_repr);
            write_str(&mut out, &fd.returns);
            write_str(&mut out, &fd.language);
            // The body uses the long-string encoding.
            write_str_long(&mut out, &fd.body);
        }
        // Trigger appendix (read back only for version >= 22).
        write_u32(
            &mut out,
            u32::try_from(self.triggers.len()).expect("≤ 4G triggers"),
        );
        for td in &self.triggers {
            write_str(&mut out, &td.name);
            write_str(&mut out, &td.table);
            write_str(&mut out, &td.timing);
            write_u16(
                &mut out,
                u16::try_from(td.events.len()).expect("≤ 65k events / trigger"),
            );
            for ev in &td.events {
                write_str(&mut out, ev);
            }
            write_str(&mut out, &td.for_each);
            write_str(&mut out, &td.function);
        }
        out
    }

    /// Decodes a buffer produced by [`Catalog::serialize`].
    ///
    /// Accepts any version in `MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION`.
    /// Tables are rebuilt through `deserialize_table`. The function/trigger
    /// appendix is read only for version >= 22. The whole buffer must be
    /// consumed.
    ///
    /// # Errors
    ///
    /// Returns [`StorageError::Corrupt`] in these cases:
    /// - The magic bytes are wrong.
    /// - The version is unsupported.
    /// - Bytes remain unread at the end.
    ///
    /// Errors from the cursor reads and from `deserialize_table` (including
    /// `create_table`) are propagated unchanged.
    pub fn deserialize(buf: &[u8]) -> Result<Self, StorageError> {
        let mut cur = Cursor::new(buf);
        let magic = cur.take(8)?;
        if magic != FILE_MAGIC {
            return Err(StorageError::Corrupt(format!(
                "bad magic: expected SPGDB001, got {magic:?}"
            )));
        }
        let version = cur.read_u8()?;
        if !(MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION).contains(&version) {
            return Err(StorageError::Corrupt(format!(
                "unsupported file version: {version} (supported: {MIN_SUPPORTED_FILE_VERSION}..={FILE_VERSION})"
            )));
        }
        let table_count = cur.read_u32()? as usize;
        let mut cat = Self::new();
        for _ in 0..table_count {
            deserialize_table(&mut cur, &mut cat, version)?;
        }
        // Files older than version 22 end after the tables.
        if version >= 22 {
            let fn_count = cur.read_u32()? as usize;
            for _ in 0..fn_count {
                let name = cur.read_str()?;
                let args_repr = cur.read_str()?;
                let returns = cur.read_str()?;
                let language = cur.read_str()?;
                let body = cur.read_str_long()?;
                // Functions are keyed by name.
                cat.functions.insert(
                    name.clone(),
                    FunctionDef {
                        name,
                        args_repr,
                        returns,
                        language,
                        body,
                    },
                );
            }
            let trg_count = cur.read_u32()? as usize;
            for _ in 0..trg_count {
                let name = cur.read_str()?;
                let table = cur.read_str()?;
                let timing = cur.read_str()?;
                let ev_count = cur.read_u16()? as usize;
                let mut events = Vec::with_capacity(ev_count);
                for _ in 0..ev_count {
                    events.push(cur.read_str()?);
                }
                let for_each = cur.read_str()?;
                let function = cur.read_str()?;
                cat.triggers.push(TriggerDef {
                    name,
                    table,
                    timing,
                    events,
                    for_each,
                    function,
                });
            }
        }
        // Leftover bytes mean the buffer and the decoder disagree on the layout.
        if cur.pos < buf.len() {
            return Err(StorageError::Corrupt(format!(
                "trailing bytes: {} unread",
                buf.len() - cur.pos
            )));
        }
        Ok(cat)
    }
}
4799
4800fn deserialize_table(
4805 cur: &mut Cursor<'_>,
4806 cat: &mut Catalog,
4807 version: u8,
4808) -> Result<(), StorageError> {
4809 let table_name = cur.read_str()?;
4810 let name = table_name.clone();
4811 let col_count = cur.read_u16()? as usize;
4812 let mut cols = Vec::with_capacity(col_count);
4813 for _ in 0..col_count {
4814 let c_name = cur.read_str()?;
4815 let ty = cur.read_data_type()?;
4816 let nullable = cur.read_u8()? != 0;
4817 let default = match cur.read_u8()? {
4818 0 => None,
4819 1 => Some(cur.read_value()?),
4820 other => {
4821 return Err(StorageError::Corrupt(format!(
4822 "unknown default tag: {other}"
4823 )));
4824 }
4825 };
4826 let auto_increment = cur.read_u8()? != 0;
4827 cols.push(ColumnSchema {
4831 name: c_name,
4832 ty,
4833 nullable,
4834 default,
4835 runtime_default: None,
4836 auto_increment,
4837 });
4838 }
4839 let n_cols = cols.len();
4840 cat.create_table(TableSchema::new(name, cols))?;
4841 let t = cat.tables.last_mut().expect("create_table just pushed");
4845 deserialize_rows(cur, t, n_cols)?;
4846 deserialize_indices(cur, t, version)?;
4847 if version >= 11 {
4853 let has = cur.read_u8()?;
4854 let hot_tier_bytes = match has {
4855 0 => None,
4856 1 => Some(cur.read_u64()?),
4857 other => {
4858 return Err(StorageError::Corrupt(format!(
4859 "hot_tier_bytes appendix: unknown has-value byte {other}"
4860 )));
4861 }
4862 };
4863 t.schema_mut().hot_tier_bytes = hot_tier_bytes;
4864 }
4865 if version >= 13 {
4868 let fk_count = cur.read_u16()? as usize;
4869 let mut fks = Vec::with_capacity(fk_count);
4870 for _ in 0..fk_count {
4871 let name = match cur.read_u8()? {
4872 0 => None,
4873 1 => Some(cur.read_str()?),
4874 other => {
4875 return Err(StorageError::Corrupt(format!(
4876 "FK appendix: unknown has-name byte {other}"
4877 )));
4878 }
4879 };
4880 let local_arity = cur.read_u16()? as usize;
4881 let mut local_columns = Vec::with_capacity(local_arity);
4882 for _ in 0..local_arity {
4883 local_columns.push(cur.read_u16()? as usize);
4884 }
4885 let parent_table = cur.read_str()?;
4886 let parent_arity = cur.read_u16()? as usize;
4887 if parent_arity != local_arity {
4888 return Err(StorageError::Corrupt(format!(
4889 "FK arity mismatch in catalog: local {local_arity} vs parent {parent_arity}"
4890 )));
4891 }
4892 let mut parent_columns = Vec::with_capacity(parent_arity);
4893 for _ in 0..parent_arity {
4894 parent_columns.push(cur.read_u16()? as usize);
4895 }
4896 let on_delete = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
4897 StorageError::Corrupt("FK appendix: unknown on_delete tag".into())
4898 })?;
4899 let on_update = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
4900 StorageError::Corrupt("FK appendix: unknown on_update tag".into())
4901 })?;
4902 fks.push(ForeignKeyConstraint {
4903 name,
4904 local_columns,
4905 parent_table,
4906 parent_columns,
4907 on_delete,
4908 on_update,
4909 });
4910 }
4911 t.schema_mut().foreign_keys = fks;
4912 }
4913 if version >= 15 {
4916 let uc_count = cur.read_u16()? as usize;
4917 let mut ucs = Vec::with_capacity(uc_count);
4918 for _ in 0..uc_count {
4919 let is_pk = cur.read_u8()? != 0;
4920 let arity = cur.read_u16()? as usize;
4921 let mut cols = Vec::with_capacity(arity);
4922 for _ in 0..arity {
4923 cols.push(cur.read_u16()? as usize);
4924 }
4925 ucs.push(UniquenessConstraint {
4926 is_primary_key: is_pk,
4927 columns: cols,
4928 });
4929 }
4930 t.schema_mut().uniqueness_constraints = ucs;
4931 let rt_count = cur.read_u16()? as usize;
4933 for _ in 0..rt_count {
4934 let pos = cur.read_u16()? as usize;
4935 let expr = cur.read_str()?;
4936 if let Some(col) = t.schema_mut().columns.get_mut(pos) {
4937 col.runtime_default = Some(expr);
4938 }
4939 }
4940 }
4941 let _ = table_name;
4942 Ok(())
4943}
4944
4945fn deserialize_rows(
4946 cur: &mut Cursor<'_>,
4947 t: &mut Table,
4948 _n_cols: usize,
4949) -> Result<(), StorageError> {
4950 let row_count = cur.read_u32()? as usize;
4951 let mut hot_bytes: u64 = 0;
4956 for _ in 0..row_count {
4957 let tail = &cur.buf[cur.pos..];
4958 let (row, consumed) = decode_row_body_dense(tail, &t.schema)?;
4959 cur.pos += consumed;
4960 hot_bytes = hot_bytes.saturating_add(row_body_encoded_len(&row, &t.schema) as u64);
4966 t.rows.push_mut(row);
4967 }
4968 t.hot_bytes = hot_bytes;
4969 Ok(())
4970}
4971
4972fn deserialize_indices(
4973 cur: &mut Cursor<'_>,
4974 t: &mut Table,
4975 version: u8,
4976) -> Result<(), StorageError> {
4977 let index_count = cur.read_u16()? as usize;
4978 for _ in 0..index_count {
4979 let idx_name = cur.read_str()?;
4980 let col_pos = cur.read_u16()? as usize;
4981 let column_name = t
4982 .schema
4983 .columns
4984 .get(col_pos)
4985 .ok_or_else(|| {
4986 StorageError::Corrupt(format!(
4987 "index {idx_name:?} points at non-existent column position {col_pos}"
4988 ))
4989 })?
4990 .name
4991 .clone();
4992 let kind_tag = cur.read_u8()?;
4993 match kind_tag {
4994 0 => {
4995 if version >= 9 {
4996 let map = read_btree_map(cur)?;
5001 t.restore_btree_index(idx_name, &column_name, map)?;
5002 } else {
5003 t.add_index(idx_name, &column_name)?;
5008 }
5009 }
5010 1 => {
5011 let m = cur.read_u16()? as usize;
5012 let graph = cur.read_nsw_graph(m)?;
5013 t.restore_nsw_index(idx_name, &column_name, graph)?;
5014 }
5015 2 => {
5016 let column_type = cur.read_data_type()?;
5020 t.restore_brin_index(idx_name, &column_name, column_type)?;
5021 }
5022 3 => {
5023 let map = read_gin_map(cur)?;
5028 t.restore_gin_index(idx_name, &column_name, map)?;
5029 }
5030 other => {
5031 return Err(StorageError::Corrupt(format!(
5032 "unknown index kind tag: {other}"
5033 )));
5034 }
5035 }
5036 if version >= 12 {
5039 let num_included = cur.read_u16()? as usize;
5040 if num_included > 0 {
5041 let mut included: Vec<usize> = Vec::with_capacity(num_included);
5042 for _ in 0..num_included {
5043 let cp = cur.read_u16()? as usize;
5044 if cp >= t.schema.columns.len() {
5045 return Err(StorageError::Corrupt(format!(
5046 "INCLUDE column position {cp} out of range \
5047 ({} schema columns)",
5048 t.schema.columns.len()
5049 )));
5050 }
5051 included.push(cp);
5052 }
5053 if let Some(last) = t.indices.last_mut() {
5054 last.included_columns = included;
5055 }
5056 }
5057 match cur.read_u8()? {
5059 0 => {}
5060 1 => {
5061 let pred = cur.read_str()?;
5062 if let Some(last) = t.indices.last_mut() {
5063 last.partial_predicate = Some(pred);
5064 }
5065 }
5066 other => {
5067 return Err(StorageError::Corrupt(format!(
5068 "partial_predicate tag: unknown byte {other}"
5069 )));
5070 }
5071 }
5072 match cur.read_u8()? {
5074 0 => {}
5075 1 => {
5076 let expr = cur.read_str()?;
5077 if let Some(last) = t.indices.last_mut() {
5078 last.expression = Some(expr);
5079 }
5080 }
5081 other => {
5082 return Err(StorageError::Corrupt(format!(
5083 "expression tag: unknown byte {other}"
5084 )));
5085 }
5086 }
5087 if version >= 16 {
5090 match cur.read_u8()? {
5091 0 => {}
5092 1 => {
5093 if let Some(last) = t.indices.last_mut() {
5094 last.is_unique = true;
5095 }
5096 }
5097 other => {
5098 return Err(StorageError::Corrupt(format!(
5099 "is_unique tag: unknown byte {other}"
5100 )));
5101 }
5102 }
5103 let n = cur.read_u16()? as usize;
5105 if n > 0 {
5106 let mut extras: Vec<usize> = Vec::with_capacity(n);
5107 for _ in 0..n {
5108 let cp = cur.read_u16()? as usize;
5109 if cp >= t.schema.columns.len() {
5110 return Err(StorageError::Corrupt(format!(
5111 "extra column position {cp} out of range \
5112 ({} schema columns)",
5113 t.schema.columns.len()
5114 )));
5115 }
5116 extras.push(cp);
5117 }
5118 if let Some(last) = t.indices.last_mut() {
5119 last.extra_column_positions = extras;
5120 }
5121 }
5122 }
5123 }
5124 }
5125 Ok(())
5126}
5127
5128fn read_btree_map(
5132 cur: &mut Cursor<'_>,
5133) -> Result<PersistentBTreeMap<IndexKey, Vec<RowLocator>>, StorageError> {
5134 let entry_count = cur.read_u32()? as usize;
5135 let mut map = PersistentBTreeMap::new();
5136 for _ in 0..entry_count {
5137 let key = cur.read_index_key()?;
5138 let locator_count = cur.read_u32()? as usize;
5139 let mut locators = Vec::with_capacity(locator_count);
5140 for _ in 0..locator_count {
5141 let tail = &cur.buf[cur.pos..];
5142 let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
5143 StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
5144 })?;
5145 cur.pos += consumed;
5146 locators.push(loc);
5147 }
5148 map.insert_mut(key, locators);
5149 }
5150 Ok(map)
5151}
5152
5153fn read_gin_map(
5157 cur: &mut Cursor<'_>,
5158) -> Result<PersistentBTreeMap<String, Vec<RowLocator>>, StorageError> {
5159 let entry_count = cur.read_u32()? as usize;
5160 let mut map = PersistentBTreeMap::new();
5161 for _ in 0..entry_count {
5162 let word = cur.read_str()?;
5163 let locator_count = cur.read_u32()? as usize;
5164 let mut locators = Vec::with_capacity(locator_count);
5165 for _ in 0..locator_count {
5166 let tail = &cur.buf[cur.pos..];
5167 let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
5168 StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
5169 })?;
5170 cur.pos += consumed;
5171 locators.push(loc);
5172 }
5173 map.insert_mut(word, locators);
5174 }
5175 Ok(map)
5176}
5177
5178fn write_nsw_graph(out: &mut Vec<u8>, g: &NswGraph) {
5194 let entry = g.entry.map_or(u32::MAX, |e| {
5195 u32::try_from(e).expect("NSW entry fits in u32")
5196 });
5197 write_u16(
5198 out,
5199 u16::try_from(g.m_max_0).expect("HNSW m_max_0 fits in u16"),
5200 );
5201 out.extend_from_slice(&entry.to_le_bytes());
5202 out.push(g.entry_level);
5203 let node_count = g.levels.len();
5204 write_u32(
5205 out,
5206 u32::try_from(node_count).expect("HNSW node count fits in u32"),
5207 );
5208 for &lvl in &g.levels {
5209 out.push(lvl);
5210 }
5211 let layer_count = u8::try_from(g.layers.len()).expect("HNSW layer count ≤ 255");
5212 out.push(layer_count);
5213 for layer in &g.layers {
5214 write_u32(
5215 out,
5216 u32::try_from(layer.len()).expect("HNSW per-layer node count fits in u32"),
5217 );
5218 for neighbors in layer {
5219 write_u16(
5220 out,
5221 u16::try_from(neighbors.len()).expect("HNSW neighbour list fits in u16"),
5222 );
5223 for &peer in neighbors {
5227 write_u32(out, peer);
5228 }
5229 }
5230 }
5231}
5232
5233fn write_data_type(out: &mut Vec<u8>, t: DataType) {
5234 match t {
5235 DataType::Int => out.push(1),
5236 DataType::BigInt => out.push(2),
5237 DataType::Float => out.push(3),
5238 DataType::Text => out.push(4),
5239 DataType::Bool => out.push(5),
5240 DataType::Vector { dim, encoding } => match encoding {
5241 VecEncoding::F32 => {
5245 out.push(6);
5246 out.extend_from_slice(&dim.to_le_bytes());
5247 }
5248 VecEncoding::F16 => {
5251 out.push(15);
5252 out.extend_from_slice(&dim.to_le_bytes());
5253 }
5254 VecEncoding::Sq8 => {
5260 out.push(14);
5261 out.extend_from_slice(&dim.to_le_bytes());
5262 }
5263 },
5264 DataType::SmallInt => out.push(7),
5265 DataType::Varchar(max) => {
5266 out.push(8);
5267 out.extend_from_slice(&max.to_le_bytes());
5268 }
5269 DataType::Char(size) => {
5270 out.push(9);
5271 out.extend_from_slice(&size.to_le_bytes());
5272 }
5273 DataType::Numeric { precision, scale } => {
5274 out.push(10);
5275 out.push(precision);
5276 out.push(scale);
5277 }
5278 DataType::Date => out.push(11),
5279 DataType::Timestamp => out.push(12),
5280 DataType::Timestamptz => out.push(17),
5284 DataType::Interval => {
5289 unreachable!("DataType::Interval has no on-disk encoding in v2.11")
5290 }
5291 DataType::Json => out.push(13),
5292 DataType::Jsonb => out.push(16),
5295 DataType::Bytes => out.push(18),
5297 DataType::TextArray => out.push(19),
5300 DataType::IntArray => out.push(20),
5303 DataType::BigIntArray => out.push(21),
5306 DataType::TsVector => out.push(22),
5309 DataType::TsQuery => out.push(23),
5312 }
5313}
5314
5315impl Cursor<'_> {
5316 fn read_data_type(&mut self) -> Result<DataType, StorageError> {
5317 let tag = self.read_u8()?;
5318 match tag {
5319 1 => Ok(DataType::Int),
5320 2 => Ok(DataType::BigInt),
5321 3 => Ok(DataType::Float),
5322 4 => Ok(DataType::Text),
5323 5 => Ok(DataType::Bool),
5324 6 => Ok(DataType::Vector {
5325 dim: self.read_u32()?,
5326 encoding: VecEncoding::F32,
5327 }),
5328 7 => Ok(DataType::SmallInt),
5329 8 => Ok(DataType::Varchar(self.read_u32()?)),
5330 9 => Ok(DataType::Char(self.read_u32()?)),
5331 10 => {
5332 let precision = self.read_u8()?;
5333 let scale = self.read_u8()?;
5334 Ok(DataType::Numeric { precision, scale })
5335 }
5336 11 => Ok(DataType::Date),
5337 12 => Ok(DataType::Timestamp),
5338 13 => Ok(DataType::Json),
5339 14 => Ok(DataType::Vector {
5340 dim: self.read_u32()?,
5341 encoding: VecEncoding::Sq8,
5342 }),
5343 15 => Ok(DataType::Vector {
5347 dim: self.read_u32()?,
5348 encoding: VecEncoding::F16,
5349 }),
5350 16 => Ok(DataType::Jsonb),
5354 17 => Ok(DataType::Timestamptz),
5358 18 => Ok(DataType::Bytes),
5360 19 => Ok(DataType::TextArray),
5362 20 => Ok(DataType::IntArray),
5364 21 => Ok(DataType::BigIntArray),
5365 22 => Ok(DataType::TsVector),
5368 23 => Ok(DataType::TsQuery),
5369 other => Err(StorageError::Corrupt(format!(
5370 "unknown data type tag: {other}"
5371 ))),
5372 }
5373 }
5374}
5375
5376pub fn row_body_encoded_len(row: &Row, schema: &TableSchema) -> usize {
5382 debug_assert_eq!(
5383 row.values.len(),
5384 schema.columns.len(),
5385 "row_body_encoded_len: row arity must match schema"
5386 );
5387 let bitmap_bytes = schema.columns.len().div_ceil(8);
5388 let mut n = bitmap_bytes;
5389 for (col_idx, v) in row.values.iter().enumerate() {
5390 if matches!(v, Value::Null) {
5391 continue;
5392 }
5393 n += value_body_encoded_len(v, schema.columns[col_idx].ty);
5394 }
5395 n
5396}
5397
5398fn value_body_encoded_len(v: &Value, _ty: DataType) -> usize {
5404 match v {
5405 Value::SmallInt(_) => 2,
5406 Value::Int(_) | Value::Date(_) => 4,
5408 Value::BigInt(_) | Value::Float(_) | Value::Timestamp(_) => 8,
5410 Value::Bool(_) => 1,
5411 Value::Text(s) | Value::Json(s) => 2 + s.len(),
5413 Value::Vector(vec) => 4 + 4 * vec.len(),
5415 Value::Sq8Vector(q) => 4 + 4 + 4 + q.bytes.len(),
5422 Value::HalfVector(h) => 4 + h.bytes.len(),
5425 Value::Numeric { .. } => 16 + 1,
5427 Value::Bytes(b) => 2 + b.len(),
5433 Value::TextArray(items) => {
5436 let mut n = 2; for item in items {
5438 n += 1; if let Some(s) = item {
5440 n += 2 + s.len();
5441 }
5442 }
5443 n
5444 }
5445 Value::IntArray(items) => {
5448 2 + items
5449 .iter()
5450 .map(|x| if x.is_some() { 5 } else { 1 })
5451 .sum::<usize>()
5452 }
5453 Value::BigIntArray(items) => {
5454 2 + items
5455 .iter()
5456 .map(|x| if x.is_some() { 9 } else { 1 })
5457 .sum::<usize>()
5458 }
5459 Value::TsVector(lexs) => {
5463 let mut n = 2;
5464 for l in lexs {
5465 n += 2 + l.word.len() + 2 + 2 * l.positions.len() + 1;
5466 }
5467 n
5468 }
5469 Value::TsQuery(ast) => tsquery_encoded_len(ast),
5472 Value::Null => 0,
5474 Value::Interval { .. } => {
5476 unreachable!("Value::Interval has no on-disk encoding")
5477 }
5478 }
5479}
5480
5481pub fn encode_row_body_dense(row: &Row, schema: &TableSchema) -> Vec<u8> {
5492 debug_assert_eq!(
5493 row.values.len(),
5494 schema.columns.len(),
5495 "dense encode: row arity must match schema"
5496 );
5497 let bitmap_bytes = schema.columns.len().div_ceil(8);
5498 let mut out = Vec::with_capacity(bitmap_bytes + schema.columns.len() * 8);
5501 let bitmap_offset = out.len();
5502 out.resize(bitmap_offset + bitmap_bytes, 0);
5503 for (i, v) in row.values.iter().enumerate() {
5504 if matches!(v, Value::Null) {
5505 out[bitmap_offset + i / 8] |= 1 << (i % 8);
5506 }
5507 }
5508 for (col_idx, v) in row.values.iter().enumerate() {
5509 if matches!(v, Value::Null) {
5510 continue;
5511 }
5512 write_value_body(&mut out, v, schema.columns[col_idx].ty);
5513 }
5514 out
5515}
5516
5517pub fn decode_row_body_dense(
5523 bytes: &[u8],
5524 schema: &TableSchema,
5525) -> Result<(Row, usize), StorageError> {
5526 let mut cur = Cursor::new(bytes);
5527 let bitmap_bytes = schema.columns.len().div_ceil(8);
5528 let mut bitmap_buf = [0u8; 32];
5529 if bitmap_bytes > bitmap_buf.len() {
5530 return Err(StorageError::Corrupt(format!(
5531 "row NULL bitmap {bitmap_bytes} B exceeds 32 B cap"
5532 )));
5533 }
5534 let slice = cur.take(bitmap_bytes)?;
5535 bitmap_buf[..bitmap_bytes].copy_from_slice(slice);
5536 let mut values = Vec::with_capacity(schema.columns.len());
5537 for (col_idx, col) in schema.columns.iter().enumerate() {
5538 if (bitmap_buf[col_idx / 8] >> (col_idx % 8)) & 1 == 1 {
5539 values.push(Value::Null);
5540 } else {
5541 values.push(cur.read_value_body(col.ty)?);
5542 }
5543 }
5544 Ok((Row { values }, cur.pos))
5545}
5546
/// Appends the schema-driven body of one non-NULL value to `out`.
///
/// No type tag is written: the reader (`Cursor::read_value_body`) selects the decoding
/// from the column's `DataType`. Fixed-width scalars are little-endian. Text/JSON use a
/// `u16` length prefix. Vectors use a `u32` dimension prefix. Arrays use a `u16`
/// element count, then per element a null flag (1 = NULL, 0 = present) and, if present,
/// its payload.
///
/// # Panics
/// Panics (via `unreachable!`) if `v` does not match `ty`, or (via `expect`) if a
/// length or count exceeds the width of its prefix.
fn write_value_body(out: &mut Vec<u8>, v: &Value, ty: DataType) {
    match (v, ty) {
        (Value::SmallInt(n), DataType::SmallInt) => out.extend_from_slice(&n.to_le_bytes()),
        (Value::Int(n), DataType::Int) => out.extend_from_slice(&n.to_le_bytes()),
        (Value::BigInt(n), DataType::BigInt) => out.extend_from_slice(&n.to_le_bytes()),
        (Value::Float(x), DataType::Float) => out.extend_from_slice(&x.to_le_bytes()),
        (Value::Bool(b), DataType::Bool) => out.push(u8::from(*b)),
        // Varchar/Char bodies are stored exactly like Text.
        (Value::Text(s), DataType::Text | DataType::Varchar(_) | DataType::Char(_)) => {
            write_str(out, s);
        }
        (
            Value::Vector(v),
            DataType::Vector {
                encoding: VecEncoding::F32,
                ..
            },
        ) => {
            // u32 dim, then dim little-endian components.
            let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
            out.extend_from_slice(&dim.to_le_bytes());
            for x in v {
                out.extend_from_slice(&x.to_le_bytes());
            }
        }
        (
            Value::Sq8Vector(q),
            DataType::Vector {
                encoding: VecEncoding::Sq8,
                ..
            },
        ) => {
            // u32 dim, min, max, then one quantized byte per component.
            let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
            out.extend_from_slice(&dim.to_le_bytes());
            out.extend_from_slice(&q.min.to_le_bytes());
            out.extend_from_slice(&q.max.to_le_bytes());
            out.extend_from_slice(&q.bytes);
        }
        (
            Value::HalfVector(h),
            DataType::Vector {
                encoding: VecEncoding::F16,
                ..
            },
        ) => {
            // u32 dim, then the raw half-precision bytes (the reader takes dim * 2 bytes).
            let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
            out.extend_from_slice(&dim.to_le_bytes());
            out.extend_from_slice(&h.bytes);
        }
        // NOTE(review): this writes the column's declared `scale`, not the value's own
        // `scale`. It round-trips correctly only if callers have already rescaled the
        // value to the column scale — confirm.
        (Value::Numeric { scaled, .. }, DataType::Numeric { scale, .. }) => {
            out.extend_from_slice(&scaled.to_le_bytes());
            out.push(scale);
        }
        (Value::Date(d), DataType::Date) => out.extend_from_slice(&d.to_le_bytes()),
        // Timestamptz shares the Timestamp body.
        (Value::Timestamp(t), DataType::Timestamp | DataType::Timestamptz) => {
            out.extend_from_slice(&t.to_le_bytes())
        }
        (Value::Json(s), DataType::Json | DataType::Jsonb) => write_str(out, s),
        (Value::Bytes(b), DataType::Bytes) => {
            let len = u16::try_from(b.len()).expect("BYTEA cell ≤ 64 KiB");
            out.extend_from_slice(&len.to_le_bytes());
            out.extend_from_slice(b);
        }
        (Value::TextArray(items), DataType::TextArray) => {
            let count = u16::try_from(items.len()).expect("TEXT[] ≤ 65k elements");
            out.extend_from_slice(&count.to_le_bytes());
            for item in items {
                match item {
                    // Null flag: 1 = NULL element, 0 = present element.
                    None => out.push(1),
                    Some(s) => {
                        out.push(0);
                        let len = u16::try_from(s.len()).expect("TEXT[] element ≤ 64 KiB");
                        out.extend_from_slice(&len.to_le_bytes());
                        out.extend_from_slice(s.as_bytes());
                    }
                }
            }
        }
        (Value::IntArray(items), DataType::IntArray) => {
            let count = u16::try_from(items.len()).expect("INT[] ≤ 65k elements");
            out.extend_from_slice(&count.to_le_bytes());
            for item in items {
                match item {
                    None => out.push(1),
                    Some(n) => {
                        out.push(0);
                        out.extend_from_slice(&n.to_le_bytes());
                    }
                }
            }
        }
        (Value::BigIntArray(items), DataType::BigIntArray) => {
            let count = u16::try_from(items.len()).expect("BIGINT[] ≤ 65k elements");
            out.extend_from_slice(&count.to_le_bytes());
            for item in items {
                match item {
                    None => out.push(1),
                    Some(n) => {
                        out.push(0);
                        out.extend_from_slice(&n.to_le_bytes());
                    }
                }
            }
        }
        (Value::TsVector(lexs), DataType::TsVector) => write_tsvector_body(out, lexs),
        (Value::TsQuery(ast), DataType::TsQuery) => write_tsquery_body(out, ast),
        // Any other pairing means the caller passed a value that does not match its column.
        (other, ty) => unreachable!(
            "schema-driven encode received mismatched value/type pair: \
             value tag={:?}, column type={:?}",
            other.data_type(),
            ty
        ),
    }
}
5692
/// Appends a self-describing value to `out`: a one-byte type tag followed by its payload.
///
/// Tags: 0 Null, 1 Int, 2 BigInt, 3 Float, 4 Text/Json, 5 Bool, 6 F32 vector,
/// 7 SmallInt, 8 Numeric, 9 Date, 10 Timestamp, 11 SQ8 vector, 12 half vector,
/// 14 Bytes, 15 TEXT[], 16 INT[], 17 BIGINT[], 18 tsvector, 19 tsquery.
/// Tag 13 is not emitted by this function.
///
/// # Panics
/// Panics on `Value::Interval` (no on-disk encoding), or if a length or count exceeds
/// the width of its prefix.
fn write_value(out: &mut Vec<u8>, v: &Value) {
    match v {
        Value::Null => out.push(0),
        Value::SmallInt(n) => {
            out.push(7);
            out.extend_from_slice(&n.to_le_bytes());
        }
        Value::Int(n) => {
            out.push(1);
            out.extend_from_slice(&n.to_le_bytes());
        }
        Value::BigInt(n) => {
            out.push(2);
            out.extend_from_slice(&n.to_le_bytes());
        }
        Value::Float(x) => {
            out.push(3);
            out.extend_from_slice(&x.to_le_bytes());
        }
        // NOTE(review): Json shares tag 4 with Text, so the tag alone does not tell the
        // two apart on read — confirm the reader relies on the column type for that.
        Value::Text(s) | Value::Json(s) => {
            out.push(4);
            write_str(out, s);
        }
        Value::Bool(b) => {
            out.push(5);
            out.push(u8::from(*b));
        }
        Value::Vector(v) => {
            out.push(6);
            let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
            out.extend_from_slice(&dim.to_le_bytes());
            for x in v {
                out.extend_from_slice(&x.to_le_bytes());
            }
        }
        Value::Sq8Vector(q) => {
            out.push(11);
            let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
            out.extend_from_slice(&dim.to_le_bytes());
            out.extend_from_slice(&q.min.to_le_bytes());
            out.extend_from_slice(&q.max.to_le_bytes());
            out.extend_from_slice(&q.bytes);
        }
        Value::HalfVector(h) => {
            out.push(12);
            let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
            out.extend_from_slice(&dim.to_le_bytes());
            out.extend_from_slice(&h.bytes);
        }
        // Unlike the schema-driven body, the value's own scale is written here.
        Value::Numeric { scaled, scale } => {
            out.push(8);
            out.extend_from_slice(&scaled.to_le_bytes());
            out.push(*scale);
        }
        Value::Date(d) => {
            out.push(9);
            out.extend_from_slice(&d.to_le_bytes());
        }
        Value::Timestamp(t) => {
            out.push(10);
            out.extend_from_slice(&t.to_le_bytes());
        }
        Value::Interval { .. } => {
            unreachable!(
                "Value::Interval has no on-disk encoding; engine must reject it before write"
            )
        }
        Value::Bytes(b) => {
            out.push(14);
            let len = u16::try_from(b.len()).expect("BYTEA value ≤ 64 KiB");
            out.extend_from_slice(&len.to_le_bytes());
            out.extend_from_slice(b);
        }
        // Array payloads: u16 count, then per element a null flag (1 = NULL, 0 = present).
        Value::TextArray(items) => {
            out.push(15);
            let count = u16::try_from(items.len()).expect("TEXT[] ≤ 65k elements");
            out.extend_from_slice(&count.to_le_bytes());
            for item in items {
                match item {
                    None => out.push(1),
                    Some(s) => {
                        out.push(0);
                        let len = u16::try_from(s.len()).expect("TEXT[] element ≤ 64 KiB");
                        out.extend_from_slice(&len.to_le_bytes());
                        out.extend_from_slice(s.as_bytes());
                    }
                }
            }
        }
        Value::IntArray(items) => {
            out.push(16);
            let count = u16::try_from(items.len()).expect("INT[] ≤ 65k elements");
            out.extend_from_slice(&count.to_le_bytes());
            for item in items {
                match item {
                    None => out.push(1),
                    Some(n) => {
                        out.push(0);
                        out.extend_from_slice(&n.to_le_bytes());
                    }
                }
            }
        }
        Value::BigIntArray(items) => {
            out.push(17);
            let count = u16::try_from(items.len()).expect("BIGINT[] ≤ 65k elements");
            out.extend_from_slice(&count.to_le_bytes());
            for item in items {
                match item {
                    None => out.push(1),
                    Some(n) => {
                        out.push(0);
                        out.extend_from_slice(&n.to_le_bytes());
                    }
                }
            }
        }
        Value::TsVector(lexs) => {
            out.push(18);
            write_tsvector_body(out, lexs);
        }
        Value::TsQuery(ast) => {
            out.push(19);
            write_tsquery_body(out, ast);
        }
    }
}
5849
5850fn write_tsvector_body(out: &mut Vec<u8>, lexs: &[TsLexeme]) {
5853 let count = u16::try_from(lexs.len()).expect("tsvector ≤ 65k lexemes");
5854 out.extend_from_slice(&count.to_le_bytes());
5855 for l in lexs {
5856 let wlen = u16::try_from(l.word.len()).expect("tsvector word ≤ 64 KiB");
5857 out.extend_from_slice(&wlen.to_le_bytes());
5858 out.extend_from_slice(l.word.as_bytes());
5859 let plen = u16::try_from(l.positions.len()).expect("tsvector pos count ≤ 65k");
5860 out.extend_from_slice(&plen.to_le_bytes());
5861 for p in &l.positions {
5862 out.extend_from_slice(&p.to_le_bytes());
5863 }
5864 out.push(l.weight);
5865 }
5866}
5867
5868fn write_tsquery_body(out: &mut Vec<u8>, ast: &TsQueryAst) {
5872 match ast {
5873 TsQueryAst::Term { word, weight_mask } => {
5874 out.push(0);
5875 let len = u16::try_from(word.len()).expect("tsquery term ≤ 64 KiB");
5876 out.extend_from_slice(&len.to_le_bytes());
5877 out.extend_from_slice(word.as_bytes());
5878 out.push(*weight_mask);
5879 }
5880 TsQueryAst::And(a, b) => {
5881 out.push(1);
5882 write_tsquery_body(out, a);
5883 write_tsquery_body(out, b);
5884 }
5885 TsQueryAst::Or(a, b) => {
5886 out.push(2);
5887 write_tsquery_body(out, a);
5888 write_tsquery_body(out, b);
5889 }
5890 TsQueryAst::Not(x) => {
5891 out.push(3);
5892 write_tsquery_body(out, x);
5893 }
5894 TsQueryAst::Phrase {
5895 left,
5896 right,
5897 distance,
5898 } => {
5899 out.push(4);
5900 out.extend_from_slice(&distance.to_le_bytes());
5901 write_tsquery_body(out, left);
5902 write_tsquery_body(out, right);
5903 }
5904 }
5905}
5906
5907fn tsquery_encoded_len(ast: &TsQueryAst) -> usize {
5909 match ast {
5910 TsQueryAst::Term { word, .. } => 1 + 2 + word.len() + 1,
5911 TsQueryAst::And(a, b) | TsQueryAst::Or(a, b) => {
5912 1 + tsquery_encoded_len(a) + tsquery_encoded_len(b)
5913 }
5914 TsQueryAst::Not(x) => 1 + tsquery_encoded_len(x),
5915 TsQueryAst::Phrase { left, right, .. } => {
5916 1 + 2 + tsquery_encoded_len(left) + tsquery_encoded_len(right)
5917 }
5918 }
5919}
5920
/// Appends `n` to `out` in little-endian byte order.
fn write_u16(out: &mut Vec<u8>, n: u16) {
    out.extend(n.to_le_bytes());
}
/// Appends `n` to `out` in little-endian byte order.
fn write_u32(out: &mut Vec<u8>, n: u32) {
    out.extend(n.to_le_bytes());
}
5927fn write_str(out: &mut Vec<u8>, s: &str) {
5928 let len = u16::try_from(s.len()).expect("identifier / text fits in u16");
5929 write_u16(out, len);
5930 out.extend_from_slice(s.as_bytes());
5931}
5932
5933fn write_str_long(out: &mut Vec<u8>, s: &str) {
5938 let len = u32::try_from(s.len()).expect("function body fits in u32");
5939 write_u32(out, len);
5940 out.extend_from_slice(s.as_bytes());
5941}
5942
5943fn write_index_key(out: &mut Vec<u8>, key: &IndexKey) {
5947 match key {
5948 IndexKey::Int(n) => {
5949 out.push(INDEX_KEY_TAG_INT);
5950 out.extend_from_slice(&n.to_le_bytes());
5951 }
5952 IndexKey::Text(s) => {
5953 out.push(INDEX_KEY_TAG_TEXT);
5954 write_str(out, s);
5955 }
5956 IndexKey::Bool(b) => {
5957 out.push(INDEX_KEY_TAG_BOOL);
5958 out.push(u8::from(*b));
5959 }
5960 }
5961}
5962
/// Forward-only reader over a borrowed byte buffer, used by the catalog decoders.
///
/// The `read_*` helpers advance `pos` through `take`, which returns
/// `StorageError::Corrupt` instead of reading past the end of `buf`. Some decoders
/// (row and row-locator decoding) slice `buf[pos..]` directly and then add the
/// consumed byte count to `pos`.
struct Cursor<'a> {
    // The complete input being decoded.
    buf: &'a [u8],
    // Offset of the next unread byte in `buf`.
    pos: usize,
}
5967
5968impl<'a> Cursor<'a> {
5969 const fn new(buf: &'a [u8]) -> Self {
5970 Self { buf, pos: 0 }
5971 }
5972
5973 fn take(&mut self, n: usize) -> Result<&'a [u8], StorageError> {
5974 let end = self
5975 .pos
5976 .checked_add(n)
5977 .ok_or_else(|| StorageError::Corrupt(format!("length overflow taking {n} bytes")))?;
5978 if end > self.buf.len() {
5979 return Err(StorageError::Corrupt(format!(
5980 "unexpected EOF at offset {} (wanted {n} more bytes)",
5981 self.pos
5982 )));
5983 }
5984 let s = &self.buf[self.pos..end];
5985 self.pos = end;
5986 Ok(s)
5987 }
5988
5989 fn read_u8(&mut self) -> Result<u8, StorageError> {
5990 Ok(self.take(1)?[0])
5991 }
5992 fn read_u16(&mut self) -> Result<u16, StorageError> {
5993 let s = self.take(2)?;
5994 Ok(u16::from_le_bytes([s[0], s[1]]))
5995 }
5996 fn read_u32(&mut self) -> Result<u32, StorageError> {
5997 let s = self.take(4)?;
5998 Ok(u32::from_le_bytes([s[0], s[1], s[2], s[3]]))
5999 }
6000 fn read_i32(&mut self) -> Result<i32, StorageError> {
6001 let s = self.take(4)?;
6002 Ok(i32::from_le_bytes([s[0], s[1], s[2], s[3]]))
6003 }
6004 fn read_u64(&mut self) -> Result<u64, StorageError> {
6007 let s = self.take(8)?;
6008 Ok(u64::from_le_bytes([
6009 s[0], s[1], s[2], s[3], s[4], s[5], s[6], s[7],
6010 ]))
6011 }
6012 fn read_i64(&mut self) -> Result<i64, StorageError> {
6013 let s = self.take(8)?;
6014 let arr: [u8; 8] = s.try_into().expect("checked");
6015 Ok(i64::from_le_bytes(arr))
6016 }
6017 fn read_f64(&mut self) -> Result<f64, StorageError> {
6018 let s = self.take(8)?;
6019 let arr: [u8; 8] = s.try_into().expect("checked");
6020 Ok(f64::from_le_bytes(arr))
6021 }
6022 fn read_f32(&mut self) -> Result<f32, StorageError> {
6023 let s = self.take(4)?;
6024 Ok(f32::from_le_bytes([s[0], s[1], s[2], s[3]]))
6025 }
6026 fn read_str(&mut self) -> Result<String, StorageError> {
6027 let len = self.read_u16()? as usize;
6028 let bytes = self.take(len)?;
6029 core::str::from_utf8(bytes)
6030 .map(String::from)
6031 .map_err(|_| StorageError::Corrupt("invalid UTF-8 in identifier or text".into()))
6032 }
6033
6034 fn read_str_long(&mut self) -> Result<String, StorageError> {
6038 let len = self.read_u32()? as usize;
6039 let bytes = self.take(len)?;
6040 core::str::from_utf8(bytes)
6041 .map(String::from)
6042 .map_err(|_| StorageError::Corrupt("invalid UTF-8 in long-string payload".into()))
6043 }
6044
6045 fn read_index_key(&mut self) -> Result<IndexKey, StorageError> {
6049 let tag = self.read_u8()?;
6050 match tag {
6051 INDEX_KEY_TAG_INT => Ok(IndexKey::Int(self.read_i64()?)),
6052 INDEX_KEY_TAG_TEXT => Ok(IndexKey::Text(self.read_str()?)),
6053 INDEX_KEY_TAG_BOOL => Ok(IndexKey::Bool(self.read_u8()? != 0)),
6054 other => Err(StorageError::Corrupt(format!(
6055 "unknown index key tag: {other}"
6056 ))),
6057 }
6058 }
6059 fn read_value_body(&mut self, ty: DataType) -> Result<Value, StorageError> {
6065 match ty {
6066 DataType::SmallInt => {
6067 let s = self.take(2)?;
6068 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
6069 }
6070 DataType::Int => Ok(Value::Int(self.read_i32()?)),
6071 DataType::BigInt => Ok(Value::BigInt(self.read_i64()?)),
6072 DataType::Float => Ok(Value::Float(self.read_f64()?)),
6073 DataType::Bool => Ok(Value::Bool(self.read_u8()? != 0)),
6074 DataType::Text | DataType::Varchar(_) | DataType::Char(_) => {
6075 Ok(Value::Text(self.read_str()?))
6076 }
6077 DataType::Vector {
6078 encoding: VecEncoding::F32,
6079 ..
6080 } => {
6081 let dim = self.read_u32()? as usize;
6082 let mut v = Vec::with_capacity(dim);
6083 for _ in 0..dim {
6084 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
6085 v.push(f32::from_le_bytes(bytes));
6086 }
6087 Ok(Value::Vector(v))
6088 }
6089 DataType::Vector {
6090 encoding: VecEncoding::Sq8,
6091 ..
6092 } => {
6093 let dim = self.read_u32()? as usize;
6094 let min = self.read_f32()?;
6095 let max = self.read_f32()?;
6096 let bytes = self.take(dim)?.to_vec();
6097 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
6098 }
6099 DataType::Vector {
6100 encoding: VecEncoding::F16,
6101 ..
6102 } => {
6103 let dim = self.read_u32()? as usize;
6104 let bytes = self.take(dim * 2)?.to_vec();
6105 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
6106 }
6107 DataType::Numeric { .. } => {
6108 let s = self.take(16)?;
6109 let arr: [u8; 16] = s.try_into().expect("checked");
6110 let scaled = i128::from_le_bytes(arr);
6111 let scale = self.read_u8()?;
6112 Ok(Value::Numeric { scaled, scale })
6113 }
6114 DataType::Date => Ok(Value::Date(self.read_i32()?)),
6115 DataType::Timestamp => Ok(Value::Timestamp(self.read_i64()?)),
6116 DataType::Timestamptz => Ok(Value::Timestamp(self.read_i64()?)),
6117 DataType::Jsonb => Ok(Value::Json(self.read_str()?)),
6118 DataType::Interval => {
6119 Err(StorageError::Corrupt(
6124 "INTERVAL column found on disk — runtime-only type, v3.0.2 rejects it".into(),
6125 ))
6126 }
6127 DataType::Json => Ok(Value::Json(self.read_str()?)),
6128 DataType::Bytes => {
6131 let len = self.read_u16()? as usize;
6132 let bytes = self.take(len)?.to_vec();
6133 Ok(Value::Bytes(bytes))
6134 }
6135 DataType::TextArray => {
6137 let count = self.read_u16()? as usize;
6138 let mut items: Vec<Option<String>> = Vec::with_capacity(count);
6139 for _ in 0..count {
6140 match self.read_u8()? {
6141 0 => items.push(Some(self.read_str()?)),
6142 1 => items.push(None),
6143 other => {
6144 return Err(StorageError::Corrupt(format!(
6145 "TEXT[] null flag: unknown byte {other}"
6146 )));
6147 }
6148 }
6149 }
6150 Ok(Value::TextArray(items))
6151 }
6152 DataType::IntArray => {
6154 let count = self.read_u16()? as usize;
6155 let mut items: Vec<Option<i32>> = Vec::with_capacity(count);
6156 for _ in 0..count {
6157 match self.read_u8()? {
6158 0 => items.push(Some(self.read_i32()?)),
6159 1 => items.push(None),
6160 other => {
6161 return Err(StorageError::Corrupt(format!(
6162 "INT[] null flag: unknown byte {other}"
6163 )));
6164 }
6165 }
6166 }
6167 Ok(Value::IntArray(items))
6168 }
6169 DataType::BigIntArray => {
6171 let count = self.read_u16()? as usize;
6172 let mut items: Vec<Option<i64>> = Vec::with_capacity(count);
6173 for _ in 0..count {
6174 match self.read_u8()? {
6175 0 => items.push(Some(self.read_i64()?)),
6176 1 => items.push(None),
6177 other => {
6178 return Err(StorageError::Corrupt(format!(
6179 "BIGINT[] null flag: unknown byte {other}"
6180 )));
6181 }
6182 }
6183 }
6184 Ok(Value::BigIntArray(items))
6185 }
6186 DataType::TsVector => Ok(Value::TsVector(self.read_tsvector_body()?)),
6190 DataType::TsQuery => Ok(Value::TsQuery(self.read_tsquery_body()?)),
6191 }
6192 }
6193
6194 fn read_tsvector_body(&mut self) -> Result<Vec<TsLexeme>, StorageError> {
6196 let count = self.read_u16()? as usize;
6197 let mut out = Vec::with_capacity(count);
6198 for _ in 0..count {
6199 let word = self.read_str()?;
6200 let pos_count = self.read_u16()? as usize;
6201 let mut positions = Vec::with_capacity(pos_count);
6202 for _ in 0..pos_count {
6203 positions.push(self.read_u16()?);
6204 }
6205 let weight = self.read_u8()?;
6206 out.push(TsLexeme {
6207 word,
6208 positions,
6209 weight,
6210 });
6211 }
6212 Ok(out)
6213 }
6214
6215 fn read_tsquery_body(&mut self) -> Result<TsQueryAst, StorageError> {
6217 let tag = self.read_u8()?;
6218 match tag {
6219 0 => {
6220 let word = self.read_str()?;
6221 let weight_mask = self.read_u8()?;
6222 Ok(TsQueryAst::Term { word, weight_mask })
6223 }
6224 1 => {
6225 let a = self.read_tsquery_body()?;
6226 let b = self.read_tsquery_body()?;
6227 Ok(TsQueryAst::And(Box::new(a), Box::new(b)))
6228 }
6229 2 => {
6230 let a = self.read_tsquery_body()?;
6231 let b = self.read_tsquery_body()?;
6232 Ok(TsQueryAst::Or(Box::new(a), Box::new(b)))
6233 }
6234 3 => {
6235 let x = self.read_tsquery_body()?;
6236 Ok(TsQueryAst::Not(Box::new(x)))
6237 }
6238 4 => {
6239 let distance = self.read_u16()?;
6240 let left = self.read_tsquery_body()?;
6241 let right = self.read_tsquery_body()?;
6242 Ok(TsQueryAst::Phrase {
6243 left: Box::new(left),
6244 right: Box::new(right),
6245 distance,
6246 })
6247 }
6248 other => Err(StorageError::Corrupt(format!(
6249 "tsquery: unknown node tag {other}"
6250 ))),
6251 }
6252 }
6253
6254 fn read_value(&mut self) -> Result<Value, StorageError> {
6255 let tag = self.read_u8()?;
6256 match tag {
6257 0 => Ok(Value::Null),
6258 1 => Ok(Value::Int(self.read_i32()?)),
6259 2 => Ok(Value::BigInt(self.read_i64()?)),
6260 3 => Ok(Value::Float(self.read_f64()?)),
6261 4 => Ok(Value::Text(self.read_str()?)),
6262 5 => Ok(Value::Bool(self.read_u8()? != 0)),
6263 6 => {
6264 let dim = self.read_u32()? as usize;
6265 let mut v = Vec::with_capacity(dim);
6266 for _ in 0..dim {
6267 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
6268 v.push(f32::from_le_bytes(bytes));
6269 }
6270 Ok(Value::Vector(v))
6271 }
6272 7 => {
6273 let s = self.take(2)?;
6274 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
6275 }
6276 8 => {
6277 let s = self.take(16)?;
6278 let arr: [u8; 16] = s.try_into().expect("checked");
6279 let scaled = i128::from_le_bytes(arr);
6280 let scale = self.read_u8()?;
6281 Ok(Value::Numeric { scaled, scale })
6282 }
6283 9 => Ok(Value::Date(self.read_i32()?)),
6284 10 => Ok(Value::Timestamp(self.read_i64()?)),
6285 11 => {
6290 let dim = self.read_u32()? as usize;
6291 let min = self.read_f32()?;
6292 let max = self.read_f32()?;
6293 let bytes = self.take(dim)?.to_vec();
6294 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
6295 }
6296 12 => {
6299 let dim = self.read_u32()? as usize;
6300 let bytes = self.take(dim * 2)?.to_vec();
6301 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
6302 }
6303 14 => {
6305 let len = self.read_u16()? as usize;
6306 let bytes = self.take(len)?.to_vec();
6307 Ok(Value::Bytes(bytes))
6308 }
6309 15 => {
6312 let count = self.read_u16()? as usize;
6313 let mut items: Vec<Option<String>> = Vec::with_capacity(count);
6314 for _ in 0..count {
6315 match self.read_u8()? {
6316 0 => items.push(Some(self.read_str()?)),
6317 1 => items.push(None),
6318 other => {
6319 return Err(StorageError::Corrupt(format!(
6320 "TEXT[] null flag in value tag: unknown byte {other}"
6321 )));
6322 }
6323 }
6324 }
6325 Ok(Value::TextArray(items))
6326 }
6327 16 => {
6329 let count = self.read_u16()? as usize;
6330 let mut items: Vec<Option<i32>> = Vec::with_capacity(count);
6331 for _ in 0..count {
6332 match self.read_u8()? {
6333 0 => items.push(Some(self.read_i32()?)),
6334 1 => items.push(None),
6335 other => {
6336 return Err(StorageError::Corrupt(format!(
6337 "INT[] null flag in value tag: unknown byte {other}"
6338 )));
6339 }
6340 }
6341 }
6342 Ok(Value::IntArray(items))
6343 }
6344 17 => {
6345 let count = self.read_u16()? as usize;
6346 let mut items: Vec<Option<i64>> = Vec::with_capacity(count);
6347 for _ in 0..count {
6348 match self.read_u8()? {
6349 0 => items.push(Some(self.read_i64()?)),
6350 1 => items.push(None),
6351 other => {
6352 return Err(StorageError::Corrupt(format!(
6353 "BIGINT[] null flag in value tag: unknown byte {other}"
6354 )));
6355 }
6356 }
6357 }
6358 Ok(Value::BigIntArray(items))
6359 }
6360 18 => Ok(Value::TsVector(self.read_tsvector_body()?)),
6363 19 => Ok(Value::TsQuery(self.read_tsquery_body()?)),
6365 other => Err(StorageError::Corrupt(format!("unknown value tag: {other}"))),
6366 }
6367 }
6368
6369 fn read_nsw_graph(&mut self, m: usize) -> Result<NswGraph, StorageError> {
6373 let m_max_0 = self.read_u16()? as usize;
6374 let entry_raw = self.read_u32()?;
6375 let entry = if entry_raw == u32::MAX {
6376 None
6377 } else {
6378 Some(entry_raw as usize)
6379 };
6380 let entry_level = self.read_u8()?;
6381 let node_count = self.read_u32()? as usize;
6382 let mut levels: PersistentVec<u8> = PersistentVec::new();
6387 for _ in 0..node_count {
6388 levels.push_mut(self.read_u8()?);
6389 }
6390 let layer_count = self.read_u8()? as usize;
6391 let mut layers: Vec<PersistentVec<Vec<u32>>> = Vec::with_capacity(layer_count);
6392 for _ in 0..layer_count {
6393 let n = self.read_u32()? as usize;
6394 let mut per_layer: PersistentVec<Vec<u32>> = PersistentVec::new();
6395 for _ in 0..n {
6396 let cnt = self.read_u16()? as usize;
6397 let mut row: Vec<u32> = Vec::with_capacity(cnt);
6398 for _ in 0..cnt {
6399 row.push(self.read_u32()?);
6400 }
6401 per_layer.push_mut(row);
6402 }
6403 layers.push(per_layer);
6404 }
6405 Ok(NswGraph {
6406 m,
6407 m_max_0,
6408 entry,
6409 entry_level,
6410 levels,
6411 layers,
6412 })
6413 }
6414}
6415
6416#[cfg(test)]
6417mod tests {
6418 use super::*;
6419 use alloc::string::ToString;
6420 use alloc::vec;
6421
    /// Checks that the NEON squared-L2 kernel agrees with the scalar reference.
    ///
    /// Inputs are deterministic pseudo-random vectors from a 64-bit LCG seeded
    /// per dimension, with components in [-1, 1). The dimension list includes
    /// sizes below and between multiples of 16 (4, 8, 12). That is presumably
    /// to exercise the kernel's remainder handling; confirm against the kernel.
    /// Agreement is checked to a relative tolerance of 1e-4 of
    /// `max(|scalar|, 1e-6)`.
    #[cfg(target_arch = "aarch64")]
    #[test]
    fn neon_l2_matches_scalar() {
        let dims = [4usize, 8, 12, 16, 64, 128, 256, 384, 512, 768, 1024, 1536];
        for &d in &dims {
            // Seed varies with the dimension so each size sees different data.
            let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
            let mut a = Vec::with_capacity(d);
            let mut b = Vec::with_capacity(d);
            for _ in 0..d {
                // LCG step, then map 24 high bits to [0, 2) and shift to [-1, 1).
                state = state
                    .wrapping_mul(6_364_136_223_846_793_005)
                    .wrapping_add(1);
                #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
                let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
                state = state
                    .wrapping_mul(6_364_136_223_846_793_005)
                    .wrapping_add(1);
                #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
                let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
                a.push(x);
                b.push(y);
            }
            let scalar = l2_distance_sq_scalar(&a, &b);
            // NOTE(review): the safety contract of `l2_distance_sq_neon` is not
            // visible here. Presumably it needs NEON, which is baseline on
            // aarch64, and equal-length inputs, which `a` and `b` have by
            // construction.
            let neon = unsafe { l2_distance_sq_neon(&a, &b) };
            let tol = (scalar.abs().max(1e-6)) * 1e-4;
            assert!(
                (scalar - neon).abs() <= tol,
                "dim={d}: scalar={scalar} neon={neon} diff={}",
                (scalar - neon).abs()
            );
        }
    }
6458
    /// Checks that the NEON inner-product kernel agrees with the scalar
    /// reference on deterministic pseudo-random inputs in [-1, 1).
    ///
    /// The tolerance is relative (1e-4 of `max(|scalar|, 1e-6)`) plus an
    /// absolute term of `d * 1e-6`. The absolute term is presumably needed
    /// because signed products can cancel to near zero, where a purely
    /// relative bound would be too strict.
    #[cfg(target_arch = "aarch64")]
    #[test]
    fn neon_inner_product_matches_scalar() {
        let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
        for &d in &dims {
            // Per-dimension seed, same LCG as the L2 test.
            let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
            let mut a = Vec::with_capacity(d);
            let mut b = Vec::with_capacity(d);
            for _ in 0..d {
                state = state
                    .wrapping_mul(6_364_136_223_846_793_005)
                    .wrapping_add(1);
                #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
                let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
                state = state
                    .wrapping_mul(6_364_136_223_846_793_005)
                    .wrapping_add(1);
                #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
                let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
                a.push(x);
                b.push(y);
            }
            let scalar = inner_product_scalar(&a, &b);
            // NOTE(review): the safety contract of `inner_product_neon` is not
            // visible here. Inputs are equal-length by construction.
            let neon = unsafe { inner_product_neon(&a, &b) };
            #[allow(clippy::cast_precision_loss)]
            let tol = (scalar.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
            assert!(
                (scalar - neon).abs() <= tol,
                "IP dim={d}: scalar={scalar} neon={neon} diff={}",
                (scalar - neon).abs()
            );
        }
    }
6495
6496 #[cfg(target_arch = "aarch64")]
6497 #[allow(clippy::similar_names)]
6498 #[test]
6499 fn neon_cosine_dot_norms_matches_scalar() {
6500 let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
6501 for &d in &dims {
6502 let mut state: u64 = (d as u64).wrapping_mul(0xBF58_476D_1CE4_E5B9);
6503 let mut a = Vec::with_capacity(d);
6504 let mut b = Vec::with_capacity(d);
6505 for _ in 0..d {
6506 state = state
6507 .wrapping_mul(6_364_136_223_846_793_005)
6508 .wrapping_add(1);
6509 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
6510 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
6511 state = state
6512 .wrapping_mul(6_364_136_223_846_793_005)
6513 .wrapping_add(1);
6514 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
6515 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
6516 a.push(x);
6517 b.push(y);
6518 }
6519 let (dot_s, na_s, nb_s) = cosine_dot_norms_scalar(&a, &b);
6520 let (dot_n, na_n, nb_n) = unsafe { cosine_dot_norms_neon(&a, &b) };
6521 #[allow(clippy::cast_precision_loss)]
6522 let tol_d = (dot_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
6523 #[allow(clippy::cast_precision_loss)]
6524 let tol_n = (na_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
6525 assert!(
6526 (dot_s - dot_n).abs() <= tol_d,
6527 "cosine dot dim={d}: scalar={dot_s} neon={dot_n}"
6528 );
6529 assert!(
6530 (na_s - na_n).abs() <= tol_n,
6531 "cosine na dim={d}: scalar={na_s} neon={na_n}"
6532 );
6533 assert!(
6534 (nb_s - nb_n).abs() <= tol_n,
6535 "cosine nb dim={d}: scalar={nb_s} neon={nb_n}"
6536 );
6537 }
6538 }
6539
6540 fn make_users_schema() -> TableSchema {
6541 TableSchema::new(
6542 "users",
6543 vec![
6544 ColumnSchema::new("id", DataType::Int, false),
6545 ColumnSchema::new("name", DataType::Text, false),
6546 ColumnSchema::new("score", DataType::Float, true),
6547 ],
6548 )
6549 }
6550
6551 #[test]
6552 fn value_type_tag_matches_variant() {
6553 assert_eq!(Value::Int(1).data_type(), Some(DataType::Int));
6554 assert_eq!(Value::BigInt(1).data_type(), Some(DataType::BigInt));
6555 assert_eq!(Value::Float(1.0).data_type(), Some(DataType::Float));
6556 assert_eq!(Value::Text("x".into()).data_type(), Some(DataType::Text));
6557 assert_eq!(Value::Bool(true).data_type(), Some(DataType::Bool));
6558 assert_eq!(Value::Null.data_type(), None);
6559 assert!(Value::Null.is_null());
6560 assert!(!Value::Int(0).is_null());
6561 }
6562
6563 #[test]
6564 fn sq8_value_reports_sq8_data_type() {
6565 let q = crate::quantize::quantize(&[0.0, 0.25, 0.5, 0.75, 1.0]);
6570 let v = Value::Sq8Vector(q);
6571 assert_eq!(
6572 v.data_type(),
6573 Some(DataType::Vector {
6574 dim: 5,
6575 encoding: VecEncoding::Sq8,
6576 }),
6577 );
6578 }
6579
6580 #[test]
6581 fn datatype_display_matches_pg_keyword() {
6582 assert_eq!(DataType::Int.to_string(), "INT");
6583 assert_eq!(DataType::BigInt.to_string(), "BIGINT");
6584 assert_eq!(DataType::Float.to_string(), "FLOAT");
6585 assert_eq!(DataType::Text.to_string(), "TEXT");
6586 assert_eq!(DataType::Bool.to_string(), "BOOL");
6587 }
6588
6589 #[test]
6590 fn row_len_and_emptiness() {
6591 let r = Row::new(vec![Value::Int(1), Value::Null]);
6592 assert_eq!(r.len(), 2);
6593 assert!(!r.is_empty());
6594 assert!(Row::new(Vec::new()).is_empty());
6595 }
6596
6597 #[test]
6598 fn table_schema_column_position() {
6599 let s = make_users_schema();
6600 assert_eq!(s.column_position("id"), Some(0));
6601 assert_eq!(s.column_position("score"), Some(2));
6602 assert_eq!(s.column_position("missing"), None);
6603 }
6604
6605 #[test]
6606 fn catalog_create_table_then_lookup() {
6607 let mut cat = Catalog::new();
6608 cat.create_table(make_users_schema()).unwrap();
6609 assert_eq!(cat.table_count(), 1);
6610 assert!(cat.get("users").is_some());
6611 assert!(cat.get("nope").is_none());
6612 }
6613
6614 #[test]
6615 fn catalog_duplicate_table_is_rejected() {
6616 let mut cat = Catalog::new();
6617 cat.create_table(make_users_schema()).unwrap();
6618 let err = cat.create_table(make_users_schema()).unwrap_err();
6619 assert!(matches!(err, StorageError::DuplicateTable { ref name } if name == "users"));
6620 }
6621
6622 #[test]
6623 fn table_insert_happy_path_appends_row() {
6624 let mut cat = Catalog::new();
6625 cat.create_table(make_users_schema()).unwrap();
6626 let t = cat.get_mut("users").unwrap();
6627 t.insert(Row::new(vec![
6628 Value::Int(1),
6629 Value::Text("alice".into()),
6630 Value::Float(99.5),
6631 ]))
6632 .unwrap();
6633 assert_eq!(t.row_count(), 1);
6634 assert_eq!(t.rows()[0].values[1], Value::Text("alice".into()));
6635 }
6636
6637 #[test]
6638 fn table_insert_arity_mismatch() {
6639 let mut cat = Catalog::new();
6640 cat.create_table(make_users_schema()).unwrap();
6641 let t = cat.get_mut("users").unwrap();
6642 let err = t.insert(Row::new(vec![Value::Int(1)])).unwrap_err();
6643 assert!(matches!(
6644 err,
6645 StorageError::ArityMismatch {
6646 expected: 3,
6647 actual: 1
6648 }
6649 ));
6650 assert_eq!(t.row_count(), 0);
6651 }
6652
6653 #[test]
6654 fn table_insert_type_mismatch_reports_column() {
6655 let mut cat = Catalog::new();
6656 cat.create_table(make_users_schema()).unwrap();
6657 let t = cat.get_mut("users").unwrap();
6658 let err = t
6659 .insert(Row::new(vec![
6660 Value::Int(1),
6661 Value::Int(42), Value::Float(0.0),
6663 ]))
6664 .unwrap_err();
6665 match err {
6666 StorageError::TypeMismatch {
6667 ref column,
6668 expected,
6669 actual,
6670 position,
6671 } => {
6672 assert_eq!(column, "name");
6673 assert_eq!(expected, DataType::Text);
6674 assert_eq!(actual, DataType::Int);
6675 assert_eq!(position, 1);
6676 }
6677 other => panic!("unexpected: {other:?}"),
6678 }
6679 assert_eq!(t.row_count(), 0);
6680 }
6681
6682 #[test]
6683 fn table_insert_null_into_not_null_rejected() {
6684 let mut cat = Catalog::new();
6685 cat.create_table(make_users_schema()).unwrap();
6686 let t = cat.get_mut("users").unwrap();
6687 let err = t
6688 .insert(Row::new(vec![
6689 Value::Int(1),
6690 Value::Null, Value::Float(1.0),
6692 ]))
6693 .unwrap_err();
6694 assert!(matches!(err, StorageError::NullInNotNull { ref column } if column == "name"));
6695 }
6696
6697 #[test]
6698 fn table_insert_null_into_nullable_ok() {
6699 let mut cat = Catalog::new();
6700 cat.create_table(make_users_schema()).unwrap();
6701 let t = cat.get_mut("users").unwrap();
6702 t.insert(Row::new(vec![
6703 Value::Int(1),
6704 Value::Text("bob".into()),
6705 Value::Null,
6706 ]))
6707 .unwrap();
6708 assert_eq!(t.row_count(), 1);
6709 }
6710
6711 #[test]
6712 fn catalog_get_mut_independent_per_table() {
6713 let mut cat = Catalog::new();
6714 cat.create_table(TableSchema::new(
6715 "a",
6716 vec![ColumnSchema::new("v", DataType::Int, false)],
6717 ))
6718 .unwrap();
6719 cat.create_table(TableSchema::new(
6720 "b",
6721 vec![ColumnSchema::new("v", DataType::Int, false)],
6722 ))
6723 .unwrap();
6724 cat.get_mut("a")
6725 .unwrap()
6726 .insert(Row::new(vec![Value::Int(1)]))
6727 .unwrap();
6728 assert_eq!(cat.get("a").unwrap().row_count(), 1);
6729 assert_eq!(cat.get("b").unwrap().row_count(), 0);
6730 }
6731
6732 fn assert_round_trip(cat: &Catalog) {
6735 let bytes = cat.serialize();
6736 let restored = Catalog::deserialize(&bytes).expect("deserialize");
6737 assert_eq!(restored.table_count(), cat.table_count());
6740 for (a, b) in cat.tables.iter().zip(restored.tables.iter()) {
6741 assert_eq!(a.schema, b.schema);
6742 assert_eq!(a.rows, b.rows);
6743 }
6744 }
6745
6746 #[test]
6747 fn serialize_empty_catalog_round_trips() {
6748 assert_round_trip(&Catalog::new());
6749 }
6750
6751 #[test]
6752 fn serialize_single_empty_table_round_trips() {
6753 let mut cat = Catalog::new();
6754 cat.create_table(make_users_schema()).unwrap();
6755 assert_round_trip(&cat);
6756 }
6757
6758 #[test]
6759 fn nsw_clone_is_o1() {
6760 let mut cat = Catalog::new();
6769 cat.create_table(TableSchema::new(
6770 "docs",
6771 alloc::vec![
6772 ColumnSchema::new("id", DataType::Int, false),
6773 ColumnSchema::new(
6774 "v",
6775 DataType::Vector {
6776 dim: 3,
6777 encoding: VecEncoding::F32
6778 },
6779 true
6780 ),
6781 ],
6782 ))
6783 .unwrap();
6784 let t = cat.get_mut("docs").unwrap();
6785 for i in 0..1500_i32 {
6786 #[allow(clippy::cast_precision_loss)] let base = (i as f32) * 0.01;
6788 t.insert(Row::new(alloc::vec![
6789 Value::Int(i),
6790 Value::Vector(alloc::vec![base, base + 0.05, base + 0.1]),
6791 ]))
6792 .unwrap();
6793 }
6794 t.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
6795 .unwrap();
6796 let g = match &cat.get("docs").unwrap().indices()[0].kind {
6797 IndexKind::Nsw(g) => g,
6798 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
6799 panic!("expected NSW")
6800 }
6801 };
6802 assert_eq!(g.levels.len(), 1500, "one level slot per inserted row");
6805 assert!(
6806 g.layers.len() >= 2,
6807 "1500 nodes should populate at least two HNSW layers, got {}",
6808 g.layers.len()
6809 );
6810
6811 let cloned = g.clone();
6812
6813 assert!(
6814 g.levels.shares_storage_with(&cloned.levels),
6815 "levels PV not shared after clone — clone copied elements (O(N))"
6816 );
6817 assert_eq!(g.layers.len(), cloned.layers.len());
6818 for (l, (orig, cl)) in g.layers.iter().zip(cloned.layers.iter()).enumerate() {
6819 assert!(
6820 orig.shares_storage_with(cl),
6821 "layer {l} PV not shared after clone — clone copied elements (O(N))"
6822 );
6823 }
6824 }
6825
6826 #[test]
6827 fn sq8_catalog_serialise_roundtrip_preserves_cells_and_index() {
6828 let mut cat = Catalog::new();
6835 cat.create_table(TableSchema::new(
6836 "vecs",
6837 alloc::vec![
6838 ColumnSchema::new("id", DataType::Int, false),
6839 ColumnSchema::new(
6840 "v",
6841 DataType::Vector {
6842 dim: 8,
6843 encoding: VecEncoding::Sq8,
6844 },
6845 false,
6846 ),
6847 ],
6848 ))
6849 .unwrap();
6850 let t = cat.get_mut("vecs").unwrap();
6851 for i in 0..32_i32 {
6852 #[allow(clippy::cast_precision_loss)]
6853 let base = (i as f32) * 0.03;
6854 let v: Vec<f32> = (0..8_i32)
6855 .map(|j| {
6856 #[allow(clippy::cast_precision_loss)]
6857 let off = (j as f32) * 0.01;
6858 base + off
6859 })
6860 .collect();
6861 t.insert(Row::new(alloc::vec![
6862 Value::Int(i),
6863 Value::Sq8Vector(quantize::quantize(&v)),
6864 ]))
6865 .unwrap();
6866 }
6867 t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
6868 let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];
6871 let (before_cell, before_ty, before_hits) = {
6872 let t_ref = cat.get("vecs").unwrap();
6873 (
6874 t_ref.rows()[5].values[1].clone(),
6875 t_ref.schema().columns[1].ty,
6876 nsw_query(t_ref, "v_idx", &query, 5, NswMetric::L2),
6877 )
6878 };
6879
6880 let bytes = cat.serialize();
6881 let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
6882 let rt = restored.get("vecs").unwrap();
6883 assert_eq!(rt.schema().columns[1].ty, before_ty);
6884 assert_eq!(rt.rows()[5].values[1], before_cell);
6885 let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
6886 assert_eq!(before_hits, after_hits);
6887 }
6888
6889 #[test]
6890 fn half_catalog_serialise_roundtrip_preserves_cells_and_index() {
6891 use crate::halfvec;
6898 let mut cat = Catalog::new();
6899 cat.create_table(TableSchema::new(
6900 "vecs",
6901 alloc::vec![
6902 ColumnSchema::new("id", DataType::Int, false),
6903 ColumnSchema::new(
6904 "v",
6905 DataType::Vector {
6906 dim: 8,
6907 encoding: VecEncoding::F16,
6908 },
6909 false,
6910 ),
6911 ],
6912 ))
6913 .unwrap();
6914 let t = cat.get_mut("vecs").unwrap();
6915 for i in 0..32_i32 {
6916 #[allow(clippy::cast_precision_loss)]
6917 let base = (i as f32) * 0.03;
6918 let v: Vec<f32> = (0..8_i32)
6919 .map(|j| {
6920 #[allow(clippy::cast_precision_loss)]
6921 let off = (j as f32) * 0.01;
6922 base + off
6923 })
6924 .collect();
6925 t.insert(Row::new(alloc::vec![
6926 Value::Int(i),
6927 Value::HalfVector(halfvec::HalfVector::from_f32_slice(&v)),
6928 ]))
6929 .unwrap();
6930 }
6931 t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
6932 let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];
6933 let (before_cell, before_ty, before_hits) = {
6934 let t_ref = cat.get("vecs").unwrap();
6935 (
6936 t_ref.rows()[5].values[1].clone(),
6937 t_ref.schema().columns[1].ty,
6938 nsw_query(t_ref, "v_idx", &query, 5, NswMetric::L2),
6939 )
6940 };
6941 let bytes = cat.serialize();
6942 let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
6943 let rt = restored.get("vecs").unwrap();
6944 assert_eq!(rt.schema().columns[1].ty, before_ty);
6945 assert_eq!(rt.rows()[5].values[1], before_cell);
6946 let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
6947 assert_eq!(before_hits, after_hits);
6948 }
6949
    /// Recall floor for HNSW search over a HALF (`VecEncoding::F16`) column.
    ///
    /// The test proceeds in these steps:
    /// 1. Generate 512 pseudo-random 32-dim corpus vectors and 32 queries,
    ///    with components approximately in [-1, 1].
    /// 2. Compute each query's exact top-10 by brute-force `l2_distance_sq`
    ///    over the original f32 data.
    /// 3. Store the corpus as half-precision vectors and build an NSW index.
    /// 4. Require recall@10 of at least 0.95. Recall is the fraction of
    ///    returned hits that appear in the exact top-10.
    ///
    /// NOTE(review): hits are compared with corpus indices. Row `i` has id
    /// `i`, so this works whether `nsw_query` returns row positions or ids;
    /// confirm which.
    #[test]
    #[allow(clippy::similar_names)]
    fn hnsw_half_recall_at_10_matches_f32_groundtruth() {
        use crate::halfvec;
        // Deterministic generator: add-then-multiply step on `state`, then map
        // the high 32 bits to [0, 1] and rescale to [-1, 1].
        fn next(state: &mut u64) -> f32 {
            *state = state
                .wrapping_add(0x9E37_79B9_7F4A_7C15)
                .wrapping_mul(0xBF58_476D_1CE4_E5B9);
            #[allow(clippy::cast_precision_loss)]
            let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
            2.0 * u - 1.0
        }
        let dim: u32 = 32;
        let n: usize = 512;
        let dim_us = dim as usize;
        let mut seed: u64 = 0xF16_F16_F16_F16_u64;
        // The corpus is drawn before the queries from the same seed stream, so
        // this order fixes the data set.
        let corpus: Vec<Vec<f32>> = (0..n)
            .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
            .collect();
        let queries: Vec<Vec<f32>> = (0..32)
            .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
            .collect();
        // Ground truth: full f32 brute force, ties kept in corpus order.
        let exact_top10: Vec<Vec<usize>> = queries
            .iter()
            .map(|q| {
                let mut scored: Vec<(f32, usize)> = corpus
                    .iter()
                    .enumerate()
                    .map(|(i, v)| (l2_distance_sq(v, q), i))
                    .collect();
                scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
                scored.into_iter().take(10).map(|(_, i)| i).collect()
            })
            .collect();
        let mut cat = Catalog::new();
        cat.create_table(TableSchema::new(
            "vecs",
            alloc::vec![
                ColumnSchema::new("id", DataType::Int, false),
                ColumnSchema::new(
                    "v",
                    DataType::Vector {
                        dim,
                        encoding: VecEncoding::F16,
                    },
                    false,
                ),
            ],
        ))
        .unwrap();
        let t = cat.get_mut("vecs").unwrap();
        for (i, v) in corpus.iter().enumerate() {
            t.insert(Row::new(alloc::vec![
                Value::Int(i32::try_from(i).unwrap()),
                Value::HalfVector(halfvec::HalfVector::from_f32_slice(v)),
            ]))
            .unwrap();
        }
        t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
        let table = cat.get("vecs").unwrap();
        let mut total_overlap = 0_usize;
        for (q, exact) in queries.iter().zip(exact_top10.iter()) {
            let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
            for h in &hits {
                if exact.contains(h) {
                    total_overlap += 1;
                }
            }
        }
        #[allow(clippy::cast_precision_loss)]
        let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
        assert!(
            recall >= 0.95,
            "HALF HNSW recall@10 = {recall:.3}, below floor 0.95 — \
             check halfvec dispatch in `cell_to_query_metric_distance`"
        );
    }
7033
    /// Recall floor for HNSW search over an SQ8-quantized column.
    ///
    /// The test proceeds in these steps:
    /// 1. Generate 512 pseudo-random 32-dim corpus vectors and 32 queries,
    ///    with components approximately in [-1, 1].
    /// 2. Compute each query's exact top-10 by brute-force `l2_distance_sq`
    ///    over the original f32 data.
    /// 3. Store the corpus via `quantize::quantize` and build an NSW index.
    /// 4. Require recall@10 of at least 0.95.
    ///
    /// NOTE(review): as in the HALF test, hits are compared with corpus
    /// indices, relying on row `i` having id `i`.
    #[test]
    fn hnsw_sq8_recall_at_10_above_0_95_vs_f32_groundtruth() {
        use crate::quantize;
        // Same deterministic add-then-multiply generator as the HALF recall
        // test, mapped to roughly [-1, 1].
        fn next(state: &mut u64) -> f32 {
            *state = state
                .wrapping_add(0x9E37_79B9_7F4A_7C15)
                .wrapping_mul(0xBF58_476D_1CE4_E5B9);
            #[allow(clippy::cast_precision_loss)]
            let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
            2.0 * u - 1.0
        }
        let dim: u32 = 32;
        let n: usize = 512;
        let dim_us = dim as usize;
        let mut seed: u64 = 0xCAFE_BABE_DEAD_BEEFu64;
        // Corpus first, then queries, from one seed stream.
        let corpus: Vec<Vec<f32>> = (0..n)
            .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
            .collect();
        let queries: Vec<Vec<f32>> = (0..32)
            .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
            .collect();
        // Exact f32 ground truth computed before any quantization.
        let exact_top10: Vec<Vec<usize>> = queries
            .iter()
            .map(|q| {
                let mut scored: Vec<(f32, usize)> = corpus
                    .iter()
                    .enumerate()
                    .map(|(i, v)| (l2_distance_sq(v, q), i))
                    .collect();
                scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
                scored.into_iter().take(10).map(|(_, i)| i).collect()
            })
            .collect();
        let mut cat = Catalog::new();
        cat.create_table(TableSchema::new(
            "vecs",
            alloc::vec![
                ColumnSchema::new("id", DataType::Int, false),
                ColumnSchema::new(
                    "v",
                    DataType::Vector {
                        dim,
                        encoding: VecEncoding::Sq8,
                    },
                    false,
                ),
            ],
        ))
        .unwrap();
        let t = cat.get_mut("vecs").unwrap();
        for (i, v) in corpus.iter().enumerate() {
            t.insert(Row::new(alloc::vec![
                Value::Int(i32::try_from(i).unwrap()),
                Value::Sq8Vector(quantize::quantize(v)),
            ]))
            .unwrap();
        }
        t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
        let table = cat.get("vecs").unwrap();
        let mut total_overlap = 0_usize;
        for (q, exact) in queries.iter().zip(exact_top10.iter()) {
            let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
            for h in &hits {
                if exact.contains(h) {
                    total_overlap += 1;
                }
            }
        }
        #[allow(clippy::cast_precision_loss)]
        let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
        assert!(
            recall >= 0.95,
            "SQ8 HNSW recall@10 = {recall:.3}, below floor 0.95 — \
             check `sq8_rerank` is wired in `nsw_search` for SQ8 columns"
        );
    }
7122
7123 #[test]
7124 fn nsw_index_topology_persists_through_round_trip() {
7125 let mut cat = Catalog::new();
7131 cat.create_table(TableSchema::new(
7132 "docs",
7133 alloc::vec![
7134 ColumnSchema::new("id", DataType::Int, false),
7135 ColumnSchema::new(
7136 "v",
7137 DataType::Vector {
7138 dim: 3,
7139 encoding: VecEncoding::F32
7140 },
7141 true
7142 ),
7143 ],
7144 ))
7145 .unwrap();
7146 let t = cat.get_mut("docs").unwrap();
7147 for i in 0..6_i32 {
7148 #[allow(clippy::cast_precision_loss)] let base = (i as f32) * 0.1;
7150 let row = Row::new(alloc::vec![
7151 Value::Int(i),
7152 Value::Vector(alloc::vec![base, base + 0.05, base + 0.1]),
7153 ]);
7154 t.insert(row).unwrap();
7155 }
7156 t.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
7157 .unwrap();
7158 let original = match &cat.get("docs").unwrap().indices()[0].kind {
7159 IndexKind::Nsw(g) => g.clone(),
7160 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
7161 panic!("expected NSW")
7162 }
7163 };
7164 let bytes = cat.serialize();
7165 let restored = Catalog::deserialize(&bytes).expect("deserialize");
7166 let restored_graph = match &restored.get("docs").unwrap().indices()[0].kind {
7167 IndexKind::Nsw(g) => g.clone(),
7168 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
7169 panic!("expected NSW")
7170 }
7171 };
7172 assert_eq!(restored_graph.m, original.m);
7173 assert_eq!(restored_graph.m_max_0, original.m_max_0);
7174 assert_eq!(restored_graph.entry, original.entry);
7175 assert_eq!(restored_graph.entry_level, original.entry_level);
7176 assert_eq!(restored_graph.levels, original.levels);
7177 assert_eq!(restored_graph.layers, original.layers);
7178 }
7179
7180 #[test]
7181 fn hnsw_level_assignment_is_deterministic() {
7182 for i in 0..32usize {
7185 assert_eq!(nsw_assign_level(i), nsw_assign_level(i));
7186 }
7187 }
7188
7189 #[test]
7190 fn hnsw_layer_0_dominates_population() {
7191 let on_zero = (0..200usize).filter(|&i| nsw_assign_level(i) == 0).count();
7196 assert!(on_zero > 150, "level-0 nodes too few: {on_zero}");
7197 }
7198
7199 #[test]
7200 fn hnsw_search_matches_brute_force_for_l2_top1() {
7201 let mut cat = Catalog::new();
7205 cat.create_table(TableSchema::new(
7206 "vecs",
7207 alloc::vec![
7208 ColumnSchema::new("id", DataType::Int, false),
7209 ColumnSchema::new(
7210 "v",
7211 DataType::Vector {
7212 dim: 3,
7213 encoding: VecEncoding::F32
7214 },
7215 true
7216 ),
7217 ],
7218 ))
7219 .unwrap();
7220 let t = cat.get_mut("vecs").unwrap();
7221 let dataset: alloc::vec::Vec<(i32, [f32; 3])> = alloc::vec![
7222 (1, [0.0, 0.0, 0.0]),
7223 (2, [1.0, 0.0, 0.0]),
7224 (3, [0.0, 1.0, 0.0]),
7225 (4, [0.0, 0.0, 1.0]),
7226 (5, [1.0, 1.0, 0.0]),
7227 (6, [1.0, 0.0, 1.0]),
7228 (7, [0.0, 1.0, 1.0]),
7229 (8, [1.0, 1.0, 1.0]),
7230 (9, [0.5, 0.5, 0.5]),
7231 (10, [0.2, 0.8, 0.5]),
7232 ];
7233 for &(id, v) in &dataset {
7234 t.insert(Row::new(alloc::vec![
7235 Value::Int(id),
7236 Value::Vector(alloc::vec![v[0], v[1], v[2]]),
7237 ]))
7238 .unwrap();
7239 }
7240 t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
7241 let idx_pos = cat
7242 .get("vecs")
7243 .unwrap()
7244 .indices()
7245 .iter()
7246 .position(|i| i.name == "v_idx")
7247 .unwrap();
7248 for query in [[0.4, 0.4, 0.4], [0.9, 0.1, 0.0], [0.0, 0.9, 0.9]] {
7249 let table = cat.get("vecs").unwrap();
7250 let hnsw_top = nsw_search(table, idx_pos, &query, 1, 16, NswMetric::L2);
7251 let mut brute: alloc::vec::Vec<(f32, usize)> = (0..table.rows.len())
7252 .map(|i| {
7253 let Value::Vector(v) = &table.rows[i].values[1] else {
7254 return (f32::INFINITY, i);
7255 };
7256 (l2_distance_sq(v, &query), i)
7257 })
7258 .collect();
7259 brute.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
7260 assert!(!hnsw_top.is_empty(), "HNSW returned no results");
7261 assert_eq!(
7262 hnsw_top[0].1, brute[0].1,
7263 "HNSW top-1 != brute-force top-1 for {query:?}"
7264 );
7265 }
7266 }
7267
7268 #[test]
7269 fn serialize_table_with_rows_round_trips() {
7270 let mut cat = Catalog::new();
7271 cat.create_table(make_users_schema()).unwrap();
7272 let t = cat.get_mut("users").unwrap();
7273 t.insert(Row::new(vec![
7274 Value::Int(1),
7275 Value::Text("alice".into()),
7276 Value::Float(95.5),
7277 ]))
7278 .unwrap();
7279 t.insert(Row::new(vec![
7280 Value::Int(2),
7281 Value::Text("bob".into()),
7282 Value::Null,
7283 ]))
7284 .unwrap();
7285 assert_round_trip(&cat);
7286 }
7287
7288 #[test]
7289 fn serialize_multiple_tables_round_trips() {
7290 let mut cat = Catalog::new();
7291 cat.create_table(make_users_schema()).unwrap();
7292 cat.create_table(TableSchema::new(
7293 "flags",
7294 vec![
7295 ColumnSchema::new("id", DataType::BigInt, false),
7296 ColumnSchema::new("active", DataType::Bool, false),
7297 ],
7298 ))
7299 .unwrap();
7300 cat.get_mut("flags")
7301 .unwrap()
7302 .insert(Row::new(vec![Value::BigInt(7), Value::Bool(true)]))
7303 .unwrap();
7304 assert_round_trip(&cat);
7305 }
7306
7307 #[test]
7308 fn deserialize_rejects_bad_magic() {
7309 let mut buf = b"BADMAGIC".to_vec();
7310 buf.push(FILE_VERSION);
7311 buf.extend_from_slice(&0u32.to_le_bytes());
7312 let err = Catalog::deserialize(&buf).unwrap_err();
7313 assert!(matches!(err, StorageError::Corrupt(_)));
7314 }
7315
7316 #[test]
7317 fn deserialize_rejects_unsupported_version() {
7318 let mut buf = FILE_MAGIC.to_vec();
7319 buf.push(99); buf.extend_from_slice(&0u32.to_le_bytes());
7321 let err = Catalog::deserialize(&buf).unwrap_err();
7322 assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("version")));
7323 }
7324
7325 #[test]
7326 fn deserialize_rejects_truncated_file() {
7327 let mut cat = Catalog::new();
7328 cat.create_table(make_users_schema()).unwrap();
7329 let bytes = cat.serialize();
7330 let truncated = &bytes[..bytes.len() - 1];
7332 assert!(matches!(
7333 Catalog::deserialize(truncated),
7334 Err(StorageError::Corrupt(_))
7335 ));
7336 }
7337
7338 #[test]
7339 fn deserialize_rejects_trailing_garbage() {
7340 let cat = Catalog::new();
7341 let mut bytes = cat.serialize();
7342 bytes.push(0xFF);
7343 assert!(matches!(
7344 Catalog::deserialize(&bytes),
7345 Err(StorageError::Corrupt(ref s)) if s.contains("trailing")
7346 ));
7347 }
7348
7349 fn populated_users() -> Catalog {
7352 let mut cat = Catalog::new();
7353 cat.create_table(make_users_schema()).unwrap();
7354 let t = cat.get_mut("users").unwrap();
7355 for (id, name, score) in [
7356 (1, "alice", Some(90.0)),
7357 (2, "bob", None),
7358 (3, "alice", Some(70.0)), ] {
7360 t.insert(Row::new(vec![
7361 Value::Int(id),
7362 Value::Text(name.into()),
7363 score.map_or(Value::Null, Value::Float),
7364 ]))
7365 .unwrap();
7366 }
7367 cat
7368 }
7369
7370 #[test]
7371 fn add_index_builds_from_existing_rows() {
7372 let mut cat = populated_users();
7373 cat.get_mut("users")
7374 .unwrap()
7375 .add_index("by_id".into(), "id")
7376 .unwrap();
7377 let t = cat.get("users").unwrap();
7378 let idx = t.index_on(0).expect("index_on(0)");
7379 assert_eq!(idx.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
7380 assert_eq!(idx.lookup_eq(&IndexKey::Int(99)), &[] as &[RowLocator]);
7381 }
7382
7383 #[test]
7384 fn add_index_dup_name_rejected() {
7385 let mut cat = populated_users();
7386 let t = cat.get_mut("users").unwrap();
7387 t.add_index("ix".into(), "id").unwrap();
7388 let err = t.add_index("ix".into(), "name").unwrap_err();
7389 assert!(matches!(err, StorageError::DuplicateIndex { ref name } if name == "ix"));
7390 }
7391
7392 #[test]
7393 fn add_index_unknown_column_rejected() {
7394 let mut cat = populated_users();
7395 let err = cat
7396 .get_mut("users")
7397 .unwrap()
7398 .add_index("ix".into(), "ghost")
7399 .unwrap_err();
7400 assert!(matches!(err, StorageError::ColumnNotFound { ref column } if column == "ghost"));
7401 }
7402
7403 #[test]
7404 fn insert_after_create_index_updates_it() {
7405 let mut cat = populated_users();
7406 let t = cat.get_mut("users").unwrap();
7407 t.add_index("by_name".into(), "name").unwrap();
7408 t.insert(Row::new(vec![
7409 Value::Int(4),
7410 Value::Text("dave".into()),
7411 Value::Null,
7412 ]))
7413 .unwrap();
7414 let idx = t.index_on(1).unwrap();
7415 assert_eq!(
7416 idx.lookup_eq(&IndexKey::Text("dave".into())),
7417 &[RowLocator::Hot(3)]
7418 );
7419 assert_eq!(
7421 idx.lookup_eq(&IndexKey::Text("alice".into())),
7422 &[RowLocator::Hot(0), RowLocator::Hot(2)]
7423 );
7424 }
7425
7426 #[test]
7427 fn null_or_float_values_are_not_indexed() {
7428 let mut cat = populated_users();
7429 let t = cat.get_mut("users").unwrap();
7430 t.add_index("by_score".into(), "score").unwrap();
7431 let idx = t.index_on(2).unwrap();
7432 assert_eq!(idx.lookup_eq(&IndexKey::Int(90)), &[] as &[RowLocator]);
7437 }
7438
7439 #[test]
7442 fn vector_value_data_type_carries_dim() {
7443 let v = Value::Vector(vec![1.0, 2.0, 3.0]);
7444 assert_eq!(
7445 v.data_type(),
7446 Some(DataType::Vector {
7447 dim: 3,
7448 encoding: VecEncoding::F32
7449 })
7450 );
7451 }
7452
7453 #[test]
7454 fn vector_column_insert_matching_dim_ok() {
7455 let mut cat = Catalog::new();
7456 cat.create_table(TableSchema::new(
7457 "emb",
7458 vec![ColumnSchema::new(
7459 "v",
7460 DataType::Vector {
7461 dim: 3,
7462 encoding: VecEncoding::F32,
7463 },
7464 false,
7465 )],
7466 ))
7467 .unwrap();
7468 cat.get_mut("emb")
7469 .unwrap()
7470 .insert(Row::new(vec![Value::Vector(vec![1.0, 2.0, 3.0])]))
7471 .unwrap();
7472 }
7473
7474 #[test]
7475 fn vector_column_insert_dim_mismatch_rejected() {
7476 let mut cat = Catalog::new();
7477 cat.create_table(TableSchema::new(
7478 "emb",
7479 vec![ColumnSchema::new(
7480 "v",
7481 DataType::Vector {
7482 dim: 3,
7483 encoding: VecEncoding::F32,
7484 },
7485 false,
7486 )],
7487 ))
7488 .unwrap();
7489 let err = cat
7490 .get_mut("emb")
7491 .unwrap()
7492 .insert(Row::new(vec![Value::Vector(vec![1.0, 2.0])]))
7493 .unwrap_err();
7494 assert!(matches!(err, StorageError::TypeMismatch { .. }));
7495 }
7496
7497 #[test]
7498 fn vector_value_survives_catalog_round_trip() {
7499 let mut cat = Catalog::new();
7500 cat.create_table(TableSchema::new(
7501 "emb",
7502 vec![
7503 ColumnSchema::new("id", DataType::Int, false),
7504 ColumnSchema::new(
7505 "v",
7506 DataType::Vector {
7507 dim: 4,
7508 encoding: VecEncoding::F32,
7509 },
7510 false,
7511 ),
7512 ],
7513 ))
7514 .unwrap();
7515 cat.get_mut("emb")
7516 .unwrap()
7517 .insert(Row::new(vec![
7518 Value::Int(1),
7519 Value::Vector(vec![0.5, -1.25, 3.0, 7.0]),
7520 ]))
7521 .unwrap();
7522 let restored = Catalog::deserialize(&cat.serialize()).expect("round-trip");
7523 let table = restored.get("emb").unwrap();
7524 assert_eq!(
7525 table.schema().columns[1].ty,
7526 DataType::Vector {
7527 dim: 4,
7528 encoding: VecEncoding::F32
7529 }
7530 );
7531 assert_eq!(
7532 table.rows()[0].values[1],
7533 Value::Vector(vec![0.5, -1.25, 3.0, 7.0])
7534 );
7535 }
7536
7537 #[test]
7538 fn index_survives_serialize_deserialize_round_trip() {
7539 let mut cat = populated_users();
7540 cat.get_mut("users")
7541 .unwrap()
7542 .add_index("by_name".into(), "name")
7543 .unwrap();
7544 let restored = Catalog::deserialize(&cat.serialize()).unwrap();
7545 let idx = restored
7546 .get("users")
7547 .unwrap()
7548 .index_on(1)
7549 .expect("index_on(1) after restore");
7550 assert_eq!(idx.name, "by_name");
7551 assert_eq!(
7553 idx.lookup_eq(&IndexKey::Text("alice".into())),
7554 &[RowLocator::Hot(0), RowLocator::Hot(2)]
7555 );
7556 }
7557
7558 fn bigint_pk_users_schema() -> TableSchema {
7563 TableSchema::new(
7564 "users",
7565 vec![
7566 ColumnSchema::new("id", DataType::BigInt, false),
7567 ColumnSchema::new("name", DataType::Text, false),
7568 ],
7569 )
7570 }
7571
7572 fn make_user_row(id: i64, name: &str) -> Row {
7573 Row::new(vec![Value::BigInt(id), Value::Text(name.into())])
7574 }
7575
7576 #[test]
7577 fn lookup_by_pk_finds_row_via_hot_index() {
7578 let mut cat = Catalog::new();
7579 cat.create_table(bigint_pk_users_schema()).unwrap();
7580 let t = cat.get_mut("users").unwrap();
7581 for (id, name) in [(1i64, "alice"), (2, "bob"), (3, "carol")] {
7582 t.insert(make_user_row(id, name)).unwrap();
7583 }
7584 t.add_index("by_id".into(), "id").unwrap();
7585 let got = cat
7587 .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
7588 .unwrap();
7589 assert_eq!(got, make_user_row(2, "bob"));
7590 assert_eq!(cat.cold_segment_count(), 0);
7591 }
7592
7593 #[test]
7594 fn lookup_by_pk_returns_none_when_key_missing() {
7595 let mut cat = Catalog::new();
7596 cat.create_table(bigint_pk_users_schema()).unwrap();
7597 let t = cat.get_mut("users").unwrap();
7598 t.insert(make_user_row(1, "alice")).unwrap();
7599 t.add_index("by_id".into(), "id").unwrap();
7600 assert!(
7601 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(999))
7602 .is_none()
7603 );
7604 assert!(
7606 cat.lookup_by_pk("other_table", "by_id", &IndexKey::Int(1))
7607 .is_none()
7608 );
7609 assert!(
7610 cat.lookup_by_pk("users", "no_such_index", &IndexKey::Int(1))
7611 .is_none()
7612 );
7613 }
7614
7615 #[test]
7616 fn lookup_by_pk_resolves_cold_locator_via_loaded_segment() {
7617 let mut cat = Catalog::new();
7621 cat.create_table(bigint_pk_users_schema()).unwrap();
7622 let t = cat.get_mut("users").unwrap();
7623 t.add_index("by_id".into(), "id").unwrap();
7624 let schema = t.schema.clone();
7625
7626 let cold_rows: Vec<(i64, &str)> =
7627 vec![(100, "ivy"), (200, "joe"), (300, "kim"), (400, "lin")];
7628 let seg_rows: Vec<(u64, Vec<u8>)> = cold_rows
7629 .iter()
7630 .map(|(id, name)| {
7631 let row = make_user_row(*id, name);
7632 ((*id).cast_unsigned(), encode_row_body_dense(&row, &schema))
7633 })
7634 .collect();
7635 let (seg_bytes, _meta) =
7636 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
7637 let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();
7638 assert_eq!(seg_id, 0);
7639 assert_eq!(cat.cold_segment_count(), 1);
7640
7641 let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
7642 .iter()
7643 .map(|(id, _)| {
7644 (
7645 IndexKey::Int(*id),
7646 RowLocator::Cold {
7647 segment_id: seg_id,
7648 page_offset: 0,
7649 },
7650 )
7651 })
7652 .collect();
7653 let registered = cat
7654 .get_mut("users")
7655 .unwrap()
7656 .register_cold_locators("by_id", pairs)
7657 .unwrap();
7658 assert_eq!(registered, 4);
7659
7660 for (id, name) in &cold_rows {
7661 let got = cat
7662 .lookup_by_pk("users", "by_id", &IndexKey::Int(*id))
7663 .unwrap_or_else(|| panic!("cold key {id} not found"));
7664 assert_eq!(got, make_user_row(*id, name));
7665 }
7666 assert!(
7668 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(999))
7669 .is_none()
7670 );
7671 }
7672
7673 #[test]
7674 fn lookup_by_pk_mixes_hot_and_cold_tiers() {
7675 let mut cat = Catalog::new();
7679 cat.create_table(bigint_pk_users_schema()).unwrap();
7680 let t = cat.get_mut("users").unwrap();
7681 for (id, name) in [(1i64, "alice"), (2, "bob")] {
7682 t.insert(make_user_row(id, name)).unwrap();
7683 }
7684 t.add_index("by_id".into(), "id").unwrap();
7685 let schema = t.schema.clone();
7686
7687 let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe")];
7688 let seg_rows: Vec<(u64, Vec<u8>)> = cold_rows
7689 .iter()
7690 .map(|(id, name)| {
7691 let row = make_user_row(*id, name);
7692 ((*id).cast_unsigned(), encode_row_body_dense(&row, &schema))
7693 })
7694 .collect();
7695 let (seg_bytes, _) =
7696 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
7697 let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();
7698 let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
7699 .iter()
7700 .map(|(id, _)| {
7701 (
7702 IndexKey::Int(*id),
7703 RowLocator::Cold {
7704 segment_id: seg_id,
7705 page_offset: 0,
7706 },
7707 )
7708 })
7709 .collect();
7710 cat.get_mut("users")
7711 .unwrap()
7712 .register_cold_locators("by_id", pairs)
7713 .unwrap();
7714
7715 assert_eq!(
7717 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(1))
7718 .unwrap(),
7719 make_user_row(1, "alice")
7720 );
7721 assert_eq!(
7722 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(2))
7723 .unwrap(),
7724 make_user_row(2, "bob")
7725 );
7726 assert_eq!(
7728 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(100))
7729 .unwrap(),
7730 make_user_row(100, "ivy")
7731 );
7732 assert_eq!(
7733 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(200))
7734 .unwrap(),
7735 make_user_row(200, "joe")
7736 );
7737 assert!(
7739 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(50))
7740 .is_none()
7741 );
7742 }
7743
7744 #[test]
7745 fn register_cold_locators_rejects_nsw_index() {
7746 let mut cat = Catalog::new();
7747 cat.create_table(TableSchema::new(
7748 "vecs",
7749 vec![
7750 ColumnSchema::new("id", DataType::Int, false),
7751 ColumnSchema::new(
7752 "v",
7753 DataType::Vector {
7754 dim: 4,
7755 encoding: VecEncoding::F32,
7756 },
7757 false,
7758 ),
7759 ],
7760 ))
7761 .unwrap();
7762 let t = cat.get_mut("vecs").unwrap();
7763 t.insert(Row::new(vec![
7764 Value::Int(1),
7765 Value::Vector(vec![1.0, 0.0, 0.0, 0.0]),
7766 ]))
7767 .unwrap();
7768 t.add_nsw_index("by_v".into(), "v", NSW_DEFAULT_M).unwrap();
7769 let err = t
7770 .register_cold_locators(
7771 "by_v",
7772 vec![(
7773 IndexKey::Int(1),
7774 RowLocator::Cold {
7775 segment_id: 0,
7776 page_offset: 0,
7777 },
7778 )],
7779 )
7780 .unwrap_err();
7781 assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("not BTree")));
7784 }
7785
7786 #[test]
7787 fn load_segment_bytes_rejects_garbage() {
7788 let mut cat = Catalog::new();
7789 let err = cat.load_segment_bytes(vec![0u8; 10]).unwrap_err();
7790 assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("segment")));
7791 assert_eq!(cat.cold_segment_count(), 0);
7793 }
7794
7795 #[test]
7796 fn load_segment_bytes_returns_sequential_ids() {
7797 let mut cat = Catalog::new();
7798 cat.create_table(bigint_pk_users_schema()).unwrap();
7799 let schema = cat.get("users").unwrap().schema.clone();
7800 for batch in 0u32..3 {
7801 let rows: Vec<(u64, Vec<u8>)> = (0u64..4)
7802 .map(|i| {
7803 let id = u64::from(batch) * 100 + i;
7804 let row = make_user_row(id.cast_signed(), "x");
7805 (id, encode_row_body_dense(&row, &schema))
7806 })
7807 .collect();
7808 let (bytes, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
7809 assert_eq!(cat.load_segment_bytes(bytes).unwrap(), batch);
7810 }
7811 assert_eq!(cat.cold_segment_count(), 3);
7812 }
7813
7814 #[test]
7821 fn v8_catalog_decodes_as_all_hot_under_v9_reader() {
7822 let mut cat = populated_users();
7829 cat.get_mut("users")
7830 .unwrap()
7831 .add_index("by_name".into(), "name")
7832 .unwrap();
7833
7834 let v8_bytes = encode_as_v8(&cat);
7839 assert_eq!(v8_bytes[FILE_MAGIC.len()], 8, "version byte must be 8");
7840
7841 let restored = Catalog::deserialize(&v8_bytes).expect("v9 reader accepts v8 stream");
7842 let idx = restored
7843 .get("users")
7844 .unwrap()
7845 .index_on(1)
7846 .expect("index_on(1) after restore");
7847 assert_eq!(
7850 idx.lookup_eq(&IndexKey::Text("alice".into())),
7851 &[RowLocator::Hot(0), RowLocator::Hot(2)]
7852 );
7853 for entry in idx.lookup_eq(&IndexKey::Text("alice".into())) {
7855 assert!(entry.is_hot(), "v8 → v9 read must yield Hot only");
7856 }
7857 }
7858
    /// Test-only encoder that writes `cat` in the legacy v8 catalog layout, so
    /// the current reader's backward-compatibility path can be exercised.
    ///
    /// Layout, in write order:
    /// - `FILE_MAGIC`, then the version byte `8`;
    /// - `u32` table count; per table: name, `u16` column count, and per
    ///   column: name, data type, nullable flag, a default marker (`0` = none,
    ///   `1` = followed by the value), and the auto-increment flag;
    /// - `u32` row count, followed by each row's dense body encoding;
    /// - `u16` index count; per index: name, `u16` column position, then a
    ///   kind tag: `0` for BTree (no entries follow), or `1` for NSW, followed
    ///   by `m` as `u16` and the graph.
    ///
    /// # Panics
    /// Panics if the catalog contains a BRIN or GIN index, which v8 cannot
    /// represent. Also panics if any count or position exceeds the width of
    /// its fixed-size field (the `try_from(..).unwrap()` calls).
    fn encode_as_v8(cat: &Catalog) -> Vec<u8> {
        let mut out = Vec::with_capacity(64);
        out.extend_from_slice(FILE_MAGIC);
        // Hard-coded legacy version rather than FILE_VERSION.
        out.push(8u8);
        write_u32(&mut out, u32::try_from(cat.tables.len()).unwrap());
        for t in &cat.tables {
            write_str(&mut out, &t.schema.name);
            write_u16(&mut out, u16::try_from(t.schema.columns.len()).unwrap());
            for c in &t.schema.columns {
                write_str(&mut out, &c.name);
                write_data_type(&mut out, c.ty);
                out.push(u8::from(c.nullable));
                match &c.default {
                    None => out.push(0),
                    Some(v) => {
                        out.push(1);
                        write_value(&mut out, v);
                    }
                }
                out.push(u8::from(c.auto_increment));
            }
            write_u32(&mut out, u32::try_from(t.rows.len()).unwrap());
            for row in &t.rows {
                out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
            }
            write_u16(&mut out, u16::try_from(t.indices.len()).unwrap());
            for idx in &t.indices {
                write_str(&mut out, &idx.name);
                write_u16(&mut out, u16::try_from(idx.column_position).unwrap());
                match &idx.kind {
                    // Only the tag is written for BTree: its entries are not
                    // persisted, so the reader has to rebuild them from the
                    // rows above. That is why v8 streams can only come back
                    // with Hot locators.
                    IndexKind::BTree(_) => out.push(0),
                    IndexKind::Nsw(g) => {
                        out.push(1);
                        write_u16(&mut out, u16::try_from(g.m).unwrap());
                        write_nsw_graph(&mut out, g);
                    }
                    IndexKind::Brin { .. } => panic!(
                        "v8 catalog writer cannot serialise BRIN — \
                         tests with BRIN indices must use the current writer"
                    ),
                    IndexKind::Gin(_) => panic!(
                        "v8 catalog writer cannot serialise GIN — \
                         tests with GIN indices must use the current writer"
                    ),
                }
            }
        }
        out
    }
7916
7917 #[test]
7923 fn v9_catalog_round_trip_preserves_cold_locators() {
7924 let mut cat = Catalog::new();
7925 cat.create_table(bigint_pk_users_schema()).unwrap();
7926 let t = cat.get_mut("users").unwrap();
7927 for (id, name) in [(1i64, "alice"), (2, "bob")] {
7929 t.insert(make_user_row(id, name)).unwrap();
7930 }
7931 t.add_index("by_id".into(), "id").unwrap();
7932 let schema = t.schema.clone();
7933
7934 let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe"), (300, "kim")];
7936 let seg_rows: Vec<(u64, Vec<u8>)> = cold_rows
7937 .iter()
7938 .map(|(id, name)| {
7939 let row = make_user_row(*id, name);
7940 ((*id).cast_unsigned(), encode_row_body_dense(&row, &schema))
7941 })
7942 .collect();
7943 let (seg_bytes, _) =
7944 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
7945 let seg_id = cat.load_segment_bytes(seg_bytes.clone()).unwrap();
7946 let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
7947 .iter()
7948 .map(|(id, _)| {
7949 (
7950 IndexKey::Int(*id),
7951 RowLocator::Cold {
7952 segment_id: seg_id,
7953 page_offset: 0,
7954 },
7955 )
7956 })
7957 .collect();
7958 cat.get_mut("users")
7959 .unwrap()
7960 .register_cold_locators("by_id", pairs)
7961 .unwrap();
7962
7963 let bytes = cat.serialize();
7965 assert_eq!(bytes[FILE_MAGIC.len()], FILE_VERSION);
7966 let mut restored = Catalog::deserialize(&bytes).expect("v9 round-trip parses");
7967
7968 let restored_seg_id = restored.load_segment_bytes(seg_bytes).unwrap();
7975 assert_eq!(restored_seg_id, seg_id);
7976
7977 let idx = restored.get("users").unwrap().index_on(0).unwrap();
7978 assert_eq!(idx.lookup_eq(&IndexKey::Int(1)), &[RowLocator::Hot(0)]);
7980 assert_eq!(idx.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
7981 for (id, _) in &cold_rows {
7983 assert_eq!(
7984 idx.lookup_eq(&IndexKey::Int(*id)),
7985 &[RowLocator::Cold {
7986 segment_id: seg_id,
7987 page_offset: 0,
7988 }]
7989 );
7990 }
7991 assert_eq!(
7993 restored
7994 .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
7995 .unwrap(),
7996 make_user_row(2, "bob")
7997 );
7998 for (id, name) in &cold_rows {
7999 assert_eq!(
8000 restored
8001 .lookup_by_pk("users", "by_id", &IndexKey::Int(*id))
8002 .unwrap(),
8003 make_user_row(*id, name)
8004 );
8005 }
8006 }
8007
8008 #[test]
8015 fn row_body_encoded_len_matches_actual_encode_for_all_types() {
8016 let schema = TableSchema::new(
8017 "wide",
8018 vec![
8019 ColumnSchema::new("a", DataType::SmallInt, true),
8020 ColumnSchema::new("b", DataType::Int, false),
8021 ColumnSchema::new("c", DataType::BigInt, false),
8022 ColumnSchema::new("d", DataType::Float, false),
8023 ColumnSchema::new("e", DataType::Bool, false),
8024 ColumnSchema::new("f", DataType::Text, false),
8025 ColumnSchema::new(
8026 "g",
8027 DataType::Vector {
8028 dim: 3,
8029 encoding: VecEncoding::F32,
8030 },
8031 false,
8032 ),
8033 ColumnSchema::new(
8034 "h",
8035 DataType::Numeric {
8036 precision: 18,
8037 scale: 2,
8038 },
8039 false,
8040 ),
8041 ColumnSchema::new("i", DataType::Date, false),
8042 ColumnSchema::new("j", DataType::Timestamp, false),
8043 ],
8044 );
8045 let cases: &[Row] = &[
8046 Row::new(vec![
8047 Value::SmallInt(7),
8048 Value::Int(42),
8049 Value::BigInt(1_000_000),
8050 Value::Float(1.5),
8051 Value::Bool(true),
8052 Value::Text("hello".into()),
8053 Value::Vector(vec![1.0, 2.0, 3.0]),
8054 Value::Numeric {
8055 scaled: 12345,
8056 scale: 2,
8057 },
8058 Value::Date(20_000),
8059 Value::Timestamp(1_700_000_000_000_000),
8060 ]),
8061 Row::new(vec![
8063 Value::Null,
8064 Value::Int(0),
8065 Value::BigInt(0),
8066 Value::Float(0.0),
8067 Value::Bool(false),
8068 Value::Text(String::new()),
8069 Value::Vector(vec![]),
8070 Value::Numeric {
8071 scaled: 0,
8072 scale: 2,
8073 },
8074 Value::Date(0),
8075 Value::Timestamp(0),
8076 ]),
8077 Row::new(vec![
8078 Value::SmallInt(-1),
8079 Value::Int(-1),
8080 Value::BigInt(-1),
8081 Value::Float(-0.5),
8082 Value::Bool(true),
8083 Value::Text("a much longer payload here".into()),
8084 Value::Vector(vec![0.1, 0.2, 0.3]),
8085 Value::Numeric {
8086 scaled: -999_999_999,
8087 scale: 2,
8088 },
8089 Value::Date(-1),
8090 Value::Timestamp(-1),
8091 ]),
8092 ];
8093 for row in cases {
8094 let actual = encode_row_body_dense(row, &schema).len();
8095 let fast = row_body_encoded_len(row, &schema);
8096 assert_eq!(actual, fast, "row {row:?}");
8097 }
8098 }
8099
8100 #[test]
8101 fn hot_bytes_grows_on_insert_and_matches_encoded_sum() {
8102 let mut cat = Catalog::new();
8103 cat.create_table(bigint_pk_users_schema()).unwrap();
8104 let t = cat.get_mut("users").unwrap();
8105 assert_eq!(t.hot_bytes(), 0);
8106 let mut expected: u64 = 0;
8107 for (id, name) in [(1i64, "alice"), (2, "bob"), (3, "carol")] {
8108 let row = make_user_row(id, name);
8109 expected += encode_row_body_dense(&row, &t.schema).len() as u64;
8110 t.insert(row).unwrap();
8111 }
8112 assert_eq!(t.hot_bytes(), expected);
8113 assert_eq!(cat.hot_tier_bytes(), expected);
8114 }
8115
8116 #[test]
8117 fn hot_bytes_shrinks_on_delete() {
8118 let mut cat = Catalog::new();
8119 cat.create_table(bigint_pk_users_schema()).unwrap();
8120 let t = cat.get_mut("users").unwrap();
8121 for (id, name) in [(1i64, "alice"), (2, "bob"), (3, "carol")] {
8122 t.insert(make_user_row(id, name)).unwrap();
8123 }
8124 let before = t.hot_bytes();
8125 let bob_row = make_user_row(2, "bob");
8127 let bob_bytes = encode_row_body_dense(&bob_row, &t.schema).len() as u64;
8128 let removed = t.delete_rows(&[1]);
8129 assert_eq!(removed, 1);
8130 assert_eq!(t.hot_bytes(), before - bob_bytes);
8131 }
8132
8133 #[test]
8134 fn hot_bytes_diffs_on_update_for_variable_width_columns() {
8135 let mut cat = Catalog::new();
8136 cat.create_table(bigint_pk_users_schema()).unwrap();
8137 let t = cat.get_mut("users").unwrap();
8138 t.insert(make_user_row(1, "alice")).unwrap();
8139 let after_insert = t.hot_bytes();
8140 let new_row = make_user_row(1, "alice-the-longer-name");
8143 let old_len = encode_row_body_dense(&make_user_row(1, "alice"), &t.schema).len() as u64;
8144 let new_len = encode_row_body_dense(&new_row, &t.schema).len() as u64;
8145 t.update_row(0, new_row.values).unwrap();
8146 assert_eq!(t.hot_bytes(), after_insert - old_len + new_len);
8147 assert!(t.hot_bytes() > after_insert, "longer text grew the counter");
8148 }
8149
8150 #[test]
8151 fn hot_bytes_round_trips_through_serialize_deserialize() {
8152 let mut cat = Catalog::new();
8153 cat.create_table(bigint_pk_users_schema()).unwrap();
8154 let t = cat.get_mut("users").unwrap();
8155 for i in 0..10 {
8156 t.insert(make_user_row(i, &alloc::format!("name-{i}")))
8157 .unwrap();
8158 }
8159 let pre = cat.hot_tier_bytes();
8160 let restored = Catalog::deserialize(&cat.serialize()).unwrap();
8161 assert_eq!(restored.hot_tier_bytes(), pre);
8162 assert_eq!(restored.get("users").unwrap().hot_bytes(), pre);
8163 }
8164
8165 #[test]
8172 fn freeze_oldest_to_cold_moves_rows_and_keeps_lookups_working() {
8173 let mut cat = Catalog::new();
8174 cat.create_table(bigint_pk_users_schema()).unwrap();
8175 let t = cat.get_mut("users").unwrap();
8176 for id in 0..10i64 {
8177 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8178 .unwrap();
8179 }
8180 t.add_index("by_id".into(), "id").unwrap();
8181 let total_bytes_before = t.hot_bytes();
8182
8183 let report = cat
8184 .freeze_oldest_to_cold("users", "by_id", 6)
8185 .expect("freeze succeeds");
8186 assert_eq!(report.frozen_rows, 6);
8187 assert_eq!(report.segment_id, 0);
8188 assert!(report.bytes_freed > 0);
8189 assert!(!report.segment_bytes.is_empty());
8190
8191 let t = cat.get("users").unwrap();
8192 assert_eq!(t.row_count(), 4, "4 hot rows remain (10 - 6 frozen)");
8193 assert_eq!(cat.cold_segment_count(), 1);
8194 assert_eq!(
8196 t.hot_bytes(),
8197 total_bytes_before - report.bytes_freed,
8198 "hot_bytes accounting matches FreezeReport"
8199 );
8200
8201 for id in 0..10i64 {
8204 let got = cat
8205 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
8206 .unwrap_or_else(|| panic!("PK {id} disappeared after freeze"));
8207 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
8208 }
8209 }
8210
8211 #[test]
8216 fn freeze_twice_preserves_prior_cold_locators() {
8217 let mut cat = Catalog::new();
8218 cat.create_table(bigint_pk_users_schema()).unwrap();
8219 let t = cat.get_mut("users").unwrap();
8220 for id in 0..12i64 {
8221 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8222 .unwrap();
8223 }
8224 t.add_index("by_id".into(), "id").unwrap();
8225
8226 cat.freeze_oldest_to_cold("users", "by_id", 4)
8227 .expect("first freeze ok");
8228 cat.freeze_oldest_to_cold("users", "by_id", 4)
8229 .expect("second freeze ok");
8230
8231 assert_eq!(cat.get("users").unwrap().row_count(), 4);
8232 assert_eq!(cat.cold_segment_count(), 2);
8233 for id in 0..12i64 {
8236 let got = cat
8237 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
8238 .unwrap_or_else(|| panic!("PK {id} not resolvable after two freezes"));
8239 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
8240 }
8241 }
8242
8243 #[test]
8246 fn freeze_oldest_to_cold_rejects_invalid_input() {
8247 let mut cat = Catalog::new();
8248 cat.create_table(bigint_pk_users_schema()).unwrap();
8249 let t = cat.get_mut("users").unwrap();
8250 for id in 0..3i64 {
8251 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8252 .unwrap();
8253 }
8254 t.add_index("by_id".into(), "id").unwrap();
8255
8256 assert!(matches!(
8258 cat.freeze_oldest_to_cold("users", "by_id", 0),
8259 Err(StorageError::Corrupt(_))
8260 ));
8261 assert!(matches!(
8263 cat.freeze_oldest_to_cold("missing", "by_id", 1),
8264 Err(StorageError::Corrupt(_))
8265 ));
8266 assert!(matches!(
8268 cat.freeze_oldest_to_cold("users", "no_such_index", 1),
8269 Err(StorageError::Corrupt(_))
8270 ));
8271 assert!(matches!(
8273 cat.freeze_oldest_to_cold("users", "by_id", 999),
8274 Err(StorageError::Corrupt(_))
8275 ));
8276 assert_eq!(cat.get("users").unwrap().row_count(), 3);
8278 assert_eq!(cat.cold_segment_count(), 0);
8279 }
8280
8281 #[test]
8284 fn freeze_oldest_to_cold_rejects_non_integer_pk() {
8285 let mut cat = Catalog::new();
8286 cat.create_table(TableSchema::new(
8287 "by_name",
8288 vec![
8289 ColumnSchema::new("name", DataType::Text, false),
8290 ColumnSchema::new("payload", DataType::BigInt, false),
8291 ],
8292 ))
8293 .unwrap();
8294 let t = cat.get_mut("by_name").unwrap();
8295 t.insert(Row::new(vec![Value::Text("a".into()), Value::BigInt(1)]))
8296 .unwrap();
8297 t.add_index("by_n".into(), "name").unwrap();
8298 let err = cat
8299 .freeze_oldest_to_cold("by_name", "by_n", 1)
8300 .expect_err("non-integer PK rejected");
8301 match err {
8302 StorageError::Corrupt(s) => assert!(
8303 s.contains("non-integer"),
8304 "error message names the constraint: {s}"
8305 ),
8306 other => panic!("expected Corrupt, got {other:?}"),
8307 }
8308 assert_eq!(cat.get("by_name").unwrap().row_count(), 1);
8310 assert_eq!(cat.cold_segment_count(), 0);
8311 }
8312
8313 #[test]
8318 fn freeze_keeps_remaining_hot_rows_addressable_via_secondary_index() {
8319 let mut cat = Catalog::new();
8320 cat.create_table(bigint_pk_users_schema()).unwrap();
8321 let t = cat.get_mut("users").unwrap();
8322 for id in 0..6i64 {
8323 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8324 .unwrap();
8325 }
8326 t.add_index("by_id".into(), "id").unwrap();
8327 t.add_index("by_name".into(), "name").unwrap();
8328
8329 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
8330
8331 let idx = cat.get("users").unwrap().index_on(1).unwrap();
8335 let got = idx.lookup_eq(&IndexKey::Text("u-4".into()));
8336 assert_eq!(got.len(), 1);
8337 assert!(got[0].is_hot(), "kept-hot rows still surface as Hot");
8338 match got[0] {
8339 RowLocator::Hot(i) => {
8340 assert_eq!(i, 1);
8343 }
8344 RowLocator::Cold { .. } => unreachable!(),
8345 }
8346 }
8347
8348 #[test]
8356 fn promote_cold_row_pulls_frozen_row_back_to_hot_tier() {
8357 let mut cat = Catalog::new();
8358 cat.create_table(bigint_pk_users_schema()).unwrap();
8359 let t = cat.get_mut("users").unwrap();
8360 for id in 0..6i64 {
8361 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8362 .unwrap();
8363 }
8364 t.add_index("by_id".into(), "id").unwrap();
8365 cat.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
8368 let hot_bytes_before = cat.get("users").unwrap().hot_bytes();
8369
8370 let new_idx = cat
8372 .promote_cold_row("users", "by_id", &IndexKey::Int(2))
8373 .expect("promote ok")
8374 .expect("PK 2 was cold");
8375 assert_eq!(
8376 new_idx, 2,
8377 "promoted row appended after the 2 surviving hot rows"
8378 );
8379
8380 let t = cat.get("users").unwrap();
8381 assert_eq!(t.row_count(), 3, "hot tier grew from 2 to 3");
8382 let row = make_user_row(2, "u-2");
8384 let row_len = encode_row_body_dense(&row, &t.schema).len() as u64;
8385 assert_eq!(t.hot_bytes(), hot_bytes_before + row_len);
8386
8387 let entries = t.index_on(0).unwrap().lookup_eq(&IndexKey::Int(2));
8390 assert_eq!(entries.len(), 1, "exactly one locator per key");
8391 assert!(entries[0].is_hot(), "promote retired the Cold locator");
8392 assert_eq!(
8394 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(2))
8395 .unwrap(),
8396 row
8397 );
8398 assert_eq!(
8401 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(0))
8402 .unwrap(),
8403 make_user_row(0, "u-0")
8404 );
8405 }
8406
8407 #[test]
8411 fn promote_cold_row_returns_none_when_key_is_not_cold() {
8412 let mut cat = Catalog::new();
8413 cat.create_table(bigint_pk_users_schema()).unwrap();
8414 let t = cat.get_mut("users").unwrap();
8415 t.insert(make_user_row(7, "alice")).unwrap();
8416 t.add_index("by_id".into(), "id").unwrap();
8417
8418 assert!(
8420 cat.promote_cold_row("users", "by_id", &IndexKey::Int(7))
8421 .unwrap()
8422 .is_none()
8423 );
8424 assert!(
8426 cat.promote_cold_row("users", "by_id", &IndexKey::Int(99))
8427 .unwrap()
8428 .is_none()
8429 );
8430 assert_eq!(cat.get("users").unwrap().row_count(), 1);
8432 assert_eq!(cat.cold_segment_count(), 0);
8433 }
8434
8435 #[test]
8440 fn shadow_cold_row_removes_cold_locators_and_drops_lookup() {
8441 let mut cat = Catalog::new();
8442 cat.create_table(bigint_pk_users_schema()).unwrap();
8443 let t = cat.get_mut("users").unwrap();
8444 for id in 0..5i64 {
8445 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8446 .unwrap();
8447 }
8448 t.add_index("by_id".into(), "id").unwrap();
8449 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
8450
8451 assert!(
8453 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(1))
8454 .is_some(),
8455 "frozen PK resolves before shadow"
8456 );
8457 let removed = cat
8458 .shadow_cold_row("users", "by_id", &IndexKey::Int(1))
8459 .unwrap();
8460 assert_eq!(removed, 1, "exactly one cold locator retired");
8461
8462 assert!(
8465 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(1))
8466 .is_none(),
8467 "shadowed key no longer resolves"
8468 );
8469 assert_eq!(
8471 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(0))
8472 .unwrap(),
8473 make_user_row(0, "u-0")
8474 );
8475 assert_eq!(
8476 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(2))
8477 .unwrap(),
8478 make_user_row(2, "u-2")
8479 );
8480 }
8481
8482 #[test]
8487 fn shadow_cold_row_returns_zero_when_key_is_not_cold() {
8488 let mut cat = Catalog::new();
8489 cat.create_table(bigint_pk_users_schema()).unwrap();
8490 let t = cat.get_mut("users").unwrap();
8491 t.insert(make_user_row(1, "alice")).unwrap();
8492 t.add_index("by_id".into(), "id").unwrap();
8493 assert_eq!(
8494 cat.shadow_cold_row("users", "by_id", &IndexKey::Int(1))
8495 .unwrap(),
8496 0,
8497 "hot-only key drops no cold locators"
8498 );
8499 assert_eq!(
8500 cat.shadow_cold_row("users", "by_id", &IndexKey::Int(999))
8501 .unwrap(),
8502 0,
8503 "absent key drops no cold locators"
8504 );
8505 assert_eq!(cat.get("users").unwrap().row_count(), 1);
8506 }
8507
8508 #[test]
8510 fn promote_and_shadow_reject_invalid_inputs() {
8511 let mut cat = Catalog::new();
8512 cat.create_table(bigint_pk_users_schema()).unwrap();
8513 let t = cat.get_mut("users").unwrap();
8514 t.insert(make_user_row(1, "alice")).unwrap();
8515 t.add_index("by_id".into(), "id").unwrap();
8516
8517 assert!(matches!(
8519 cat.promote_cold_row("missing", "by_id", &IndexKey::Int(1)),
8520 Err(StorageError::Corrupt(_))
8521 ));
8522 assert!(matches!(
8523 cat.shadow_cold_row("missing", "by_id", &IndexKey::Int(1)),
8524 Err(StorageError::Corrupt(_))
8525 ));
8526 assert!(matches!(
8528 cat.promote_cold_row("users", "no_such_index", &IndexKey::Int(1)),
8529 Err(StorageError::Corrupt(_))
8530 ));
8531 assert!(matches!(
8532 cat.shadow_cold_row("users", "no_such_index", &IndexKey::Int(1)),
8533 Err(StorageError::Corrupt(_))
8534 ));
8535 }
8536
8537 #[test]
8544 fn commit_freeze_slices_single_slice_matches_freeze_oldest() {
8545 let mut a = Catalog::new();
8546 let mut b = Catalog::new();
8547 for cat in [&mut a, &mut b] {
8548 cat.create_table(bigint_pk_users_schema()).unwrap();
8549 let t = cat.get_mut("users").unwrap();
8550 for id in 0..10i64 {
8551 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8552 .unwrap();
8553 }
8554 t.add_index("by_id".into(), "id").unwrap();
8555 }
8556 let single = a.freeze_oldest_to_cold("users", "by_id", 6).unwrap();
8557 let slice = b
8558 .prepare_freeze_slice("users", "by_id", 0..6)
8559 .expect("prepare");
8560 let parallel = b
8561 .commit_freeze_slices("users", "by_id", alloc::vec![slice])
8562 .expect("commit");
8563 assert_eq!(single.segment_id, parallel.segment_id);
8564 assert_eq!(single.frozen_rows, parallel.frozen_rows);
8565 assert_eq!(single.bytes_freed, parallel.bytes_freed);
8566 assert_eq!(single.segment_bytes, parallel.segment_bytes);
8567 for id in 0..10i64 {
8569 assert_eq!(
8570 a.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
8571 b.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
8572 "PK {id} differs after single vs slice freeze"
8573 );
8574 }
8575 }
8576
8577 #[test]
8582 fn commit_freeze_slices_two_slices_match_single_slice() {
8583 let mut a = Catalog::new();
8584 let mut b = Catalog::new();
8585 for cat in [&mut a, &mut b] {
8586 cat.create_table(bigint_pk_users_schema()).unwrap();
8587 let t = cat.get_mut("users").unwrap();
8588 for id in [3, 7, 1, 9, 5, 0, 8, 4, 2, 6].iter().copied() {
8591 t.insert(make_user_row(id as i64, &alloc::format!("u-{id}")))
8592 .unwrap();
8593 }
8594 t.add_index("by_id".into(), "id").unwrap();
8595 }
8596 let single = a
8597 .prepare_freeze_slice("users", "by_id", 0..8)
8598 .expect("prepare");
8599 let one = a
8600 .commit_freeze_slices("users", "by_id", alloc::vec![single])
8601 .expect("commit one");
8602 let s1 = b
8603 .prepare_freeze_slice("users", "by_id", 0..4)
8604 .expect("prepare s1");
8605 let s2 = b
8606 .prepare_freeze_slice("users", "by_id", 4..8)
8607 .expect("prepare s2");
8608 let two = b
8609 .commit_freeze_slices("users", "by_id", alloc::vec![s1, s2])
8610 .expect("commit two");
8611 assert_eq!(one.segment_bytes, two.segment_bytes);
8612 assert_eq!(one.frozen_rows, two.frozen_rows);
8613 for id in 0..10i64 {
8616 assert_eq!(
8617 a.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
8618 b.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
8619 "PK {id} differs after one-slice vs two-slice freeze"
8620 );
8621 }
8622 }
8623
8624 #[test]
8626 fn commit_freeze_slices_rejects_gap() {
8627 let mut cat = Catalog::new();
8628 cat.create_table(bigint_pk_users_schema()).unwrap();
8629 let t = cat.get_mut("users").unwrap();
8630 for id in 0..6i64 {
8631 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8632 .unwrap();
8633 }
8634 t.add_index("by_id".into(), "id").unwrap();
8635 let s1 = cat.prepare_freeze_slice("users", "by_id", 0..2).unwrap();
8636 let s2 = cat.prepare_freeze_slice("users", "by_id", 3..5).unwrap();
8637 assert!(matches!(
8638 cat.commit_freeze_slices("users", "by_id", alloc::vec![s1, s2]),
8639 Err(StorageError::Corrupt(_))
8640 ));
8641 assert_eq!(cat.cold_segment_count(), 0);
8643 assert_eq!(cat.get("users").unwrap().row_count(), 6);
8644 }
8645
8646 #[test]
8648 fn commit_freeze_slices_empty_is_noop() {
8649 let mut cat = Catalog::new();
8650 cat.create_table(bigint_pk_users_schema()).unwrap();
8651 let t = cat.get_mut("users").unwrap();
8652 for id in 0..3i64 {
8653 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8654 .unwrap();
8655 }
8656 t.add_index("by_id".into(), "id").unwrap();
8657 let report = cat
8658 .commit_freeze_slices("users", "by_id", Vec::new())
8659 .unwrap();
8660 assert_eq!(report.frozen_rows, 0);
8661 assert_eq!(cat.cold_segment_count(), 0);
8662 assert_eq!(cat.get("users").unwrap().row_count(), 3);
8663 }
8664
8665 #[test]
8672 fn compact_merges_small_segments_storage_unit() {
8673 let mut cat = Catalog::new();
8674 cat.create_table(bigint_pk_users_schema()).unwrap();
8675 let t = cat.get_mut("users").unwrap();
8676 for id in 0..8i64 {
8677 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8678 .unwrap();
8679 }
8680 t.add_index("by_id".into(), "id").unwrap();
8681 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
8683 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
8684 assert_eq!(cat.cold_segment_count(), 2);
8685 assert_eq!(cat.cold_segment_slot_count(), 2);
8686
8687 let max_seg_bytes = cat
8690 .cold_segment_ids_global()
8691 .iter()
8692 .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
8693 .max()
8694 .unwrap();
8695 let target = max_seg_bytes + 1;
8696
8697 let report = cat
8698 .compact_cold_segments("users", "by_id", target)
8699 .expect("compact succeeds");
8700 assert_eq!(report.sources.len(), 2);
8701 let merged_id = report.merged_segment_id.expect("merge happened");
8702 assert_eq!(report.merged_rows, 6);
8703 assert_eq!(report.deleted_rows_pruned, 0);
8704 assert!(!report.merged_segment_bytes.is_empty());
8705
8706 assert_eq!(cat.cold_segment_count(), 1);
8709 assert_eq!(cat.cold_segment_slot_count(), 3);
8710 assert_eq!(cat.cold_segment_ids_global(), alloc::vec![merged_id]);
8711
8712 for id in 0..8i64 {
8715 let got = cat
8716 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
8717 .unwrap_or_else(|| panic!("PK {id} lost after compaction"));
8718 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
8719 }
8720 }
8721
8722 #[test]
8726 fn compact_drops_shadowed_cold_rows() {
8727 let mut cat = Catalog::new();
8728 cat.create_table(bigint_pk_users_schema()).unwrap();
8729 let t = cat.get_mut("users").unwrap();
8730 for id in 0..6i64 {
8731 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8732 .unwrap();
8733 }
8734 t.add_index("by_id".into(), "id").unwrap();
8735 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
8736 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
8737 assert_eq!(
8739 cat.shadow_cold_row("users", "by_id", &IndexKey::Int(1))
8740 .unwrap(),
8741 1
8742 );
8743 assert_eq!(
8744 cat.shadow_cold_row("users", "by_id", &IndexKey::Int(4))
8745 .unwrap(),
8746 1
8747 );
8748
8749 let max_seg_bytes = cat
8750 .cold_segment_ids_global()
8751 .iter()
8752 .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
8753 .max()
8754 .unwrap();
8755 let report = cat
8756 .compact_cold_segments("users", "by_id", max_seg_bytes + 1)
8757 .expect("compact succeeds");
8758 assert_eq!(report.sources.len(), 2);
8759 assert_eq!(report.merged_rows, 4, "6 frozen − 2 shadowed = 4 live");
8760 assert_eq!(report.deleted_rows_pruned, 2);
8761
8762 for shadowed in [1i64, 4i64] {
8764 assert!(
8765 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(shadowed))
8766 .is_none(),
8767 "shadowed PK {shadowed} must remain invisible after compact"
8768 );
8769 }
8770 for live in [0i64, 2, 3, 5] {
8772 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(live))
8773 .unwrap_or_else(|| panic!("live PK {live} lost after compact"));
8774 }
8775 }
8776
8777 #[test]
8780 fn compact_is_noop_below_two_candidates() {
8781 let mut cat = Catalog::new();
8782 cat.create_table(bigint_pk_users_schema()).unwrap();
8783 let t = cat.get_mut("users").unwrap();
8784 for id in 0..6i64 {
8785 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8786 .unwrap();
8787 }
8788 t.add_index("by_id".into(), "id").unwrap();
8789 let report = cat
8791 .compact_cold_segments("users", "by_id", 1 << 30)
8792 .expect("noop ok");
8793 assert!(report.merged_segment_id.is_none());
8794 assert!(report.sources.is_empty());
8795
8796 cat.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
8798 let report = cat
8799 .compact_cold_segments("users", "by_id", 1 << 30)
8800 .expect("noop ok");
8801 assert!(report.merged_segment_id.is_none());
8802 assert_eq!(cat.cold_segment_count(), 1);
8803
8804 let report = cat
8807 .compact_cold_segments("users", "by_id", 1)
8808 .expect("noop ok");
8809 assert!(report.merged_segment_id.is_none());
8810 assert_eq!(cat.cold_segment_count(), 1);
8811 }
8812
8813 #[test]
8821 fn compact_swap_survives_catalog_roundtrip_via_load_at() {
8822 let mut cat = Catalog::new();
8823 cat.create_table(bigint_pk_users_schema()).unwrap();
8824 let t = cat.get_mut("users").unwrap();
8825 for id in 0..6i64 {
8826 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8827 .unwrap();
8828 }
8829 t.add_index("by_id".into(), "id").unwrap();
8830 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
8831 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
8832 let max_seg_bytes = cat
8833 .cold_segment_ids_global()
8834 .iter()
8835 .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
8836 .max()
8837 .unwrap();
8838 let report = cat
8839 .compact_cold_segments("users", "by_id", max_seg_bytes + 1)
8840 .expect("compact ok");
8841 let merged_id = report.merged_segment_id.unwrap();
8842
8843 let cat_bytes = cat.serialize();
8848 let merged_bytes = report.merged_segment_bytes.clone();
8849
8850 let mut restored = Catalog::deserialize(&cat_bytes).expect("deserialize ok");
8851 restored
8852 .load_segment_bytes_at(merged_id, merged_bytes)
8853 .expect("reload merged ok");
8854
8855 for id in 0..6i64 {
8857 let got = restored
8858 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
8859 .unwrap_or_else(|| panic!("PK {id} lost across roundtrip"));
8860 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
8861 }
8862 assert_eq!(restored.cold_segment_count(), 1);
8865 }
8866
8867 #[test]
8870 fn load_segment_bytes_at_pads_and_rejects_collision() {
8871 let mut cat = Catalog::new();
8872 cat.create_table(bigint_pk_users_schema()).unwrap();
8873 let t = cat.get_mut("users").unwrap();
8874 for id in 0..4i64 {
8875 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8876 .unwrap();
8877 }
8878 t.add_index("by_id".into(), "id").unwrap();
8879 let report = cat.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
8880 let bytes_seg0 = report.segment_bytes.clone();
8881
8882 cat.load_segment_bytes_at(5, bytes_seg0.clone())
8886 .expect("pad + load ok");
8887 assert_eq!(cat.cold_segment_slot_count(), 6);
8888 assert_eq!(cat.cold_segment_count(), 2);
8889
8890 assert!(matches!(
8892 cat.load_segment_bytes_at(5, bytes_seg0.clone()),
8893 Err(StorageError::Corrupt(_))
8894 ));
8895 assert!(matches!(
8897 cat.load_segment_bytes_at(0, bytes_seg0),
8898 Err(StorageError::Corrupt(_))
8899 ));
8900 }
8901
8902 #[test]
8906 fn promote_then_refreeze_does_not_leave_orphan_locators() {
8907 let mut cat = Catalog::new();
8908 cat.create_table(bigint_pk_users_schema()).unwrap();
8909 let t = cat.get_mut("users").unwrap();
8910 for id in 0..4i64 {
8911 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8912 .unwrap();
8913 }
8914 t.add_index("by_id".into(), "id").unwrap();
8915
8916 cat.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
8918 let promoted = cat
8919 .promote_cold_row("users", "by_id", &IndexKey::Int(0))
8920 .unwrap();
8921 assert!(promoted.is_some());
8922 let entries_after_promote = cat
8923 .get("users")
8924 .unwrap()
8925 .index_on(0)
8926 .unwrap()
8927 .lookup_eq(&IndexKey::Int(0))
8928 .to_vec();
8929 assert_eq!(entries_after_promote.len(), 1);
8930 assert!(entries_after_promote[0].is_hot());
8931
8932 for id in [2i64, 3] {
8939 assert_eq!(
8940 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(id))
8941 .unwrap(),
8942 make_user_row(id, &alloc::format!("u-{id}"))
8943 );
8944 }
8945 }
8946}