1#![no_std]
7#![cfg_attr(target_arch = "aarch64", allow(unsafe_code))]
11
12extern crate alloc;
13
14pub mod bloom;
15pub mod halfvec;
16pub mod persistent;
17pub mod persistent_btree;
18pub mod quantize;
19pub mod row_locator;
20pub mod segment;
21pub mod trgm;
22
23pub use self::bloom::{BloomError, BloomFilter};
24pub use self::row_locator::{RowLocator, RowLocatorError};
25pub use self::segment::{
26 BRIN_SIDECAR_MAGIC, BrinSummary, OwnedSegment, SEGMENT_COMPRESS_ALGO_LZSS,
27 SEGMENT_COMPRESS_ALGO_NONE, SEGMENT_MAGIC, SEGMENT_MAGIC_V2, SEGMENT_PAGE_BYTES, SegmentError,
28 SegmentMeta, SegmentReader, derive_brin_summaries, encode_segment, wrap_v2_envelope,
29 wrap_v2_envelope_with_brin,
30};
31
32use alloc::boxed::Box;
33use alloc::collections::{BTreeMap, BTreeSet};
34use alloc::format;
35use alloc::string::{String, ToString};
36use alloc::sync::Arc;
37use alloc::vec::Vec;
38use core::fmt;
39
40use self::persistent::PersistentVec;
41use self::persistent_btree::PersistentBTreeMap;
42
/// Storage encoding selected for a vector column.
///
/// `F32` is the default. The `Display` form is the upper-case keyword for the
/// encoding (`F32`, `SQ8`, `HALF`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum VecEncoding {
    /// Plain `f32` components (default).
    #[default]
    F32,
    /// Rendered as `SQ8`; paired with `Value::Sq8Vector` elsewhere in this file.
    Sq8,
    /// Rendered as `HALF`; paired with `Value::HalfVector` elsewhere in this file.
    F16,
}

impl fmt::Display for VecEncoding {
    /// Writes the encoding keyword with no padding or decoration.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match self {
            Self::F32 => "F32",
            Self::Sq8 => "SQ8",
            Self::F16 => "HALF",
        };
        f.write_str(keyword)
    }
}
71
72#[derive(Debug, Clone, Copy, PartialEq, Eq)]
76pub enum DataType {
77 SmallInt,
80 Int, BigInt, Float, Text,
84 Varchar(u32),
87 Char(u32),
91 Bool,
92 Vector {
98 dim: u32,
99 encoding: VecEncoding,
100 },
101 Numeric {
107 precision: u8,
108 scale: u8,
109 },
110 Date,
113 Timestamp,
116 Timestamptz,
124 Interval,
129 Json,
134 Jsonb,
140 Bytes,
148 TextArray,
157 IntArray,
161 BigIntArray,
164 TsVector,
172 TsQuery,
176}
177
178impl fmt::Display for DataType {
179 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
180 match self {
181 Self::SmallInt => f.write_str("SMALLINT"),
182 Self::Int => f.write_str("INT"),
183 Self::BigInt => f.write_str("BIGINT"),
184 Self::Float => f.write_str("FLOAT"),
185 Self::Text => f.write_str("TEXT"),
186 Self::Varchar(n) => write!(f, "VARCHAR({n})"),
187 Self::Char(n) => write!(f, "CHAR({n})"),
188 Self::Bool => f.write_str("BOOL"),
189 Self::Vector { dim, encoding } => match encoding {
190 VecEncoding::F32 => write!(f, "VECTOR({dim})"),
191 VecEncoding::Sq8 => write!(f, "VECTOR({dim}) USING SQ8"),
192 VecEncoding::F16 => write!(f, "VECTOR({dim}) USING HALF"),
193 },
194 Self::Numeric { precision, scale } => {
195 if *scale == 0 {
196 write!(f, "NUMERIC({precision})")
197 } else {
198 write!(f, "NUMERIC({precision}, {scale})")
199 }
200 }
201 Self::Date => f.write_str("DATE"),
202 Self::Timestamp => f.write_str("TIMESTAMP"),
203 Self::Timestamptz => f.write_str("TIMESTAMPTZ"),
204 Self::Interval => f.write_str("INTERVAL"),
205 Self::Json => f.write_str("JSON"),
206 Self::Jsonb => f.write_str("JSONB"),
207 Self::Bytes => f.write_str("BYTEA"),
208 Self::TextArray => f.write_str("TEXT[]"),
209 Self::IntArray => f.write_str("INT[]"),
210 Self::BigIntArray => f.write_str("BIGINT[]"),
211 Self::TsVector => f.write_str("TSVECTOR"),
212 Self::TsQuery => f.write_str("TSQUERY"),
213 }
214 }
215}
216
/// One entry of a `Value::TsVector`: a word plus its positional metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TsLexeme {
    /// The lexeme text; GIN indexes in this file key their postings on it.
    pub word: String,
    /// Positions recorded for the word (presumably token offsets in the document).
    pub positions: Vec<u16>,
    /// Weight tag attached to the lexeme (encoding not visible here — confirm).
    pub weight: u8,
}
228
/// Parsed text-search query tree, carried by `Value::TsQuery`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TsQueryAst {
    /// Leaf node: a single search word.
    Term {
        word: String,
        /// Bit mask over lexeme weights (bit layout not visible here — confirm).
        weight_mask: u8,
    },
    /// Both operands.
    And(Box<TsQueryAst>, Box<TsQueryAst>),
    /// Either operand.
    Or(Box<TsQueryAst>, Box<TsQueryAst>),
    /// Negation of the operand.
    Not(Box<TsQueryAst>),
    /// Two operands related by a positional `distance`.
    Phrase {
        left: Box<TsQueryAst>,
        right: Box<TsQueryAst>,
        distance: u16,
    },
}
252
253#[derive(Debug, Clone, PartialEq)]
257#[non_exhaustive]
258pub enum Value {
259 SmallInt(i16),
260 Int(i32),
261 BigInt(i64),
262 Float(f64),
263 Text(String),
264 Bool(bool),
265 Vector(Vec<f32>),
266 Sq8Vector(crate::quantize::Sq8Vector),
273 HalfVector(crate::halfvec::HalfVector),
279 Numeric {
283 scaled: i128,
284 scale: u8,
285 },
286 Date(i32),
288 Timestamp(i64),
290 Interval {
293 months: i32,
294 micros: i64,
295 },
296 Json(String),
300 Bytes(Vec<u8>),
306 TextArray(Vec<Option<String>>),
312 IntArray(Vec<Option<i32>>),
316 BigIntArray(Vec<Option<i64>>),
319 TsVector(Vec<TsLexeme>),
324 TsQuery(TsQueryAst),
327 Null,
328}
329
330impl Value {
331 pub fn data_type(&self) -> Option<DataType> {
333 match self {
334 Self::SmallInt(_) => Some(DataType::SmallInt),
335 Self::Int(_) => Some(DataType::Int),
336 Self::BigInt(_) => Some(DataType::BigInt),
337 Self::Float(_) => Some(DataType::Float),
338 Self::Text(_) => Some(DataType::Text),
341 Self::Bool(_) => Some(DataType::Bool),
342 Self::Vector(v) => Some(DataType::Vector {
343 dim: u32::try_from(v.len()).expect("vector dim ≤ u32"),
344 encoding: VecEncoding::F32,
345 }),
346 Self::Sq8Vector(q) => Some(DataType::Vector {
347 dim: u32::try_from(q.bytes.len()).expect("vector dim ≤ u32"),
348 encoding: VecEncoding::Sq8,
349 }),
350 Self::HalfVector(h) => Some(DataType::Vector {
351 dim: u32::try_from(h.dim()).expect("vector dim ≤ u32"),
352 encoding: VecEncoding::F16,
353 }),
354 Self::Numeric { scale, .. } => Some(DataType::Numeric {
359 precision: 0,
360 scale: *scale,
361 }),
362 Self::Date(_) => Some(DataType::Date),
363 Self::Timestamp(_) => Some(DataType::Timestamp),
364 Self::Interval { .. } => Some(DataType::Interval),
365 Self::Json(_) => Some(DataType::Json),
366 Self::Bytes(_) => Some(DataType::Bytes),
367 Self::TextArray(_) => Some(DataType::TextArray),
368 Self::IntArray(_) => Some(DataType::IntArray),
369 Self::BigIntArray(_) => Some(DataType::BigIntArray),
370 Self::TsVector(_) => Some(DataType::TsVector),
371 Self::TsQuery(_) => Some(DataType::TsQuery),
372 Self::Null => None,
373 }
374 }
375
376 pub const fn is_null(&self) -> bool {
377 matches!(self, Self::Null)
378 }
379}
380
381#[derive(Debug, Clone, PartialEq)]
384pub struct Row {
385 pub values: Vec<Value>,
386}
387
388impl Row {
389 pub const fn new(values: Vec<Value>) -> Self {
390 Self { values }
391 }
392
393 pub fn len(&self) -> usize {
394 self.values.len()
395 }
396
397 pub fn is_empty(&self) -> bool {
398 self.values.is_empty()
399 }
400}
401
402#[derive(Debug, Clone, PartialEq)]
403pub struct ColumnSchema {
404 pub name: String,
405 pub ty: DataType,
406 pub nullable: bool,
407 pub default: Option<Value>,
412 pub runtime_default: Option<String>,
420 pub auto_increment: bool,
424}
425
426#[derive(Debug, Clone, PartialEq)]
427pub struct TableSchema {
428 pub name: String,
429 pub columns: Vec<ColumnSchema>,
430 pub hot_tier_bytes: Option<u64>,
436 pub foreign_keys: Vec<ForeignKeyConstraint>,
443 pub uniqueness_constraints: Vec<UniquenessConstraint>,
450 pub checks: Vec<String>,
459}
460
/// A PRIMARY KEY or UNIQUE constraint over one or more columns.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UniquenessConstraint {
    /// `true` for a primary key (presumably `false` means a plain UNIQUE constraint).
    pub is_primary_key: bool,
    /// Positions of the constrained columns in `TableSchema::columns`.
    pub columns: Vec<usize>,
    /// Flag named after SQL's `NULLS NOT DISTINCT` (enforcement not visible here).
    pub nulls_not_distinct: bool,
}
484
485#[derive(Debug, Clone, PartialEq, Eq)]
490pub struct ForeignKeyConstraint {
491 pub name: Option<String>,
495 pub local_columns: Vec<usize>,
498 pub parent_table: String,
500 pub parent_columns: Vec<usize>,
505 pub on_delete: FkAction,
507 pub on_update: FkAction,
510}
511
/// Referential action attached to a foreign key (`ON DELETE` / `ON UPDATE`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FkAction {
    Restrict,
    Cascade,
    SetNull,
    SetDefault,
    NoAction,
}

impl FkAction {
    /// One-byte tag for this action; the inverse of [`FkAction::from_tag`].
    pub const fn tag(self) -> u8 {
        match self {
            Self::Restrict => 0,
            Self::Cascade => 1,
            Self::SetNull => 2,
            Self::SetDefault => 3,
            Self::NoAction => 4,
        }
    }

    /// Decodes a tag produced by [`FkAction::tag`]; unknown bytes give `None`.
    pub const fn from_tag(b: u8) -> Option<Self> {
        match b {
            0 => Some(Self::Restrict),
            1 => Some(Self::Cascade),
            2 => Some(Self::SetNull),
            3 => Some(Self::SetDefault),
            4 => Some(Self::NoAction),
            _ => None,
        }
    }
}
544
545impl TableSchema {
546 pub fn column_position(&self, name: &str) -> Option<usize> {
547 self.columns.iter().position(|c| c.name == name)
548 }
549}
550
551#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
556pub enum IndexKey {
557 Int(i64),
558 Text(String),
559 Bool(bool),
560}
561
562impl IndexKey {
563 pub fn from_value(v: &Value) -> Option<Self> {
564 match v {
565 Value::SmallInt(n) => Some(Self::Int(i64::from(*n))),
566 Value::Int(n) => Some(Self::Int(i64::from(*n))),
567 Value::BigInt(n) => Some(Self::Int(*n)),
568 Value::Text(s) => Some(Self::Text(s.clone())),
569 Value::Bool(b) => Some(Self::Bool(*b)),
570 Value::Date(d) => Some(Self::Int(i64::from(*d))),
573 Value::Timestamp(t) => Some(Self::Int(*t)),
574 Value::Null
579 | Value::Float(_)
580 | Value::Vector(_)
581 | Value::Sq8Vector(_)
582 | Value::HalfVector(_)
583 | Value::Numeric { .. }
584 | Value::Interval { .. }
585 | Value::Json(_)
586 | Value::Bytes(_)
587 | Value::TextArray(_)
588 | Value::IntArray(_)
589 | Value::BigIntArray(_)
590 | Value::TsVector(_)
591 | Value::TsQuery(_) => None,
592 }
593 }
594}
595
596#[derive(Debug, Clone)]
601pub struct Index {
602 pub name: String,
603 pub column_position: usize,
604 pub kind: IndexKind,
605 pub included_columns: Vec<usize>,
615 pub partial_predicate: Option<String>,
622 pub expression: Option<String>,
627 pub is_unique: bool,
634 pub extra_column_positions: Vec<usize>,
643}
644
/// Default `m` for NSW graphs (the value stored in `NswGraph::m`).
pub const NSW_DEFAULT_M: usize = 16;
648
/// Summary describing one frozen segment.
#[derive(Debug, Clone)]
pub struct FreezeReport {
    /// Identifier of the segment.
    pub segment_id: u32,
    /// Number of rows that were frozen.
    pub frozen_rows: usize,
    /// Bytes released by the freeze.
    pub bytes_freed: u64,
    /// The segment's bytes (presumably the encoded segment image — confirm).
    pub segment_bytes: Vec<u8>,
}
674
675#[derive(Debug, Clone)]
684pub struct FreezeSlice {
685 pub row_range: core::ops::Range<usize>,
690 pub rows: Vec<(u64, Vec<u8>, IndexKey)>,
696}
697
/// Summary describing one compaction run.
#[derive(Debug, Clone)]
pub struct CompactReport {
    /// Identifiers of the source segments.
    pub sources: Vec<u32>,
    /// Identifier of the merged segment, when one exists.
    pub merged_segment_id: Option<u32>,
    /// Bytes of the merged segment.
    pub merged_segment_bytes: Vec<u8>,
    /// Number of rows in the merged output.
    pub merged_rows: usize,
    /// Number of deleted rows dropped during the merge.
    pub deleted_rows_pruned: usize,
    /// Estimated number of bytes reclaimed.
    pub bytes_reclaimed_estimate: u64,
}
733
734#[derive(Debug, Clone)]
735pub enum IndexKind {
736 BTree(PersistentBTreeMap<IndexKey, Vec<RowLocator>>),
753 Nsw(NswGraph),
755 Brin {
762 column_type: DataType,
766 },
767 Gin(PersistentBTreeMap<alloc::string::String, Vec<RowLocator>>),
783 GinTrgm(PersistentBTreeMap<alloc::string::String, Vec<RowLocator>>),
794}
795
796#[derive(Debug, Clone)]
805pub struct NswGraph {
806 pub m: usize,
808 pub m_max_0: usize,
811 pub entry: Option<usize>,
814 pub entry_level: u8,
816 pub levels: PersistentVec<u8>,
823 pub layers: Vec<PersistentVec<Vec<u32>>>,
839}
840
841impl NswGraph {
842 fn new(m: usize) -> Self {
843 Self {
844 m,
845 m_max_0: m.saturating_mul(2),
846 entry: None,
847 entry_level: 0,
848 levels: PersistentVec::new(),
849 layers: alloc::vec![PersistentVec::new()],
850 }
851 }
852
853 pub const fn cap_for_layer(&self, layer: u8) -> usize {
855 if layer == 0 { self.m_max_0 } else { self.m }
856 }
857}
858
/// Deterministically derives a graph level in `0..=7` from `row_idx`.
///
/// The index is scrambled with a SplitMix64-style multiply/xor-shift mix, and
/// the level is the number of all-zero low nibbles of the result, capped at 7.
/// The same `row_idx` always yields the same level.
pub fn nsw_assign_level(row_idx: usize) -> u8 {
    const MAX_LEVEL: u8 = 7;

    let mut x = (row_idx as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
    x ^= x >> 30;
    x = x.wrapping_mul(0xBF58_476D_1CE4_E5B9);
    x ^= x >> 27;
    x = x.wrapping_mul(0x94D0_49BB_1331_11EB);
    x ^= x >> 31;

    // Each run of four trailing zero bits is one zero nibble. `x == 0` gives
    // 64 trailing zeros (16 nibbles), which the cap reduces to MAX_LEVEL.
    let zero_nibbles = x.trailing_zeros() / 4;
    u8::try_from(zero_nibbles).map_or(MAX_LEVEL, |level| level.min(MAX_LEVEL))
}
885
886impl Index {
887 fn new_btree(name: String, column_position: usize) -> Self {
888 Self {
889 name,
890 column_position,
891 kind: IndexKind::BTree(PersistentBTreeMap::new()),
892 included_columns: Vec::new(),
893 partial_predicate: None,
894 expression: None,
895 is_unique: false,
896 extra_column_positions: Vec::new(),
897 }
898 }
899
900 fn new_nsw(name: String, column_position: usize, m: usize) -> Self {
901 Self {
902 name,
903 column_position,
904 kind: IndexKind::Nsw(NswGraph::new(m)),
905 included_columns: Vec::new(),
906 partial_predicate: None,
907 expression: None,
908 is_unique: false,
909 extra_column_positions: Vec::new(),
910 }
911 }
912
913 fn new_brin(name: String, column_position: usize, column_type: DataType) -> Self {
917 Self {
918 name,
919 column_position,
920 kind: IndexKind::Brin { column_type },
921 included_columns: Vec::new(),
922 partial_predicate: None,
923 expression: None,
924 is_unique: false,
925 extra_column_positions: Vec::new(),
926 }
927 }
928
929 fn new_gin(name: String, column_position: usize) -> Self {
934 Self {
935 name,
936 column_position,
937 kind: IndexKind::Gin(PersistentBTreeMap::new()),
938 included_columns: Vec::new(),
939 partial_predicate: None,
940 expression: None,
941 is_unique: false,
942 extra_column_positions: Vec::new(),
943 }
944 }
945
946 fn new_gin_trgm(name: String, column_position: usize) -> Self {
951 Self {
952 name,
953 column_position,
954 kind: IndexKind::GinTrgm(PersistentBTreeMap::new()),
955 included_columns: Vec::new(),
956 partial_predicate: None,
957 expression: None,
958 is_unique: false,
959 extra_column_positions: Vec::new(),
960 }
961 }
962
963 pub fn lookup_eq(&self, key: &IndexKey) -> &[RowLocator] {
972 match &self.kind {
973 IndexKind::BTree(m) => m.get(key).map_or(&[][..], Vec::as_slice),
974 IndexKind::Nsw(_)
978 | IndexKind::Brin { .. }
979 | IndexKind::Gin(_)
980 | IndexKind::GinTrgm(_) => &[][..],
981 }
982 }
983
984 pub fn gin_lookup_word(&self, word: &str) -> &[RowLocator] {
988 match &self.kind {
989 IndexKind::Gin(m) => m.get(&String::from(word)).map_or(&[][..], Vec::as_slice),
990 IndexKind::BTree(_)
991 | IndexKind::Nsw(_)
992 | IndexKind::Brin { .. }
993 | IndexKind::GinTrgm(_) => &[][..],
994 }
995 }
996
997 pub fn gin_trgm_lookup(&self, tri: &str) -> &[RowLocator] {
1002 match &self.kind {
1003 IndexKind::GinTrgm(m) => m.get(&String::from(tri)).map_or(&[][..], Vec::as_slice),
1004 IndexKind::BTree(_)
1005 | IndexKind::Nsw(_)
1006 | IndexKind::Brin { .. }
1007 | IndexKind::Gin(_) => &[][..],
1008 }
1009 }
1010
1011 pub const fn nsw(&self) -> Option<&NswGraph> {
1014 match &self.kind {
1015 IndexKind::Nsw(g) => Some(g),
1016 IndexKind::BTree(_)
1017 | IndexKind::Brin { .. }
1018 | IndexKind::Gin(_)
1019 | IndexKind::GinTrgm(_) => None,
1020 }
1021 }
1022
1023 pub const fn is_brin(&self) -> bool {
1028 matches!(self.kind, IndexKind::Brin { .. })
1029 }
1030
1031 pub const fn is_gin_trgm(&self) -> bool {
1035 matches!(self.kind, IndexKind::GinTrgm(_))
1036 }
1037
1038 pub const fn is_gin(&self) -> bool {
1042 matches!(self.kind, IndexKind::Gin(_))
1043 }
1044}
1045
1046#[derive(Debug, Clone)]
1062pub struct Table {
1063 schema: TableSchema,
1064 rows: PersistentVec<Row>,
1065 indices: Vec<Index>,
1066 hot_bytes: u64,
1067 cold_row_count: u64,
1081 cold_row_count_stale: bool,
1086}
1087
1088impl Table {
1089 pub fn new(schema: TableSchema) -> Self {
1090 Self {
1091 schema,
1092 rows: PersistentVec::new(),
1093 indices: Vec::new(),
1094 hot_bytes: 0,
1095 cold_row_count: 0,
1096 cold_row_count_stale: false,
1097 }
1098 }
1099
1100 #[must_use]
1104 pub const fn hot_bytes(&self) -> u64 {
1105 self.hot_bytes
1106 }
1107
1108 #[must_use]
1111 pub const fn cold_row_count(&self) -> u64 {
1112 self.cold_row_count
1113 }
1114
1115 pub fn set_cold_row_count(&mut self, n: u64) {
1118 self.cold_row_count = n;
1119 self.cold_row_count_stale = false;
1120 }
1121
1122 pub fn mark_cold_row_count_stale(&mut self) {
1127 self.cold_row_count_stale = true;
1128 }
1129
1130 #[must_use]
1134 pub const fn cold_row_count_stale(&self) -> bool {
1135 self.cold_row_count_stale
1136 }
1137
1138 #[must_use]
1149 pub fn count_cold_locators(&self) -> u64 {
1150 let mut best: u64 = 0;
1151 for idx in &self.indices {
1152 if let IndexKind::BTree(map) = &idx.kind {
1153 let n: u64 = map
1154 .iter()
1155 .map(|(_, locs)| locs.iter().filter(|l| l.is_cold()).count() as u64)
1156 .sum();
1157 if n > best {
1158 best = n;
1159 }
1160 }
1161 }
1162 best
1163 }
1164
1165 pub const fn schema(&self) -> &TableSchema {
1166 &self.schema
1167 }
1168
1169 pub const fn schema_mut(&mut self) -> &mut TableSchema {
1173 &mut self.schema
1174 }
1175
1176 pub const fn rows(&self) -> &PersistentVec<Row> {
1180 &self.rows
1181 }
1182
1183 pub const fn row_count(&self) -> usize {
1184 self.rows.len()
1185 }
1186
1187 pub fn indices_mut(&mut self) -> &mut [Index] {
1192 &mut self.indices
1193 }
1194
1195 pub fn indices(&self) -> &[Index] {
1196 &self.indices
1197 }
1198
1199 pub fn next_auto_value(&self, col_pos: usize) -> Option<i64> {
1205 let ty = self.schema.columns.get(col_pos)?.ty;
1206 if !matches!(ty, DataType::SmallInt | DataType::Int | DataType::BigInt) {
1207 return None;
1208 }
1209 let mut max: Option<i64> = None;
1210 for row in &self.rows {
1211 match row.values.get(col_pos) {
1212 Some(Value::SmallInt(n)) => {
1213 let v = i64::from(*n);
1214 max = Some(max.map_or(v, |m| m.max(v)));
1215 }
1216 Some(Value::Int(n)) => {
1217 let v = i64::from(*n);
1218 max = Some(max.map_or(v, |m| m.max(v)));
1219 }
1220 Some(Value::BigInt(n)) => {
1221 max = Some(max.map_or(*n, |m| m.max(*n)));
1222 }
1223 _ => {}
1224 }
1225 }
1226 Some(max.map_or(1, |m| m + 1))
1227 }
1228
1229 pub fn index_on(&self, column_position: usize) -> Option<&Index> {
1233 self.indices
1240 .iter()
1241 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::BTree(_)))
1242 .or_else(|| {
1243 self.indices.iter().find(|i| {
1244 i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_))
1245 })
1246 })
1247 }
1248
1249 pub fn insert(&mut self, row: Row) -> Result<(), StorageError> {
1253 if row.len() != self.schema.columns.len() {
1254 return Err(StorageError::ArityMismatch {
1255 expected: self.schema.columns.len(),
1256 actual: row.len(),
1257 });
1258 }
1259 for (i, (val, col)) in row.values.iter().zip(&self.schema.columns).enumerate() {
1260 if val.is_null() {
1261 if !col.nullable {
1262 return Err(StorageError::NullInNotNull {
1263 column: col.name.clone(),
1264 });
1265 }
1266 continue;
1267 }
1268 let actual = val.data_type().expect("non-null");
1269 let compatible = actual == col.ty
1283 || matches!(
1284 (actual, col.ty),
1285 (
1286 DataType::Text,
1287 DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
1288 ) | (DataType::Json | DataType::Jsonb, DataType::Text)
1289 | (DataType::Json, DataType::Jsonb)
1290 | (DataType::Jsonb, DataType::Json)
1291 | (DataType::Timestamp, DataType::Timestamptz)
1292 | (DataType::Timestamptz, DataType::Timestamp)
1293 )
1294 || matches!(
1295 (actual, col.ty),
1296 (
1297 DataType::Numeric { scale: a, .. },
1298 DataType::Numeric { scale: b, .. },
1299 ) if a == b
1300 );
1301 if !compatible {
1302 return Err(StorageError::TypeMismatch {
1303 column: col.name.clone(),
1304 expected: col.ty,
1305 actual,
1306 position: i,
1307 });
1308 }
1309 }
1310 let new_row_idx = self.rows.len();
1311 for idx in &mut self.indices {
1315 match &mut idx.kind {
1316 IndexKind::BTree(map) => {
1317 if let Some(key) = IndexKey::from_value(&row.values[idx.column_position]) {
1318 let mut entries = map.get(&key).cloned().unwrap_or_default();
1324 entries.push(RowLocator::Hot(new_row_idx));
1325 map.insert_mut(key, entries);
1326 }
1327 }
1328 IndexKind::Gin(map) => {
1329 if let Value::TsVector(lexemes) = &row.values[idx.column_position] {
1333 for lex in lexemes {
1334 let mut entries = map.get(&lex.word).cloned().unwrap_or_default();
1335 entries.push(RowLocator::Hot(new_row_idx));
1336 map.insert_mut(lex.word.clone(), entries);
1337 }
1338 }
1339 }
1340 IndexKind::GinTrgm(map) => {
1341 if let Value::Text(s) = &row.values[idx.column_position] {
1345 for tri in trgm::extract_trigrams(s) {
1346 let mut entries = map.get(&tri).cloned().unwrap_or_default();
1347 entries.push(RowLocator::Hot(new_row_idx));
1348 map.insert_mut(tri, entries);
1349 }
1350 }
1351 }
1352 IndexKind::Nsw(_) | IndexKind::Brin { .. } => {}
1356 }
1357 }
1358 self.hot_bytes = self
1361 .hot_bytes
1362 .saturating_add(row_body_encoded_len(&row, &self.schema) as u64);
1363 self.rows.push_mut(row);
1368 let new_row_idx = self.rows.len() - 1;
1371 let nsw_targets: Vec<usize> = self
1372 .indices
1373 .iter()
1374 .enumerate()
1375 .filter_map(|(i, idx)| {
1376 if matches!(idx.kind, IndexKind::Nsw(_)) {
1377 Some(i)
1378 } else {
1379 None
1380 }
1381 })
1382 .collect();
1383 for idx_pos in nsw_targets {
1384 nsw_insert_at(self, idx_pos, new_row_idx);
1385 }
1386 Ok(())
1387 }
1388
1389 pub fn add_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1393 if self.indices.iter().any(|i| i.name == name) {
1394 return Err(StorageError::DuplicateIndex { name });
1395 }
1396 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1397 StorageError::ColumnNotFound {
1398 column: column_name.into(),
1399 }
1400 })?;
1401 let mut idx = Index::new_btree(name, column_position);
1402 if let IndexKind::BTree(map) = &mut idx.kind {
1403 for (i, row) in self.rows.iter().enumerate() {
1404 if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
1405 let mut entries = map.get(&key).cloned().unwrap_or_default();
1406 entries.push(RowLocator::Hot(i));
1407 map.insert_mut(key, entries);
1408 }
1409 }
1410 }
1411 self.indices.push(idx);
1412 Ok(())
1413 }
1414
1415 pub fn add_nsw_index(
1420 &mut self,
1421 name: String,
1422 column_name: &str,
1423 m: usize,
1424 ) -> Result<(), StorageError> {
1425 self.add_nsw_index_inner(name, column_name, m, None)
1426 }
1427
1428 pub fn rebuild_nsw_index(
1440 &mut self,
1441 name: &str,
1442 new_encoding: Option<VecEncoding>,
1443 ) -> Result<(), StorageError> {
1444 let idx_pos = self
1445 .indices
1446 .iter()
1447 .position(|i| i.name == name)
1448 .ok_or_else(|| StorageError::IndexNotFound {
1449 name: String::from(name),
1450 })?;
1451 let col_pos = self.indices[idx_pos].column_position;
1452 let m = match &self.indices[idx_pos].kind {
1453 IndexKind::Nsw(g) => g.m,
1454 IndexKind::BTree(_)
1455 | IndexKind::Brin { .. }
1456 | IndexKind::Gin(_)
1457 | IndexKind::GinTrgm(_) => {
1458 return Err(StorageError::Unsupported(format!(
1459 "ALTER INDEX REBUILD on non-NSW index {name:?} — only NSW indexes can rebuild"
1460 )));
1461 }
1462 };
1463 let col_name = self.schema.columns[col_pos].name.clone();
1464 if let Some(target) = new_encoding {
1467 let current = match self.schema.columns[col_pos].ty {
1468 DataType::Vector { encoding, .. } => encoding,
1469 ref other => {
1470 return Err(StorageError::Unsupported(format!(
1471 "ALTER INDEX REBUILD WITH (encoding=…) on non-vector column type {other:?}"
1472 )));
1473 }
1474 };
1475 if target != current {
1476 let DataType::Vector { dim, .. } = self.schema.columns[col_pos].ty else {
1477 unreachable!("checked above")
1478 };
1479 let n = self.rows.len();
1480 for i in 0..n {
1481 let row = self
1482 .rows
1483 .get_mut(i)
1484 .expect("row index in bounds (we iterated up to len())");
1485 let cell = core::mem::replace(&mut row.values[col_pos], Value::Null);
1486 let recoded = recode_vector_cell(cell, target)?;
1487 row.values[col_pos] = recoded;
1488 }
1489 self.schema.columns[col_pos].ty = DataType::Vector {
1490 dim,
1491 encoding: target,
1492 };
1493 }
1494 }
1495 self.indices.remove(idx_pos);
1497 self.add_nsw_index_inner(String::from(name), &col_name, m, None)?;
1498 Ok(())
1499 }
1500
1501 pub fn restore_nsw_index(
1506 &mut self,
1507 name: String,
1508 column_name: &str,
1509 graph: NswGraph,
1510 ) -> Result<(), StorageError> {
1511 self.add_nsw_index_inner(name, column_name, graph.m, Some(graph))
1512 }
1513
1514 pub fn restore_btree_index(
1521 &mut self,
1522 name: String,
1523 column_name: &str,
1524 map: PersistentBTreeMap<IndexKey, Vec<RowLocator>>,
1525 ) -> Result<(), StorageError> {
1526 if self.indices.iter().any(|i| i.name == name) {
1527 return Err(StorageError::DuplicateIndex { name });
1528 }
1529 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1530 StorageError::ColumnNotFound {
1531 column: column_name.into(),
1532 }
1533 })?;
1534 self.indices.push(Index {
1535 name,
1536 column_position,
1537 kind: IndexKind::BTree(map),
1538 included_columns: Vec::new(),
1539 partial_predicate: None,
1540 expression: None,
1541 is_unique: false,
1542 extra_column_positions: Vec::new(),
1543 });
1544 Ok(())
1545 }
1546
1547 pub fn restore_brin_index(
1552 &mut self,
1553 name: String,
1554 column_name: &str,
1555 column_type: DataType,
1556 ) -> Result<(), StorageError> {
1557 if self.indices.iter().any(|i| i.name == name) {
1558 return Err(StorageError::DuplicateIndex { name });
1559 }
1560 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1561 StorageError::ColumnNotFound {
1562 column: column_name.into(),
1563 }
1564 })?;
1565 self.indices
1566 .push(Index::new_brin(name, column_position, column_type));
1567 Ok(())
1568 }
1569
1570 pub fn add_brin_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1574 if self.indices.iter().any(|i| i.name == name) {
1575 return Err(StorageError::DuplicateIndex { name });
1576 }
1577 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1578 StorageError::ColumnNotFound {
1579 column: column_name.into(),
1580 }
1581 })?;
1582 let column_type = self.schema.columns[column_position].ty;
1583 self.indices
1584 .push(Index::new_brin(name, column_position, column_type));
1585 Ok(())
1586 }
1587
1588 pub fn add_gin_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1593 if self.indices.iter().any(|i| i.name == name) {
1594 return Err(StorageError::DuplicateIndex { name });
1595 }
1596 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1597 StorageError::ColumnNotFound {
1598 column: column_name.into(),
1599 }
1600 })?;
1601 if self.schema.columns[column_position].ty != DataType::TsVector {
1602 return Err(StorageError::Corrupt(format!(
1603 "GIN index {name:?} requires a tsvector column; \
1604 {column_name:?} is {:?}",
1605 self.schema.columns[column_position].ty
1606 )));
1607 }
1608 let mut idx = Index::new_gin(name, column_position);
1609 if let IndexKind::Gin(map) = &mut idx.kind {
1610 for (i, row) in self.rows.iter().enumerate() {
1611 if let Value::TsVector(lexemes) = &row.values[column_position] {
1612 for lex in lexemes {
1613 let mut entries = map.get(&lex.word).cloned().unwrap_or_default();
1614 entries.push(RowLocator::Hot(i));
1615 map.insert_mut(lex.word.clone(), entries);
1616 }
1617 }
1618 }
1619 }
1620 self.indices.push(idx);
1621 Ok(())
1622 }
1623
1624 pub fn restore_gin_index(
1629 &mut self,
1630 name: String,
1631 column_name: &str,
1632 map: PersistentBTreeMap<String, Vec<RowLocator>>,
1633 ) -> Result<(), StorageError> {
1634 if self.indices.iter().any(|i| i.name == name) {
1635 return Err(StorageError::DuplicateIndex { name });
1636 }
1637 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1638 StorageError::ColumnNotFound {
1639 column: column_name.into(),
1640 }
1641 })?;
1642 let mut idx = Index::new_gin(name, column_position);
1643 idx.kind = IndexKind::Gin(map);
1644 self.indices.push(idx);
1645 Ok(())
1646 }
1647
1648 pub fn add_gin_trgm_index(
1653 &mut self,
1654 name: String,
1655 column_name: &str,
1656 ) -> Result<(), StorageError> {
1657 if self.indices.iter().any(|i| i.name == name) {
1658 return Err(StorageError::DuplicateIndex { name });
1659 }
1660 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1661 StorageError::ColumnNotFound {
1662 column: column_name.into(),
1663 }
1664 })?;
1665 if !matches!(
1666 self.schema.columns[column_position].ty,
1667 DataType::Text | DataType::Varchar(_)
1668 ) {
1669 return Err(StorageError::Corrupt(format!(
1670 "trigram-GIN index {name:?} requires a TEXT/VARCHAR column; \
1671 {column_name:?} is {:?}",
1672 self.schema.columns[column_position].ty
1673 )));
1674 }
1675 let mut idx = Index::new_gin_trgm(name, column_position);
1676 if let IndexKind::GinTrgm(map) = &mut idx.kind {
1677 for (i, row) in self.rows.iter().enumerate() {
1678 if let Value::Text(s) = &row.values[column_position] {
1679 for tri in trgm::extract_trigrams(s) {
1680 let mut entries = map.get(&tri).cloned().unwrap_or_default();
1681 entries.push(RowLocator::Hot(i));
1682 map.insert_mut(tri, entries);
1683 }
1684 }
1685 }
1686 }
1687 self.indices.push(idx);
1688 Ok(())
1689 }
1690
1691 pub fn restore_gin_trgm_index(
1694 &mut self,
1695 name: String,
1696 column_name: &str,
1697 map: PersistentBTreeMap<String, Vec<RowLocator>>,
1698 ) -> Result<(), StorageError> {
1699 if self.indices.iter().any(|i| i.name == name) {
1700 return Err(StorageError::DuplicateIndex { name });
1701 }
1702 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1703 StorageError::ColumnNotFound {
1704 column: column_name.into(),
1705 }
1706 })?;
1707 let mut idx = Index::new_gin_trgm(name, column_position);
1708 idx.kind = IndexKind::GinTrgm(map);
1709 self.indices.push(idx);
1710 Ok(())
1711 }
1712
1713 pub fn register_cold_locators<I>(
1730 &mut self,
1731 index_name: &str,
1732 locators: I,
1733 ) -> Result<usize, StorageError>
1734 where
1735 I: IntoIterator<Item = (IndexKey, RowLocator)>,
1736 {
1737 let idx = self
1738 .indices
1739 .iter_mut()
1740 .find(|i| i.name == index_name)
1741 .ok_or_else(|| StorageError::Corrupt(format!("index {index_name:?} not found")))?;
1742 let map = match &mut idx.kind {
1743 IndexKind::BTree(map) => map,
1744 IndexKind::Nsw(_)
1745 | IndexKind::Brin { .. }
1746 | IndexKind::Gin(_)
1747 | IndexKind::GinTrgm(_) => {
1748 return Err(StorageError::Corrupt(format!(
1749 "index {index_name:?} is not BTree; cold locators apply only to BTree indices"
1750 )));
1751 }
1752 };
1753 let mut count = 0usize;
1754 for (key, locator) in locators {
1755 let mut entries = map.get(&key).cloned().unwrap_or_default();
1756 entries.push(locator);
1757 map.insert_mut(key, entries);
1758 count += 1;
1759 }
1760 Ok(count)
1761 }
1762
1763 pub fn register_gin_cold_locators<I>(
1770 &mut self,
1771 index_name: &str,
1772 locators: I,
1773 ) -> Result<usize, StorageError>
1774 where
1775 I: IntoIterator<Item = (String, RowLocator)>,
1776 {
1777 let idx = self
1778 .indices
1779 .iter_mut()
1780 .find(|i| i.name == index_name)
1781 .ok_or_else(|| StorageError::Corrupt(format!("index {index_name:?} not found")))?;
1782 let map = match &mut idx.kind {
1783 IndexKind::Gin(map) | IndexKind::GinTrgm(map) => map,
1784 IndexKind::BTree(_) | IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
1785 return Err(StorageError::Corrupt(format!(
1786 "register_gin_cold_locators: index {index_name:?} is not GIN"
1787 )));
1788 }
1789 };
1790 let mut count = 0usize;
1791 for (word, locator) in locators {
1792 let mut entries = map.get(&word).cloned().unwrap_or_default();
1793 entries.push(locator);
1794 map.insert_mut(word, entries);
1795 count += 1;
1796 }
1797 Ok(count)
1798 }
1799
1800 pub fn remove_cold_locators_for_key(
1810 &mut self,
1811 index_name: &str,
1812 key: &IndexKey,
1813 ) -> Result<usize, StorageError> {
1814 let idx = self
1815 .indices
1816 .iter_mut()
1817 .find(|i| i.name == index_name)
1818 .ok_or_else(|| {
1819 StorageError::Corrupt(format!(
1820 "remove_cold_locators_for_key: index {index_name:?} not found"
1821 ))
1822 })?;
1823 let map = match &mut idx.kind {
1824 IndexKind::BTree(map) => map,
1825 IndexKind::Nsw(_)
1826 | IndexKind::Brin { .. }
1827 | IndexKind::Gin(_)
1828 | IndexKind::GinTrgm(_) => {
1829 return Err(StorageError::Corrupt(format!(
1830 "remove_cold_locators_for_key: index {index_name:?} is not BTree; \
1831 cold locators apply only to BTree indices"
1832 )));
1833 }
1834 };
1835 let Some(entries) = map.get(key) else {
1836 return Ok(0);
1837 };
1838 let mut kept: Vec<RowLocator> =
1839 entries.iter().copied().filter(RowLocator::is_hot).collect();
1840 let removed = entries.len() - kept.len();
1841 if removed == 0 {
1842 return Ok(0);
1843 }
1844 kept.shrink_to_fit();
1845 map.insert_mut(key.clone(), kept);
1853 Ok(removed)
1854 }
1855
1856 pub fn add_column(&mut self, col: ColumnSchema, fill_value: Value) {
1863 self.schema.columns.push(col);
1864 let mut new_rows: PersistentVec<Row> = PersistentVec::new();
1865 for row in self.rows.iter() {
1866 let mut values = row.values.clone();
1867 values.push(fill_value.clone());
1868 new_rows.push_mut(Row::new(values));
1869 }
1870 self.rows = new_rows;
1871 }
1872
1873 pub fn set_partial_predicate(&mut self, idx: usize, pred: Option<String>) {
1880 debug_assert!(idx < self.indices.len());
1881 self.indices[idx].partial_predicate = pred;
1882 }
1883
1884 pub fn rename_column(&mut self, col_pos: usize, new_name: &str) {
1895 debug_assert!(col_pos < self.schema.columns.len());
1896 self.schema.columns[col_pos].name = new_name.to_string();
1897 }
1898
1899 pub fn drop_column(&mut self, col_pos: usize) {
1909 debug_assert!(col_pos < self.schema.columns.len());
1910 self.schema.columns.remove(col_pos);
1912 let mut new_rows: PersistentVec<Row> = PersistentVec::new();
1914 for row in self.rows.iter() {
1915 let mut values = row.values.clone();
1916 if col_pos < values.len() {
1917 values.remove(col_pos);
1918 }
1919 new_rows.push_mut(Row::new(values));
1920 }
1921 self.rows = new_rows;
1922 self.indices.retain(|idx| idx.column_position != col_pos);
1924 for idx in &mut self.indices {
1925 if idx.column_position > col_pos {
1926 idx.column_position -= 1;
1927 }
1928 for inc in &mut idx.included_columns {
1930 if *inc as usize > col_pos {
1931 *inc -= 1;
1932 }
1933 }
1934 }
1935 let mut surviving_ucs: Vec<UniquenessConstraint> = Vec::new();
1940 for mut uc in core::mem::take(&mut self.schema.uniqueness_constraints) {
1941 uc.columns.retain(|&c| c != col_pos);
1942 if uc.columns.is_empty() {
1943 continue;
1944 }
1945 for c in &mut uc.columns {
1946 if *c > col_pos {
1947 *c -= 1;
1948 }
1949 }
1950 surviving_ucs.push(uc);
1951 }
1952 self.schema.uniqueness_constraints = surviving_ucs;
1953 for fk in &mut self.schema.foreign_keys {
1956 for c in &mut fk.local_columns {
1957 if *c > col_pos {
1958 *c -= 1;
1959 }
1960 }
1961 }
1962 self.rebuild_indices();
1969 }
1970
1971 pub fn delete_rows(&mut self, positions: &[usize]) -> usize {
1977 if positions.is_empty() {
1978 return 0;
1979 }
1980 let mut to_remove = alloc::vec![false; self.rows.len()];
1984 let mut removed = 0;
1985 for &p in positions {
1986 if p < to_remove.len() && !to_remove[p] {
1987 to_remove[p] = true;
1988 removed += 1;
1989 }
1990 }
1991 let mut new_rows: PersistentVec<Row> = PersistentVec::new();
1992 let mut removed_bytes: u64 = 0;
1993 for (i, row) in self.rows.iter().enumerate() {
1994 if to_remove[i] {
1995 removed_bytes =
1996 removed_bytes.saturating_add(row_body_encoded_len(row, &self.schema) as u64);
1997 } else {
1998 new_rows.push_mut(row.clone());
1999 }
2000 }
2001 self.rows = new_rows;
2002 self.hot_bytes = self.hot_bytes.saturating_sub(removed_bytes);
2003 self.rebuild_indices();
2004 removed
2005 }
2006
2007 pub fn update_row(
2013 &mut self,
2014 position: usize,
2015 new_values: Vec<Value>,
2016 ) -> Result<(), StorageError> {
2017 if position >= self.rows.len() {
2018 return Err(StorageError::Corrupt(alloc::format!(
2019 "update_row: position {position} out of bounds (rows={})",
2020 self.rows.len()
2021 )));
2022 }
2023 if new_values.len() != self.schema.columns.len() {
2024 return Err(StorageError::ArityMismatch {
2025 expected: self.schema.columns.len(),
2026 actual: new_values.len(),
2027 });
2028 }
2029 for (i, (val, col)) in new_values.iter().zip(&self.schema.columns).enumerate() {
2033 if val.is_null() {
2034 if !col.nullable {
2035 return Err(StorageError::NullInNotNull {
2036 column: col.name.clone(),
2037 });
2038 }
2039 continue;
2040 }
2041 let actual = val.data_type().expect("non-null");
2042 let compatible = actual == col.ty
2043 || matches!(
2044 (actual, col.ty),
2045 (
2046 DataType::Text,
2047 DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
2048 ) | (DataType::Json | DataType::Jsonb, DataType::Text)
2049 | (DataType::Json, DataType::Jsonb)
2050 | (DataType::Jsonb, DataType::Json)
2051 | (DataType::Timestamp, DataType::Timestamptz)
2052 | (DataType::Timestamptz, DataType::Timestamp)
2053 )
2054 || matches!(
2055 (actual, col.ty),
2056 (
2057 DataType::Numeric { scale: a, .. },
2058 DataType::Numeric { scale: b, .. },
2059 ) if a == b
2060 );
2061 if !compatible {
2062 return Err(StorageError::TypeMismatch {
2063 column: col.name.clone(),
2064 expected: col.ty,
2065 actual,
2066 position: i,
2067 });
2068 }
2069 }
2070 let old_row = self
2071 .rows
2072 .get(position)
2073 .expect("position bounds-checked above");
2074 let old_bytes = row_body_encoded_len(old_row, &self.schema) as u64;
2075 let new_row = Row::new(new_values);
2076 let new_bytes = row_body_encoded_len(&new_row, &self.schema) as u64;
2077 self.rows = self
2078 .rows
2079 .set(position, new_row)
2080 .expect("position bounds-checked above");
2081 self.hot_bytes = self
2082 .hot_bytes
2083 .saturating_sub(old_bytes)
2084 .saturating_add(new_bytes);
2085 self.rebuild_indices();
2086 Ok(())
2087 }
2088
2089 fn rebuild_indices(&mut self) {
2096 let preserved_cold: Vec<(String, Vec<(IndexKey, RowLocator)>)> = self
2105 .indices
2106 .iter()
2107 .filter_map(|idx| match &idx.kind {
2108 IndexKind::BTree(map) => {
2109 let cold: Vec<(IndexKey, RowLocator)> = map
2110 .iter()
2111 .flat_map(|(k, locs)| {
2112 locs.iter()
2113 .filter(|l| l.is_cold())
2114 .copied()
2115 .map(move |l| (k.clone(), l))
2116 })
2117 .collect();
2118 if cold.is_empty() {
2119 None
2120 } else {
2121 Some((idx.name.clone(), cold))
2122 }
2123 }
2124 IndexKind::Nsw(_)
2127 | IndexKind::Brin { .. }
2128 | IndexKind::Gin(_)
2129 | IndexKind::GinTrgm(_) => None,
2130 })
2131 .collect();
2132
2133 let preserved_gin_cold: Vec<(String, Vec<(String, RowLocator)>)> = self
2141 .indices
2142 .iter()
2143 .filter_map(|idx| match &idx.kind {
2144 IndexKind::Gin(map) | IndexKind::GinTrgm(map) => {
2145 let cold: Vec<(String, RowLocator)> = map
2146 .iter()
2147 .flat_map(|(w, locs)| {
2148 locs.iter()
2149 .filter(|l| l.is_cold())
2150 .copied()
2151 .map(move |l| (w.clone(), l))
2152 })
2153 .collect();
2154 if cold.is_empty() {
2155 None
2156 } else {
2157 Some((idx.name.clone(), cold))
2158 }
2159 }
2160 IndexKind::BTree(_) | IndexKind::Nsw(_) | IndexKind::Brin { .. } => None,
2161 })
2162 .collect();
2163
2164 #[derive(Clone)]
2169 enum RebuildKind {
2170 BTree,
2171 Nsw(usize),
2172 Brin(DataType),
2173 Gin,
2174 GinTrgm,
2175 }
2176 let descriptors: Vec<(String, usize, RebuildKind)> = self
2177 .indices
2178 .iter()
2179 .map(|idx| {
2180 let kind = match &idx.kind {
2181 IndexKind::Nsw(g) => RebuildKind::Nsw(g.m),
2182 IndexKind::Brin { column_type } => RebuildKind::Brin(*column_type),
2183 IndexKind::BTree(_) => RebuildKind::BTree,
2184 IndexKind::Gin(_) => RebuildKind::Gin,
2185 IndexKind::GinTrgm(_) => RebuildKind::GinTrgm,
2186 };
2187 (idx.name.clone(), idx.column_position, kind)
2188 })
2189 .collect();
2190 self.indices.clear();
2191 for (name, column_position, rebuild_kind) in descriptors {
2192 match rebuild_kind {
2193 RebuildKind::Nsw(m) => {
2194 let idx = Index::new_nsw(name, column_position, m);
2195 self.indices.push(idx);
2196 let idx_pos = self.indices.len() - 1;
2197 let row_indices: Vec<usize> = (0..self.rows.len()).collect();
2198 for row_idx in row_indices {
2199 nsw_insert_at(self, idx_pos, row_idx);
2200 }
2201 }
2202 RebuildKind::Brin(column_type) => {
2203 self.indices
2206 .push(Index::new_brin(name, column_position, column_type));
2207 }
2208 RebuildKind::BTree => {
2209 let mut idx = Index::new_btree(name, column_position);
2210 if let IndexKind::BTree(map) = &mut idx.kind {
2211 for (i, row) in self.rows.iter().enumerate() {
2212 if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
2213 let mut entries = map.get(&key).cloned().unwrap_or_default();
2214 entries.push(RowLocator::Hot(i));
2215 map.insert_mut(key, entries);
2216 }
2217 }
2218 }
2219 self.indices.push(idx);
2220 }
2221 RebuildKind::Gin => {
2222 let mut idx = Index::new_gin(name, column_position);
2223 if let IndexKind::Gin(map) = &mut idx.kind {
2224 for (i, row) in self.rows.iter().enumerate() {
2225 if let Value::TsVector(lexemes) = &row.values[column_position] {
2226 for lex in lexemes {
2227 let mut entries =
2228 map.get(&lex.word).cloned().unwrap_or_default();
2229 entries.push(RowLocator::Hot(i));
2230 map.insert_mut(lex.word.clone(), entries);
2231 }
2232 }
2233 }
2234 }
2235 self.indices.push(idx);
2236 }
2237 RebuildKind::GinTrgm => {
2238 let mut idx = Index::new_gin_trgm(name, column_position);
2239 if let IndexKind::GinTrgm(map) = &mut idx.kind {
2240 for (i, row) in self.rows.iter().enumerate() {
2241 if let Value::Text(s) = &row.values[column_position] {
2242 for tri in trgm::extract_trigrams(s) {
2243 let mut entries =
2244 map.get(&tri).cloned().unwrap_or_default();
2245 entries.push(RowLocator::Hot(i));
2246 map.insert_mut(tri, entries);
2247 }
2248 }
2249 }
2250 }
2251 self.indices.push(idx);
2252 }
2253 }
2254 }
2255
2256 for (idx_name, locators) in preserved_cold {
2261 let _ = self.register_cold_locators(&idx_name, locators);
2265 }
2266 for (idx_name, locators) in preserved_gin_cold {
2268 let _ = self.register_gin_cold_locators(&idx_name, locators);
2269 }
2270 }
2271
2272 fn add_nsw_index_inner(
2273 &mut self,
2274 name: String,
2275 column_name: &str,
2276 m: usize,
2277 restore: Option<NswGraph>,
2278 ) -> Result<(), StorageError> {
2279 if self.indices.iter().any(|i| i.name == name) {
2280 return Err(StorageError::DuplicateIndex { name });
2281 }
2282 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
2283 StorageError::ColumnNotFound {
2284 column: column_name.into(),
2285 }
2286 })?;
2287 if !matches!(
2288 self.schema.columns[column_position].ty,
2289 DataType::Vector { .. }
2290 ) {
2291 return Err(StorageError::TypeMismatch {
2292 column: column_name.into(),
2293 expected: DataType::Vector {
2294 dim: 0,
2295 encoding: VecEncoding::F32,
2296 },
2297 actual: self.schema.columns[column_position].ty,
2298 position: column_position,
2299 });
2300 }
2301 if let Some(graph) = restore {
2302 self.indices.push(Index {
2303 name,
2304 column_position,
2305 kind: IndexKind::Nsw(graph),
2306 included_columns: Vec::new(),
2307 partial_predicate: None,
2308 expression: None,
2309 is_unique: false,
2310 extra_column_positions: Vec::new(),
2311 });
2312 return Ok(());
2313 }
2314 let idx = Index::new_nsw(name, column_position, m);
2315 self.indices.push(idx);
2316 let idx_pos = self.indices.len() - 1;
2317 let row_indices: Vec<usize> = (0..self.rows.len()).collect();
2320 for row_idx in row_indices {
2321 nsw_insert_at(self, idx_pos, row_idx);
2322 }
2323 Ok(())
2324 }
2325}
2326
2327fn recode_vector_cell(cell: Value, target: VecEncoding) -> Result<Value, StorageError> {
2334 if matches!(cell, Value::Null) {
2335 return Ok(cell);
2336 }
2337 let as_f32: Vec<f32> = match &cell {
2339 Value::Vector(v) => v.clone(),
2340 Value::Sq8Vector(q) => quantize::dequantize(q),
2341 Value::HalfVector(h) => h.to_f32_vec(),
2342 other => {
2343 return Err(StorageError::Unsupported(format!(
2344 "ALTER INDEX REBUILD: cannot recode non-vector cell {:?}",
2345 other.data_type()
2346 )));
2347 }
2348 };
2349 Ok(match target {
2354 VecEncoding::F32 => Value::Vector(as_f32),
2355 VecEncoding::Sq8 => Value::Sq8Vector(quantize::quantize(&as_f32)),
2356 VecEncoding::F16 => Value::HalfVector(halfvec::HalfVector::from_f32_slice(&as_f32)),
2357 })
2358}
2359
2360fn nsw_insert_at(table: &mut Table, idx_pos: usize, new_row_idx: usize) {
2367 let col_pos = table.indices[idx_pos].column_position;
2368 let cell_dim: Option<usize> = match &table.rows[new_row_idx].values[col_pos] {
2369 Value::Vector(v) => Some(v.len()),
2370 Value::Sq8Vector(q) => Some(q.bytes.len()),
2371 Value::HalfVector(h) => Some(h.dim()),
2372 _ => None,
2373 };
2374 let Some(dim) = cell_dim else {
2375 ensure_node_slot(table, idx_pos, new_row_idx, 0);
2378 return;
2379 };
2380 if dim == 0 {
2381 ensure_node_slot(table, idx_pos, new_row_idx, 0);
2382 return;
2383 }
2384 let level = nsw_assign_level(new_row_idx);
2385 ensure_node_slot(table, idx_pos, new_row_idx, level);
2386 let (entry, entry_level, m) = match &table.indices[idx_pos].kind {
2387 IndexKind::Nsw(g) => (g.entry, g.entry_level, g.m),
2388 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) | IndexKind::GinTrgm(_) => {
2389 unreachable!("nsw_insert_at on a non-NSW index")
2390 }
2391 };
2392 if entry.is_none() {
2394 if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
2395 g.entry = Some(new_row_idx);
2396 g.entry_level = level;
2397 *g.levels
2398 .get_mut(new_row_idx)
2399 .expect("levels slot padded by ensure_node_slot") = level;
2400 }
2401 return;
2402 }
2403 if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
2405 *g.levels
2406 .get_mut(new_row_idx)
2407 .expect("levels slot padded by ensure_node_slot") = level;
2408 }
2409 let query = match &table.rows[new_row_idx].values[col_pos] {
2410 Value::Vector(v) => v.clone(),
2411 Value::Sq8Vector(q) => quantize::dequantize(q),
2417 Value::HalfVector(h) => h.to_f32_vec(),
2420 _ => return,
2421 };
2422 let mut current = entry.expect("entry was Some above");
2425 let mut current_d = vec_l2_sq(table, col_pos, current, &query);
2426 if entry_level > level {
2427 for layer in (level + 1..=entry_level).rev() {
2428 (current, current_d) =
2429 greedy_layer_walk(table, idx_pos, layer, current, current_d, &query);
2430 }
2431 }
2432 let top = level.min(entry_level);
2436 let ef = (m * 2).max(8);
2437 for layer in (0..=top).rev() {
2438 let cap = if layer == 0 { m * 2 } else { m };
2439 let mut candidates = layer_beam_search(
2440 table,
2441 idx_pos,
2442 layer,
2443 current,
2444 current_d,
2445 &query,
2446 ef,
2447 NswMetric::L2,
2448 );
2449 candidates.retain(|&(_, n)| n != new_row_idx);
2450 if let Some(&(d, n)) = candidates.first() {
2453 current = n;
2454 current_d = d;
2455 }
2456 let peers = select_neighbours_heuristic(&candidates, cap, table, col_pos);
2457 connect_at_layer(table, idx_pos, layer, new_row_idx, &peers);
2458 }
2459 if level > entry_level
2462 && let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
2463 {
2464 g.entry = Some(new_row_idx);
2465 g.entry_level = level;
2466 }
2467}
2468
2469fn ensure_node_slot(table: &mut Table, idx_pos: usize, new_row_idx: usize, level: u8) {
2473 let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind else {
2474 unreachable!("ensure_node_slot on a BTree index");
2475 };
2476 while g.layers.len() <= level as usize {
2477 g.layers.push(PersistentVec::new());
2478 }
2479 while g.levels.len() <= new_row_idx {
2480 g.levels.push_mut(0);
2481 }
2482 for layer_vec in &mut g.layers {
2483 while layer_vec.len() <= new_row_idx {
2484 layer_vec.push_mut(Vec::new());
2485 }
2486 }
2487}
2488
2489fn greedy_layer_walk(
2495 table: &Table,
2496 idx_pos: usize,
2497 layer: u8,
2498 mut current: usize,
2499 mut current_d: f32,
2500 query: &[f32],
2501) -> (usize, f32) {
2502 let g = match &table.indices[idx_pos].kind {
2503 IndexKind::Nsw(g) => g,
2504 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) | IndexKind::GinTrgm(_) => {
2505 return (current, current_d);
2506 }
2507 };
2508 let col_pos = table.indices[idx_pos].column_position;
2509 loop {
2510 let neighbours: &[u32] = g
2511 .layers
2512 .get(layer as usize)
2513 .and_then(|layer_v| layer_v.get(current))
2514 .map_or(&[][..], Vec::as_slice);
2515 let mut best = current;
2516 let mut best_d = current_d;
2517 for &n in neighbours {
2518 let n = n as usize;
2519 let d = vec_l2_sq(table, col_pos, n, query);
2520 if d < best_d {
2521 best = n;
2522 best_d = d;
2523 }
2524 }
2525 if best == current {
2526 return (current, current_d);
2527 }
2528 current = best;
2529 current_d = best_d;
2530 }
2531}
2532
2533#[allow(clippy::too_many_arguments)] fn layer_beam_search(
2546 table: &Table,
2547 idx_pos: usize,
2548 layer: u8,
2549 entry_node: usize,
2550 entry_d: f32,
2551 query: &[f32],
2552 ef: usize,
2553 metric: NswMetric,
2554) -> Vec<(f32, usize)> {
2555 let g = match &table.indices[idx_pos].kind {
2556 IndexKind::Nsw(g) => g,
2557 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) | IndexKind::GinTrgm(_) => return Vec::new(),
2558 };
2559 let col_pos = table.indices[idx_pos].column_position;
2560 let d0 = if matches!(metric, NswMetric::L2) {
2561 entry_d
2562 } else {
2563 cell_to_query_metric_distance(table, col_pos, entry_node, query, metric)
2564 };
2565 let row_count = table.rows.len();
2566 let mut visited: Vec<bool> = alloc::vec![false; row_count];
2567 if entry_node < row_count {
2568 visited[entry_node] = true;
2569 }
2570 let mut candidates: alloc::collections::BinaryHeap<NodeClosest> =
2573 alloc::collections::BinaryHeap::with_capacity(ef);
2574 let mut results: alloc::collections::BinaryHeap<NodeFurthest> =
2575 alloc::collections::BinaryHeap::with_capacity(ef);
2576 candidates.push(NodeClosest {
2577 dist: d0,
2578 node: entry_node,
2579 });
2580 results.push(NodeFurthest {
2581 dist: d0,
2582 node: entry_node,
2583 });
2584 while let Some(cur) = candidates.pop() {
2585 let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
2586 if cur.dist > worst && results.len() >= ef {
2587 break;
2588 }
2589 let neighbours: &[u32] = g
2590 .layers
2591 .get(layer as usize)
2592 .and_then(|layer_v| layer_v.get(cur.node))
2593 .map_or(&[][..], Vec::as_slice);
2594 for &n in neighbours {
2595 let n = n as usize;
2596 if n >= row_count || visited[n] {
2597 continue;
2598 }
2599 visited[n] = true;
2600 let dn = cell_to_query_metric_distance(table, col_pos, n, query, metric);
2604 if !dn.is_finite() {
2605 continue;
2606 }
2607 let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
2608 if results.len() < ef || dn < worst {
2609 results.push(NodeFurthest { dist: dn, node: n });
2610 if results.len() > ef {
2611 results.pop();
2612 }
2613 candidates.push(NodeClosest { dist: dn, node: n });
2614 }
2615 }
2616 }
2617 let mut out: Vec<(f32, usize)> = results.into_iter().map(|c| (c.dist, c.node)).collect();
2620 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2621 out
2622}
2623
/// Heap entry ordered so that a max-heap (`BinaryHeap`) pops the node with
/// the *smallest* distance first.
///
/// Ordering is total and consistent with equality: distances are compared
/// with `f32::total_cmp` (so a positive NaN ranks as the farthest possible
/// distance and pops last), and ties on distance are broken by node index,
/// lower index first. Two entries are equal exactly when `cmp` returns
/// `Equal`, which the `Eq` / `Ord` contracts require.
#[derive(Debug, Clone, Copy)]
struct NodeClosest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeClosest {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeClosest {}
impl PartialOrd for NodeClosest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeClosest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        // Reversed operands: a smaller distance is the "greater" entry.
        other
            .dist
            .total_cmp(&self.dist)
            .then_with(|| other.node.cmp(&self.node))
    }
}
2652
/// Heap entry ordered so that a max-heap (`BinaryHeap`) pops the node with
/// the *largest* distance first.
///
/// Ordering is total and consistent with equality: distances are compared
/// with `f32::total_cmp` (so a positive NaN ranks as the farthest possible
/// distance and pops first), and ties on distance are broken by node
/// index, higher index first. Two entries are equal exactly when `cmp`
/// returns `Equal`, which the `Eq` / `Ord` contracts require.
#[derive(Debug, Clone, Copy)]
struct NodeFurthest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeFurthest {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeFurthest {}
impl PartialOrd for NodeFurthest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeFurthest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        self.dist
            .total_cmp(&other.dist)
            .then_with(|| self.node.cmp(&other.node))
    }
}
2678
2679fn select_neighbours_heuristic(
2688 candidates: &[(f32, usize)],
2689 m: usize,
2690 table: &Table,
2691 col_pos: usize,
2692) -> Vec<usize> {
2693 let mut chosen: Vec<usize> = Vec::with_capacity(m);
2694 for &(d_q, e) in candidates {
2695 if chosen.len() >= m {
2696 break;
2697 }
2698 if !matches!(
2703 table.rows.get(e).and_then(|r| r.values.get(col_pos)),
2704 Some(Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_))
2705 ) {
2706 continue;
2707 }
2708 let mut covered = false;
2709 for &r in &chosen {
2710 if cell_l2_sq(table, col_pos, e, r) < d_q {
2714 covered = true;
2715 break;
2716 }
2717 }
2718 if !covered {
2719 chosen.push(e);
2720 }
2721 }
2722 chosen
2723}
2724
2725fn connect_at_layer(
2729 table: &mut Table,
2730 idx_pos: usize,
2731 layer: u8,
2732 new_row_idx: usize,
2733 peers: &[usize],
2734) {
2735 let col_pos = table.indices[idx_pos].column_position;
2736 let cap = match &table.indices[idx_pos].kind {
2737 IndexKind::Nsw(g) => g.cap_for_layer(layer),
2738 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) | IndexKind::GinTrgm(_) => return,
2739 };
2740 let new_row_u32 = u32::try_from(new_row_idx).expect("row index fits in u32");
2745 if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
2746 let layer_v = &mut g.layers[layer as usize];
2747 if let Some(slot) = layer_v.get_mut(new_row_idx) {
2748 *slot = peers
2749 .iter()
2750 .map(|&p| u32::try_from(p).expect("row index fits in u32"))
2751 .collect();
2752 }
2753 }
2754 for &peer in peers {
2755 if !matches!(
2759 &table.rows[peer].values[col_pos],
2760 Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_)
2761 ) {
2762 continue;
2763 }
2764 if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
2766 let layer_v = &mut g.layers[layer as usize];
2767 if let Some(slot) = layer_v.get_mut(peer)
2768 && !slot.contains(&new_row_u32)
2769 {
2770 slot.push(new_row_u32);
2771 }
2772 }
2773 let needs_trim = match &table.indices[idx_pos].kind {
2777 IndexKind::Nsw(g) => g.layers[layer as usize][peer].len() > cap,
2778 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) | IndexKind::GinTrgm(_) => false,
2779 };
2780 if needs_trim {
2781 let current_peers: Vec<usize> = match &table.indices[idx_pos].kind {
2782 IndexKind::Nsw(g) => g.layers[layer as usize][peer]
2783 .iter()
2784 .map(|&n| n as usize)
2785 .collect(),
2786 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) | IndexKind::GinTrgm(_) => continue,
2787 };
2788 let mut tagged: Vec<(f32, usize)> = current_peers
2793 .iter()
2794 .map(|&p| (cell_l2_sq(table, col_pos, peer, p), p))
2795 .collect();
2796 tagged.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2797 let kept = select_neighbours_heuristic(&tagged, cap, table, col_pos);
2798 if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
2799 && let Some(slot) = g.layers[layer as usize].get_mut(peer)
2800 {
2801 *slot = kept
2802 .into_iter()
2803 .map(|p| u32::try_from(p).expect("row index fits in u32"))
2804 .collect();
2805 }
2806 }
2807 }
2808}
2809
2810fn vec_l2_sq(table: &Table, col_pos: usize, row: usize, query: &[f32]) -> f32 {
2817 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2818 Some(Value::Vector(v)) if v.len() == query.len() => l2_distance_sq(v, query),
2819 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => {
2820 quantize::sq8_l2_distance_sq_asymmetric(q, query)
2821 }
2822 Some(Value::HalfVector(h)) if h.dim() == query.len() => {
2826 halfvec::half_l2_distance_sq_asymmetric(h, query)
2827 }
2828 _ => f32::INFINITY,
2829 }
2830}
2831
2832fn cell_l2_sq(table: &Table, col_pos: usize, row_a: usize, row_b: usize) -> f32 {
2839 let Some(cell_a) = table.rows.get(row_a).and_then(|r| r.values.get(col_pos)) else {
2840 return f32::INFINITY;
2841 };
2842 let Some(cell_b) = table.rows.get(row_b).and_then(|r| r.values.get(col_pos)) else {
2843 return f32::INFINITY;
2844 };
2845 match (cell_a, cell_b) {
2846 (Value::Vector(a), Value::Vector(b)) if a.len() == b.len() => l2_distance_sq(a, b),
2847 (Value::Sq8Vector(a), Value::Sq8Vector(b)) if a.bytes.len() == b.bytes.len() => {
2848 quantize::sq8_l2_distance_sq(a, b)
2849 }
2850 (Value::HalfVector(a), Value::HalfVector(b)) if a.dim() == b.dim() => {
2855 halfvec::half_l2_distance_sq(a, b)
2856 }
2857 _ => f32::INFINITY,
2858 }
2859}
2860
2861fn cell_to_query_metric_distance(
2866 table: &Table,
2867 col_pos: usize,
2868 row: usize,
2869 query: &[f32],
2870 metric: NswMetric,
2871) -> f32 {
2872 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2873 Some(Value::Vector(v)) if v.len() == query.len() => metric_distance(metric, v, query),
2874 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => match metric {
2875 NswMetric::L2 => quantize::sq8_l2_distance_sq_asymmetric(q, query),
2876 NswMetric::InnerProduct => quantize::sq8_inner_product_asymmetric(q, query),
2877 NswMetric::Cosine => quantize::sq8_cosine_distance_asymmetric(q, query),
2878 },
2879 Some(Value::HalfVector(h)) if h.dim() == query.len() => match metric {
2882 NswMetric::L2 => halfvec::half_l2_distance_sq_asymmetric(h, query),
2883 NswMetric::InnerProduct => halfvec::half_inner_product_asymmetric(h, query),
2884 NswMetric::Cosine => halfvec::half_cosine_distance_asymmetric(h, query),
2885 },
2886 _ => f32::INFINITY,
2887 }
2888}
2889
/// Distance metric selected for an NSW search.
///
/// Smaller values mean "closer" for every variant (see `metric_distance`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NswMetric {
    /// Squared Euclidean distance.
    L2,
    /// Negated inner product, so that a larger dot product ranks closer.
    InnerProduct,
    /// `1 - cosine similarity`; infinite when either vector has zero norm.
    Cosine,
}
2907
2908fn nsw_search(
2914 table: &Table,
2915 idx_pos: usize,
2916 query: &[f32],
2917 k: usize,
2918 ef: usize,
2919 metric: NswMetric,
2920) -> Vec<(f32, usize)> {
2921 let (entry, entry_level) = match &table.indices[idx_pos].kind {
2922 IndexKind::Nsw(g) => (g.entry, g.entry_level),
2923 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) | IndexKind::GinTrgm(_) => return Vec::new(),
2924 };
2925 let Some(entry) = entry else {
2926 return Vec::new();
2927 };
2928 let col_pos = table.indices[idx_pos].column_position;
2929 let sq8 = matches!(
2936 table.schema.columns.get(col_pos).map(|c| c.ty),
2937 Some(DataType::Vector {
2938 encoding: VecEncoding::Sq8,
2939 ..
2940 })
2941 );
2942 let ef = if sq8 {
2943 ef.max(k).max(k * SQ8_RERANK_OVER_FETCH)
2944 } else {
2945 ef.max(k)
2946 };
2947 let entry_d = vec_l2_sq(table, col_pos, entry, query);
2949 let mut current = entry;
2950 let mut current_d = entry_d;
2951 for layer in (1..=entry_level).rev() {
2952 (current, current_d) = greedy_layer_walk(table, idx_pos, layer, current, current_d, query);
2953 }
2954 let mut results = layer_beam_search(table, idx_pos, 0, current, current_d, query, ef, metric);
2956 if sq8 {
2957 results = sq8_rerank(table, col_pos, &results, query, metric);
2958 }
2959 results.truncate(k);
2960 results
2961}
2962
2963fn sq8_rerank(
2970 table: &Table,
2971 col_pos: usize,
2972 candidates: &[(f32, usize)],
2973 query: &[f32],
2974 metric: NswMetric,
2975) -> Vec<(f32, usize)> {
2976 let mut out: Vec<(f32, usize)> = candidates
2977 .iter()
2978 .filter_map(|&(adc_d, row)| {
2979 let cell = table.rows.get(row).and_then(|r| r.values.get(col_pos))?;
2980 let Value::Sq8Vector(q) = cell else {
2981 return Some((adc_d, row));
2985 };
2986 let deq = quantize::dequantize(q);
2987 if deq.len() != query.len() {
2988 return None;
2989 }
2990 Some((metric_distance(metric, &deq, query), row))
2991 })
2992 .collect();
2993 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2994 out
2995}
2996
/// Over-fetch factor for SQ8-encoded columns: `nsw_search` widens its beam
/// to at least `k` times this value before re-ranking the candidates.
const SQ8_RERANK_OVER_FETCH: usize = 3;
3001
3002fn metric_distance(metric: NswMetric, a: &[f32], b: &[f32]) -> f32 {
3003 match metric {
3004 NswMetric::L2 => l2_distance_sq(a, b),
3005 NswMetric::InnerProduct => -inner_product_f32(a, b),
3006 NswMetric::Cosine => {
3007 let (dot, na, nb) = cosine_dot_norms_f32(a, b);
3008 if na == 0.0 || nb == 0.0 {
3009 return f32::INFINITY;
3010 }
3011 let denom = sqrt_newton_f32(na) * sqrt_newton_f32(nb);
3014 1.0 - dot / denom
3015 }
3016 }
3017}
3018
3019#[doc(hidden)]
3028#[inline]
3029pub fn inner_product_f32(a: &[f32], b: &[f32]) -> f32 {
3030 #[cfg(target_arch = "aarch64")]
3031 {
3032 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
3033 return unsafe { inner_product_neon(a, b) };
3036 }
3037 }
3038 inner_product_scalar(a, b)
3039}
3040
/// Portable dot product: sums `a[i] * b[i]` left to right, stopping at the
/// end of the shorter slice. Empty input yields `0.0`.
fn inner_product_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter()
        .zip(b.iter())
        .fold(0.0_f32, |acc, (x, y)| acc + x * y)
}
3048
/// NEON dot product using two 4-lane accumulators.
///
/// Processes 8 floats per iteration, then one optional block of 4, and
/// finally adds the two accumulators and reduces across lanes. Elements
/// past the last full block of 4 are not read.
///
/// # Safety
///
/// `b` must be at least as long as `a` (loads are driven by `a.len()`),
/// and the CPU must support NEON.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names)]
unsafe fn inner_product_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32,
    };
    let len = a.len();
    let lhs = a.as_ptr();
    let rhs = b.as_ptr();
    // SAFETY: every load reads 4 floats at an offset `off` with
    // `off + 4 <= len`; the caller guarantees `b` covers that range too.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut sum_lo = zero;
        let mut sum_hi = zero;
        let mut off = 0usize;
        while off + 8 <= len {
            let a_lo = vld1q_f32(lhs.add(off));
            let b_lo = vld1q_f32(rhs.add(off));
            sum_lo = vfmaq_f32(sum_lo, a_lo, b_lo);
            let a_hi = vld1q_f32(lhs.add(off + 4));
            let b_hi = vld1q_f32(rhs.add(off + 4));
            sum_hi = vfmaq_f32(sum_hi, a_hi, b_hi);
            off += 8;
        }
        while off + 4 <= len {
            let a_blk = vld1q_f32(lhs.add(off));
            let b_blk = vld1q_f32(rhs.add(off));
            sum_lo = vfmaq_f32(sum_lo, a_blk, b_blk);
            off += 4;
        }
        vaddvq_f32(vaddq_f32(sum_lo, sum_hi))
    }
}
3082
3083#[doc(hidden)]
3090#[inline]
3091pub fn cosine_dot_norms_f32(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
3092 #[cfg(target_arch = "aarch64")]
3093 {
3094 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
3095 return unsafe { cosine_dot_norms_neon(a, b) };
3097 }
3098 }
3099 cosine_dot_norms_scalar(a, b)
3100}
3101
/// Portable single-pass computation of `(a . b, |a|^2, |b|^2)`.
///
/// Only the common prefix of the two slices contributes; empty input
/// yields `(0.0, 0.0, 0.0)`.
fn cosine_dot_norms_scalar(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    a.iter().zip(b.iter()).fold(
        (0.0_f32, 0.0_f32, 0.0_f32),
        |(dot, norm_a, norm_b), (x, y)| (dot + x * y, norm_a + x * x, norm_b + y * y),
    )
}
3113
/// NEON single-pass computation of `(a . b, |a|^2, |b|^2)`.
///
/// Processes 4 floats per iteration with one accumulator per output and
/// reduces each accumulator across lanes at the end. Elements past the
/// last full block of 4 are not read.
///
/// # Safety
///
/// `b` must be at least as long as `a` (loads are driven by `a.len()`),
/// and the CPU must support NEON.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names, clippy::similar_names)]
unsafe fn cosine_dot_norms_neon(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    use core::arch::aarch64::{float32x4_t, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32};
    let len = a.len();
    let lhs = a.as_ptr();
    let rhs = b.as_ptr();
    // SAFETY: every load reads 4 floats at an offset `off` with
    // `off + 4 <= len`; the caller guarantees `b` covers that range too.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut dot_acc = zero;
        let mut norm_a_acc = zero;
        let mut norm_b_acc = zero;
        let mut off = 0usize;
        while off + 4 <= len {
            let a_blk = vld1q_f32(lhs.add(off));
            let b_blk = vld1q_f32(rhs.add(off));
            dot_acc = vfmaq_f32(dot_acc, a_blk, b_blk);
            norm_a_acc = vfmaq_f32(norm_a_acc, a_blk, a_blk);
            norm_b_acc = vfmaq_f32(norm_b_acc, b_blk, b_blk);
            off += 4;
        }
        (
            vaddvq_f32(dot_acc),
            vaddvq_f32(norm_a_acc),
            vaddvq_f32(norm_b_acc),
        )
    }
}
3137
/// Square root for `no_std` builds, computed with Newton's iteration.
///
/// * `x <= 0.0` returns `0.0`.
/// * NaN and `+inf` are returned unchanged.
/// * Every other input converges to within rounding error of the true
///   square root across the whole finite `f32` range.
///
/// The iteration is seeded by halving the exponent at the bit level, which
/// lands within a few percent of the root for normal inputs. Seeding with
/// `x` itself only halves the error per step while the estimate is far
/// from the root, so a fixed small number of steps is not enough for very
/// large or very small inputs.
fn sqrt_newton_f32(x: f32) -> f32 {
    if x <= 0.0 {
        return 0.0;
    }
    if !x.is_finite() {
        // NaN stays NaN, +inf stays +inf.
        return x;
    }
    // Bit-level seed: shifting the bit pattern right by one halves the
    // exponent, and the constant restores the bias. Always positive.
    let seed = f32::from_bits((x.to_bits() >> 1).wrapping_add(0x1fbd_1df5));
    // After one Newton step the estimate is at or above the root, and the
    // sequence then decreases until it stops improving.
    let mut estimate = 0.5 * (seed + x / seed);
    // Normal inputs need 2-4 steps; the cap covers subnormals, whose seed
    // is further off.
    for _ in 0..32 {
        let next = 0.5 * (estimate + x / estimate);
        if next >= estimate {
            break;
        }
        estimate = next;
    }
    estimate
}
3148
3149#[inline]
3157fn l2_distance_sq(a: &[f32], b: &[f32]) -> f32 {
3158 #[cfg(target_arch = "aarch64")]
3159 {
3160 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
3161 return unsafe { l2_distance_sq_neon(a, b) };
3165 }
3166 }
3167 l2_distance_sq_scalar(a, b)
3168}
3169
/// Portable squared Euclidean distance: sums `(a[i] - b[i])^2` left to
/// right over the common prefix of the two slices. Empty input yields
/// `0.0`.
fn l2_distance_sq_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b.iter()).fold(0.0_f32, |acc, (x, y)| {
        let diff = *x - *y;
        acc + diff * diff
    })
}
3178
/// NEON squared Euclidean distance using two 4-lane accumulators.
///
/// Processes 8 floats per iteration, then one optional block of 4, and
/// finally adds the two accumulators and reduces across lanes. Elements
/// past the last full block of 4 are not read.
///
/// # Safety
///
/// `b` must be at least as long as `a` (loads are driven by `a.len()`),
/// and the CPU must support NEON.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names)]
unsafe fn l2_distance_sq_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32, vsubq_f32,
    };
    let len = a.len();
    let lhs = a.as_ptr();
    let rhs = b.as_ptr();
    // SAFETY: every load reads 4 floats at an offset `off` with
    // `off + 4 <= len`; the caller guarantees `b` covers that range too.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut sum_lo = zero;
        let mut sum_hi = zero;
        let mut off = 0usize;
        while off + 8 <= len {
            let diff_lo = vsubq_f32(vld1q_f32(lhs.add(off)), vld1q_f32(rhs.add(off)));
            sum_lo = vfmaq_f32(sum_lo, diff_lo, diff_lo);
            let diff_hi = vsubq_f32(
                vld1q_f32(lhs.add(off + 4)),
                vld1q_f32(rhs.add(off + 4)),
            );
            sum_hi = vfmaq_f32(sum_hi, diff_hi, diff_hi);
            off += 8;
        }
        while off + 4 <= len {
            let diff = vsubq_f32(vld1q_f32(lhs.add(off)), vld1q_f32(rhs.add(off)));
            sum_lo = vfmaq_f32(sum_lo, diff, diff);
            off += 4;
        }
        vaddvq_f32(vaddq_f32(sum_lo, sum_hi))
    }
}
3216
3217pub fn nsw_query(
3220 table: &Table,
3221 idx_name: &str,
3222 query: &[f32],
3223 k: usize,
3224 metric: NswMetric,
3225) -> Vec<usize> {
3226 let Some(idx_pos) = table.indices.iter().position(|i| i.name == idx_name) else {
3227 return Vec::new();
3228 };
3229 let ef = (k * 2).max(NSW_DEFAULT_M);
3230 let mut hits = nsw_search(table, idx_pos, query, k, ef, metric);
3231 hits.truncate(k);
3232 hits.into_iter().map(|(_, idx)| idx).collect()
3233}
3234
3235pub fn nsw_index_on(table: &Table, column_position: usize) -> Option<&Index> {
3239 table
3240 .indices
3241 .iter()
3242 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_)))
3243}
3244
/// In-memory catalog: tables, cold-tier segments, stored function definitions
/// and trigger definitions.
#[derive(Debug, Clone, Default)]
pub struct Catalog {
    /// All tables. Positions in this vector are the values stored in
    /// `by_name`; `drop_table` uses `swap_remove`, so positions are not stable
    /// across drops.
    tables: Vec<Table>,
    /// Table name → position in `tables`.
    by_name: BTreeMap<String, usize>,
    /// Cold segment slots indexed by segment id. `None` marks a slot that was
    /// tombstoned or padded in by `load_segment_bytes_at`.
    cold_segments: Vec<Option<Arc<OwnedSegment>>>,
    /// Function definitions keyed by `FunctionDef::name`.
    functions: BTreeMap<String, FunctionDef>,
    /// Trigger definitions; `create_trigger` keeps `(name, table)` unique.
    triggers: Vec<TriggerDef>,
}
3297
/// A stored function definition held by the catalog. Every field is kept as
/// text; nothing in this type parses or validates the contents.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FunctionDef {
    /// Function name; used as the key in the catalog's function map.
    pub name: String,
    /// Textual representation of the argument list — presumably as written in
    /// the defining statement; confirm against the producer.
    pub args_repr: String,
    /// Declared return type, as text.
    pub returns: String,
    /// Implementation language, as text.
    pub language: String,
    /// Function body source text.
    pub body: String,
}
3323
/// A trigger definition held by the catalog. `Catalog::create_trigger`
/// requires `table` and `function` to exist and treats `(name, table)` as the
/// trigger's identity.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TriggerDef {
    /// Trigger name (unique per table).
    pub name: String,
    /// Name of the table the trigger is attached to.
    pub table: String,
    /// Firing time, as text — presumably values such as BEFORE/AFTER; confirm
    /// against the producer.
    pub timing: String,
    /// Events that fire the trigger, as text — presumably INSERT/UPDATE/DELETE
    /// style names; confirm against the producer.
    pub events: Vec<String>,
    /// Granularity, as text — presumably ROW or STATEMENT; confirm.
    pub for_each: String,
    /// Name of the catalog function the trigger invokes.
    pub function: String,
    /// Column names attached to the trigger — presumably the `UPDATE OF`
    /// column list; confirm.
    pub update_columns: Vec<String>,
    /// Whether the trigger is currently enabled.
    pub enabled: bool,
}
3362
3363impl Catalog {
3364 pub const fn new() -> Self {
3365 Self {
3366 tables: Vec::new(),
3367 by_name: BTreeMap::new(),
3368 cold_segments: Vec::new(),
3369 functions: BTreeMap::new(),
3370 triggers: Vec::new(),
3371 }
3372 }
3373
3374 pub const fn functions(&self) -> &BTreeMap<String, FunctionDef> {
3378 &self.functions
3379 }
3380
3381 pub fn create_function(
3385 &mut self,
3386 def: FunctionDef,
3387 or_replace: bool,
3388 ) -> Result<(), StorageError> {
3389 if !or_replace && self.functions.contains_key(&def.name) {
3390 return Err(StorageError::Corrupt(format!(
3391 "function {:?} already exists (drop or use CREATE OR REPLACE)",
3392 def.name
3393 )));
3394 }
3395 self.functions.insert(def.name.clone(), def);
3396 Ok(())
3397 }
3398
3399 pub fn drop_function(&mut self, name: &str) -> bool {
3403 self.functions.remove(name).is_some()
3404 }
3405
3406 pub fn triggers(&self) -> &[TriggerDef] {
3410 &self.triggers
3411 }
3412
3413 pub fn triggers_mut(&mut self) -> &mut Vec<TriggerDef> {
3418 &mut self.triggers
3419 }
3420
3421 pub fn create_trigger(
3427 &mut self,
3428 def: TriggerDef,
3429 or_replace: bool,
3430 ) -> Result<(), StorageError> {
3431 if !self.by_name.contains_key(&def.table) {
3432 return Err(StorageError::TableNotFound {
3433 name: def.table.clone(),
3434 });
3435 }
3436 if !self.functions.contains_key(&def.function) {
3437 return Err(StorageError::Corrupt(format!(
3438 "trigger {:?} references unknown function {:?}",
3439 def.name, def.function
3440 )));
3441 }
3442 let dup = self
3443 .triggers
3444 .iter()
3445 .position(|t| t.name == def.name && t.table == def.table);
3446 match (dup, or_replace) {
3447 (Some(_), false) => Err(StorageError::Corrupt(format!(
3448 "trigger {:?} already exists on table {:?}",
3449 def.name, def.table
3450 ))),
3451 (Some(i), true) => {
3452 self.triggers[i] = def;
3453 Ok(())
3454 }
3455 (None, _) => {
3456 self.triggers.push(def);
3457 Ok(())
3458 }
3459 }
3460 }
3461
3462 pub fn drop_trigger(&mut self, name: &str, table: &str) -> bool {
3465 let before = self.triggers.len();
3466 self.triggers
3467 .retain(|t| !(t.name == name && t.table == table));
3468 before != self.triggers.len()
3469 }
3470
3471 pub fn create_table(&mut self, schema: TableSchema) -> Result<(), StorageError> {
3472 if self.by_name.contains_key(&schema.name) {
3473 return Err(StorageError::DuplicateTable {
3474 name: schema.name.clone(),
3475 });
3476 }
3477 let idx = self.tables.len();
3478 let name = schema.name.clone();
3479 self.tables.push(Table::new(schema));
3480 self.by_name.insert(name, idx);
3481 Ok(())
3482 }
3483
3484 pub fn get(&self, name: &str) -> Option<&Table> {
3485 let idx = *self.by_name.get(name)?;
3486 self.tables.get(idx)
3487 }
3488
3489 pub fn get_mut(&mut self, name: &str) -> Option<&mut Table> {
3490 let idx = *self.by_name.get(name)?;
3491 self.tables.get_mut(idx)
3492 }
3493
3494 pub fn table_count(&self) -> usize {
3495 self.tables.len()
3496 }
3497
3498 pub fn drop_table(&mut self, name: &str) -> bool {
3504 let Some(idx) = self.by_name.remove(name) else {
3505 return false;
3506 };
3507 self.tables.swap_remove(idx);
3510 if idx < self.tables.len() {
3512 let moved_name = self.tables[idx].schema.name.clone();
3513 self.by_name.insert(moved_name, idx);
3514 }
3515 true
3516 }
3517
3518 pub fn drop_named_index(&mut self, name: &str) -> bool {
3521 for t in &mut self.tables {
3522 let before = t.indices.len();
3523 t.indices.retain(|i| i.name != name);
3524 if t.indices.len() != before {
3525 return true;
3526 }
3527 }
3528 false
3529 }
3530
3531 pub fn table_names(&self) -> Vec<String> {
3534 self.tables.iter().map(|t| t.schema.name.clone()).collect()
3535 }
3536
3537 pub fn load_segment_bytes(&mut self, bytes: Vec<u8>) -> Result<u32, StorageError> {
3548 let id = u32::try_from(self.cold_segments.len()).map_err(|_| {
3549 StorageError::Corrupt("cold segment count would exceed u32::MAX".into())
3550 })?;
3551 let seg = OwnedSegment::from_bytes(bytes)
3552 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
3553 self.cold_segments.push(Some(Arc::new(seg)));
3554 Ok(id)
3555 }
3556
3557 pub fn load_segment_bytes_at(
3570 &mut self,
3571 target_id: u32,
3572 bytes: Vec<u8>,
3573 ) -> Result<(), StorageError> {
3574 let seg = OwnedSegment::from_bytes(bytes)
3575 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
3576 let idx = target_id as usize;
3577 while self.cold_segments.len() <= idx {
3578 self.cold_segments.push(None);
3579 }
3580 if self.cold_segments[idx].is_some() {
3581 return Err(StorageError::Corrupt(format!(
3582 "load_segment_bytes_at: segment_id {target_id} already occupied"
3583 )));
3584 }
3585 self.cold_segments[idx] = Some(Arc::new(seg));
3586 Ok(())
3587 }
3588
3589 pub fn tombstone_segment(&mut self, segment_id: u32) -> Result<(), StorageError> {
3599 let idx = segment_id as usize;
3600 if idx >= self.cold_segments.len() {
3601 return Err(StorageError::Corrupt(format!(
3602 "tombstone_segment: segment_id {segment_id} out of bounds (len={})",
3603 self.cold_segments.len()
3604 )));
3605 }
3606 self.cold_segments[idx] = None;
3607 Ok(())
3608 }
3609
3610 #[must_use]
3612 pub fn cold_segment_count(&self) -> usize {
3613 self.cold_segments.iter().filter(|s| s.is_some()).count()
3614 }
3615
3616 #[must_use]
3619 pub fn cold_segment_slot_count(&self) -> usize {
3620 self.cold_segments.len()
3621 }
3622
3623 #[must_use]
3628 pub fn cold_segment_ids_global(&self) -> Vec<u32> {
3629 self.cold_segments
3630 .iter()
3631 .enumerate()
3632 .filter_map(|(i, s)| s.as_ref().map(|_| i as u32))
3633 .collect()
3634 }
3635
3636 #[must_use]
3643 pub fn hot_tier_bytes(&self) -> u64 {
3644 self.tables
3645 .iter()
3646 .map(Table::hot_bytes)
3647 .fold(0u64, u64::saturating_add)
3648 }
3649
3650 pub fn freeze_oldest_to_cold(
3695 &mut self,
3696 table_name: &str,
3697 index_name: &str,
3698 max_rows: usize,
3699 ) -> Result<FreezeReport, StorageError> {
3700 if max_rows == 0 {
3702 return Err(StorageError::Corrupt(
3703 "freeze_oldest_to_cold: max_rows must be > 0".into(),
3704 ));
3705 }
3706 let table = self.get(table_name).ok_or_else(|| {
3707 StorageError::Corrupt(format!(
3708 "freeze_oldest_to_cold: table {table_name:?} not found"
3709 ))
3710 })?;
3711 if max_rows > table.rows.len() {
3712 return Err(StorageError::Corrupt(format!(
3713 "freeze_oldest_to_cold: max_rows {max_rows} > row_count {}",
3714 table.rows.len()
3715 )));
3716 }
3717 let idx = table
3718 .indices
3719 .iter()
3720 .find(|i| i.name == index_name)
3721 .ok_or_else(|| {
3722 StorageError::Corrupt(format!(
3723 "freeze_oldest_to_cold: index {index_name:?} not found on {table_name:?}"
3724 ))
3725 })?;
3726 if !matches!(idx.kind, IndexKind::BTree(_)) {
3727 return Err(StorageError::Corrupt(format!(
3728 "freeze_oldest_to_cold: index {index_name:?} is NSW; only BTree indices may freeze"
3729 )));
3730 }
3731 let column_position = idx.column_position;
3732
3733 let schema = table.schema.clone();
3735 let mut to_freeze: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(max_rows);
3736 for row_idx in 0..max_rows {
3737 let row = table.rows.get(row_idx).expect("bounds-checked above");
3738 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
3739 StorageError::Corrupt(format!(
3740 "freeze_oldest_to_cold: row {row_idx} has NULL / non-key value in index column"
3741 ))
3742 })?;
3743 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
3744 StorageError::Corrupt(format!(
3745 "freeze_oldest_to_cold: index {index_name:?} column type is non-integer; \
3746 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
3747 ))
3748 })?;
3749 to_freeze.push((pk_u64, encode_row_body_dense(row, &schema), key));
3750 }
3751 to_freeze.sort_by_key(|(k, _, _)| *k);
3756 for w in to_freeze.windows(2) {
3760 if w[0].0 == w[1].0 {
3761 return Err(StorageError::Corrupt(format!(
3762 "freeze_oldest_to_cold: duplicate PK {} in freeze batch",
3763 w[0].0
3764 )));
3765 }
3766 }
3767 let post_swap_keys: Vec<IndexKey> = to_freeze.iter().map(|(_, _, k)| k.clone()).collect();
3771 let seg_rows: Vec<(u64, Vec<u8>)> = to_freeze
3775 .into_iter()
3776 .map(|(k, body, _)| (k, body))
3777 .collect();
3778 let frozen_rows = seg_rows.len();
3779 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
3780 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: encode: {e}")))?;
3781
3782 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
3791 let positions: Vec<usize> = (0..max_rows).collect();
3792 let t_mut = self
3793 .get_mut(table_name)
3794 .expect("just validated; still present");
3795 let removed = t_mut.delete_rows(&positions);
3796 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
3797 let bytes_after = t_mut.hot_bytes();
3798 let bytes_freed = bytes_before.saturating_sub(bytes_after);
3799
3800 let segment_id = self
3801 .load_segment_bytes(seg_bytes.clone())
3802 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: load: {e}")))?;
3803 let new_cold = post_swap_keys.into_iter().map(|k| {
3804 (
3805 k,
3806 RowLocator::Cold {
3807 segment_id,
3808 page_offset: 0,
3809 },
3810 )
3811 });
3812 let t_mut = self.get_mut(table_name).expect("still present");
3813 t_mut.register_cold_locators(index_name, new_cold)?;
3814
3815 Ok(FreezeReport {
3816 segment_id,
3817 frozen_rows,
3818 bytes_freed,
3819 segment_bytes: seg_bytes,
3820 })
3821 }
3822
3823 #[must_use]
3829 pub fn cold_segment(&self, segment_id: u32) -> Option<&OwnedSegment> {
3830 self.cold_segments
3831 .get(segment_id as usize)
3832 .and_then(|s| s.as_deref())
3833 }
3834
3835 pub fn resolve_cold_locator(
3844 &self,
3845 table_name: &str,
3846 segment_id: u32,
3847 key: &IndexKey,
3848 ) -> Option<Row> {
3849 let t = self.get(table_name)?;
3850 let u64_key = index_key_as_u64(key)?;
3851 let seg = self.cold_segments.get(segment_id as usize)?.as_ref()?;
3852 let payload = seg.lookup(u64_key)?;
3853 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
3854 Some(row)
3855 }
3856
3857 pub fn lookup_by_pk(&self, table: &str, index_name: &str, key: &IndexKey) -> Option<Row> {
3875 let t = self.get(table)?;
3876 let idx = t.indices.iter().find(|i| i.name == index_name)?;
3877 let locators = idx.lookup_eq(key);
3878 let cold_u64_key = index_key_as_u64(key);
3879 for loc in locators {
3880 match *loc {
3881 RowLocator::Hot(i) => {
3882 if let Some(row) = t.rows.get(i) {
3883 return Some(row.clone());
3884 }
3885 }
3886 RowLocator::Cold {
3887 segment_id,
3888 page_offset: _,
3889 } => {
3890 let Some(u64_key) = cold_u64_key else {
3891 continue;
3894 };
3895 let Some(seg) = self
3896 .cold_segments
3897 .get(segment_id as usize)
3898 .and_then(|s| s.as_deref())
3899 else {
3900 continue;
3911 };
3912 let Some(payload) = seg.lookup(u64_key) else {
3913 continue;
3914 };
3915 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
3916 return Some(row);
3917 }
3918 }
3919 }
3920 None
3921 }
3922
3923 pub fn promote_cold_row(
3945 &mut self,
3946 table_name: &str,
3947 index_name: &str,
3948 key: &IndexKey,
3949 ) -> Result<Option<usize>, StorageError> {
3950 let cold_loc = self.find_cold_locator(table_name, index_name, key)?;
3951 let Some((segment_id, _page_offset)) = cold_loc else {
3952 return Ok(None);
3953 };
3954 let u64_key = index_key_as_u64(key).ok_or_else(|| {
3955 StorageError::Corrupt(
3956 "promote_cold_row: key type not coercible to u64 (cold tier requires integer PK)"
3957 .into(),
3958 )
3959 })?;
3960 let schema = self
3964 .get(table_name)
3965 .ok_or_else(|| {
3966 StorageError::Corrupt(format!("promote_cold_row: table {table_name:?} not found"))
3967 })?
3968 .schema
3969 .clone();
3970 let seg = self
3971 .cold_segments
3972 .get(segment_id as usize)
3973 .and_then(|s| s.as_ref())
3974 .ok_or_else(|| {
3975 StorageError::Corrupt(format!(
3976 "promote_cold_row: segment {segment_id} not registered on catalog"
3977 ))
3978 })?;
3979 let payload = seg.lookup(u64_key).ok_or_else(|| {
3980 StorageError::Corrupt(format!(
3981 "promote_cold_row: key {u64_key} resolves to segment {segment_id} \
3982 but the segment's bloom/page lookup didn't return a row"
3983 ))
3984 })?;
3985 let (row, _consumed) = decode_row_body_dense(&payload, &schema)?;
3986 let t = self
3991 .get_mut(table_name)
3992 .expect("table existed at lookup time");
3993 t.insert(row)?;
3994 let new_hot_idx =
3995 t.rows.len().checked_sub(1).ok_or_else(|| {
3996 StorageError::Corrupt("promote_cold_row: empty after insert".into())
3997 })?;
3998 t.remove_cold_locators_for_key(index_name, key)?;
4002 Ok(Some(new_hot_idx))
4003 }
4004
4005 pub fn shadow_cold_row(
4023 &mut self,
4024 table_name: &str,
4025 index_name: &str,
4026 key: &IndexKey,
4027 ) -> Result<usize, StorageError> {
4028 let t = self.get_mut(table_name).ok_or_else(|| {
4029 StorageError::Corrupt(format!("shadow_cold_row: table {table_name:?} not found"))
4030 })?;
4031 t.remove_cold_locators_for_key(index_name, key)
4032 }
4033
4034 pub fn prepare_freeze_slice(
4052 &self,
4053 table_name: &str,
4054 index_name: &str,
4055 row_range: core::ops::Range<usize>,
4056 ) -> Result<FreezeSlice, StorageError> {
4057 let table = self.get(table_name).ok_or_else(|| {
4058 StorageError::Corrupt(format!(
4059 "prepare_freeze_slice: table {table_name:?} not found"
4060 ))
4061 })?;
4062 let idx = table
4063 .indices
4064 .iter()
4065 .find(|i| i.name == index_name)
4066 .ok_or_else(|| {
4067 StorageError::Corrupt(format!(
4068 "prepare_freeze_slice: index {index_name:?} not found on {table_name:?}"
4069 ))
4070 })?;
4071 if !matches!(idx.kind, IndexKind::BTree(_)) {
4072 return Err(StorageError::Corrupt(format!(
4073 "prepare_freeze_slice: index {index_name:?} is NSW; only BTree indices may freeze"
4074 )));
4075 }
4076 if row_range.end > table.rows.len() {
4077 return Err(StorageError::Corrupt(format!(
4078 "prepare_freeze_slice: row_range end {} > row_count {}",
4079 row_range.end,
4080 table.rows.len()
4081 )));
4082 }
4083 let column_position = idx.column_position;
4084 let schema = table.schema.clone();
4085 let mut rows: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(row_range.len());
4086 for row_idx in row_range.clone() {
4087 let row = table.rows.get(row_idx).expect("bounds-checked above");
4088 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
4089 StorageError::Corrupt(format!(
4090 "prepare_freeze_slice: row {row_idx} has NULL / non-key value in index column"
4091 ))
4092 })?;
4093 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
4094 StorageError::Corrupt(format!(
4095 "prepare_freeze_slice: index {index_name:?} column type is non-integer; \
4096 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
4097 ))
4098 })?;
4099 rows.push((pk_u64, encode_row_body_dense(row, &schema), key));
4100 }
4101 rows.sort_by_key(|(k, _, _)| *k);
4102 Ok(FreezeSlice { row_range, rows })
4103 }
4104
4105 pub fn commit_freeze_slices(
4119 &mut self,
4120 table_name: &str,
4121 index_name: &str,
4122 slices: Vec<FreezeSlice>,
4123 ) -> Result<FreezeReport, StorageError> {
4124 let table = self.get(table_name).ok_or_else(|| {
4126 StorageError::Corrupt(format!(
4127 "commit_freeze_slices: table {table_name:?} not found"
4128 ))
4129 })?;
4130 let idx = table
4131 .indices
4132 .iter()
4133 .find(|i| i.name == index_name)
4134 .ok_or_else(|| {
4135 StorageError::Corrupt(format!(
4136 "commit_freeze_slices: index {index_name:?} not found on {table_name:?}"
4137 ))
4138 })?;
4139 if !matches!(idx.kind, IndexKind::BTree(_)) {
4140 return Err(StorageError::Corrupt(format!(
4141 "commit_freeze_slices: index {index_name:?} is NSW; only BTree indices may freeze"
4142 )));
4143 }
4144 let mut ordered = slices;
4148 ordered.sort_by_key(|s| s.row_range.start);
4149 let mut expected_start = 0usize;
4153 for s in &ordered {
4154 if s.row_range.start != expected_start {
4155 return Err(StorageError::Corrupt(format!(
4156 "commit_freeze_slices: gap/overlap at row {}; expected start {}",
4157 s.row_range.start, expected_start
4158 )));
4159 }
4160 expected_start = s.row_range.end;
4161 }
4162 let max_rows = expected_start;
4163 if max_rows > table.rows.len() {
4164 return Err(StorageError::Corrupt(format!(
4165 "commit_freeze_slices: total row range {} exceeds row_count {}",
4166 max_rows,
4167 table.rows.len()
4168 )));
4169 }
4170 if max_rows == 0 {
4171 return Ok(FreezeReport {
4172 segment_id: u32::MAX,
4173 frozen_rows: 0,
4174 bytes_freed: 0,
4175 segment_bytes: Vec::new(),
4176 });
4177 }
4178
4179 let total_rows: usize = ordered.iter().map(|s| s.rows.len()).sum();
4184 if total_rows != max_rows {
4185 return Err(StorageError::Corrupt(format!(
4186 "commit_freeze_slices: total slice rows {total_rows} ≠ row_range coverage {max_rows}"
4187 )));
4188 }
4189 let mut cursors: Vec<usize> = alloc::vec![0; ordered.len()];
4190 let mut merged: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(total_rows);
4191 loop {
4192 let mut pick: Option<usize> = None;
4195 for (i, c) in cursors.iter().enumerate() {
4196 let slice = &ordered[i];
4197 if *c >= slice.rows.len() {
4198 continue;
4199 }
4200 match pick {
4201 None => pick = Some(i),
4202 Some(j) => {
4203 if slice.rows[*c].0 < ordered[j].rows[cursors[j]].0 {
4204 pick = Some(i);
4205 }
4206 }
4207 }
4208 }
4209 let Some(i) = pick else { break };
4210 let row = ordered[i].rows[cursors[i]].clone();
4211 cursors[i] += 1;
4212 merged.push(row);
4213 }
4214 for w in merged.windows(2) {
4217 if w[0].0 == w[1].0 {
4218 return Err(StorageError::Corrupt(format!(
4219 "commit_freeze_slices: duplicate PK {} across slices",
4220 w[0].0
4221 )));
4222 }
4223 }
4224 let post_swap_keys: Vec<IndexKey> = merged.iter().map(|(_, _, k)| k.clone()).collect();
4225 let seg_rows: Vec<(u64, Vec<u8>)> =
4226 merged.into_iter().map(|(k, body, _)| (k, body)).collect();
4227 let frozen_rows = seg_rows.len();
4228 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
4229 .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: encode: {e}")))?;
4230
4231 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
4233 let positions: Vec<usize> = (0..max_rows).collect();
4234 let t_mut = self
4235 .get_mut(table_name)
4236 .expect("just validated; still present");
4237 let removed = t_mut.delete_rows(&positions);
4238 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
4239 let bytes_after = t_mut.hot_bytes();
4240 let bytes_freed = bytes_before.saturating_sub(bytes_after);
4241
4242 let segment_id = self
4243 .load_segment_bytes(seg_bytes.clone())
4244 .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: load: {e}")))?;
4245 let new_cold = post_swap_keys.into_iter().map(|k| {
4246 (
4247 k,
4248 RowLocator::Cold {
4249 segment_id,
4250 page_offset: 0,
4251 },
4252 )
4253 });
4254 let t_mut = self.get_mut(table_name).expect("still present");
4255 t_mut.register_cold_locators(index_name, new_cold)?;
4256
4257 Ok(FreezeReport {
4258 segment_id,
4259 frozen_rows,
4260 bytes_freed,
4261 segment_bytes: seg_bytes,
4262 })
4263 }
4264
4265 pub fn compact_cold_segments(
4308 &mut self,
4309 table_name: &str,
4310 index_name: &str,
4311 target_segment_bytes: u64,
4312 ) -> Result<CompactReport, StorageError> {
4313 let t = self.get(table_name).ok_or_else(|| {
4315 StorageError::Corrupt(format!(
4316 "compact_cold_segments: table {table_name:?} not found"
4317 ))
4318 })?;
4319 let idx = t
4320 .indices
4321 .iter()
4322 .find(|i| i.name == index_name)
4323 .ok_or_else(|| {
4324 StorageError::Corrupt(format!(
4325 "compact_cold_segments: index {index_name:?} not found on {table_name:?}"
4326 ))
4327 })?;
4328 let map = match &idx.kind {
4329 IndexKind::BTree(m) => m,
4330 IndexKind::Nsw(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) | IndexKind::GinTrgm(_) => {
4331 return Err(StorageError::Corrupt(format!(
4332 "compact_cold_segments: index {index_name:?} is not BTree; \
4333 compaction applies only to BTree cold-tier indices"
4334 )));
4335 }
4336 };
4337
4338 let mut referenced_ids: BTreeSet<u32> = BTreeSet::new();
4341 for (_key, locators) in map.iter() {
4342 for loc in locators {
4343 if let RowLocator::Cold { segment_id, .. } = loc {
4344 referenced_ids.insert(*segment_id);
4345 }
4346 }
4347 }
4348 let candidate_set: BTreeSet<u32> = referenced_ids
4350 .into_iter()
4351 .filter(|id| {
4352 self.cold_segments
4353 .get(*id as usize)
4354 .and_then(|s| s.as_deref())
4355 .is_some_and(|s| (s.bytes().len() as u64) < target_segment_bytes)
4356 })
4357 .collect();
4358 if candidate_set.len() < 2 {
4359 return Ok(CompactReport {
4360 sources: Vec::new(),
4361 merged_segment_id: None,
4362 merged_segment_bytes: Vec::new(),
4363 merged_rows: 0,
4364 deleted_rows_pruned: 0,
4365 bytes_reclaimed_estimate: 0,
4366 });
4367 }
4368 let mut source_row_count: usize = 0;
4370 let mut source_byte_total: u64 = 0;
4371 for &id in &candidate_set {
4372 let seg = self.cold_segments[id as usize]
4373 .as_ref()
4374 .expect("candidate selected only when slot is Some");
4375 source_row_count = source_row_count.saturating_add(seg.meta().num_rows as usize);
4376 source_byte_total = source_byte_total.saturating_add(seg.bytes().len() as u64);
4377 }
4378 let mut collected: BTreeMap<u64, (Vec<u8>, IndexKey)> = BTreeMap::new();
4384 for (key, locators) in map.iter() {
4385 for loc in locators {
4386 let RowLocator::Cold { segment_id, .. } = loc else {
4387 continue;
4388 };
4389 if !candidate_set.contains(segment_id) {
4390 continue;
4391 }
4392 let u64_key = index_key_as_u64(key).ok_or_else(|| {
4393 StorageError::Corrupt(format!(
4394 "compact_cold_segments: index {index_name:?} has non-integer Cold key; \
4395 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
4396 ))
4397 })?;
4398 let seg = self.cold_segments[*segment_id as usize]
4399 .as_ref()
4400 .expect("candidate slot guaranteed Some above");
4401 let payload = seg.lookup(u64_key).ok_or_else(|| {
4402 StorageError::Corrupt(format!(
4403 "compact_cold_segments: BTree {index_name:?} points key={u64_key} \
4404 at segment {segment_id} but the segment lookup missed"
4405 ))
4406 })?;
4407 collected.insert(u64_key, (payload, key.clone()));
4408 break;
4409 }
4410 }
4411 let merged_rows = collected.len();
4412 let deleted_rows_pruned = source_row_count.saturating_sub(merged_rows);
4413
4414 let seg_rows: Vec<(u64, Vec<u8>)> = collected
4418 .iter()
4419 .map(|(k, (body, _))| (*k, body.clone()))
4420 .collect();
4421 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
4422 .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: encode: {e}")))?;
4423 let merged_bytes_len = seg_bytes.len() as u64;
4424
4425 let merged_segment_id = self
4427 .load_segment_bytes(seg_bytes.clone())
4428 .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: load: {e}")))?;
4429
4430 let entries: Vec<(IndexKey, Vec<RowLocator>)> = {
4436 let t = self
4437 .get(table_name)
4438 .expect("table existed at the start of this fn");
4439 let idx = t
4440 .indices
4441 .iter()
4442 .find(|i| i.name == index_name)
4443 .expect("index existed at the start of this fn");
4444 let IndexKind::BTree(map) = &idx.kind else {
4445 unreachable!("validated above");
4446 };
4447 map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
4448 };
4449 let t_mut = self
4450 .get_mut(table_name)
4451 .expect("table existed at the start of this fn");
4452 let idx_mut = t_mut
4453 .indices
4454 .iter_mut()
4455 .find(|i| i.name == index_name)
4456 .expect("index existed at the start of this fn");
4457 let IndexKind::BTree(map_mut) = &mut idx_mut.kind else {
4458 unreachable!("validated above");
4459 };
4460 for (key, locators) in entries {
4461 let mut new_locs: Vec<RowLocator> = Vec::with_capacity(locators.len());
4462 let mut changed = false;
4463 for loc in &locators {
4464 match *loc {
4465 RowLocator::Cold {
4466 segment_id,
4467 page_offset: _,
4468 } if candidate_set.contains(&segment_id) => {
4469 let replacement = RowLocator::Cold {
4470 segment_id: merged_segment_id,
4471 page_offset: 0,
4472 };
4473 if !new_locs.contains(&replacement) {
4474 new_locs.push(replacement);
4475 }
4476 changed = true;
4477 }
4478 other => new_locs.push(other),
4479 }
4480 }
4481 if changed {
4482 map_mut.insert_mut(key, new_locs);
4483 }
4484 }
4485
4486 for &id in &candidate_set {
4491 self.tombstone_segment(id)?;
4492 }
4493
4494 let bytes_reclaimed_estimate = source_byte_total.saturating_sub(merged_bytes_len);
4495 Ok(CompactReport {
4496 sources: candidate_set.into_iter().collect(),
4497 merged_segment_id: Some(merged_segment_id),
4498 merged_segment_bytes: seg_bytes,
4499 merged_rows,
4500 deleted_rows_pruned,
4501 bytes_reclaimed_estimate,
4502 })
4503 }
4504
4505 fn find_cold_locator(
4511 &self,
4512 table_name: &str,
4513 index_name: &str,
4514 key: &IndexKey,
4515 ) -> Result<Option<(u32, u32)>, StorageError> {
4516 let t = self.get(table_name).ok_or_else(|| {
4517 StorageError::Corrupt(format!("find_cold_locator: table {table_name:?} not found"))
4518 })?;
4519 let idx = t
4520 .indices
4521 .iter()
4522 .find(|i| i.name == index_name)
4523 .ok_or_else(|| {
4524 StorageError::Corrupt(format!(
4525 "find_cold_locator: index {index_name:?} not found on {table_name:?}"
4526 ))
4527 })?;
4528 if !matches!(idx.kind, IndexKind::BTree(_)) {
4529 return Err(StorageError::Corrupt(format!(
4530 "find_cold_locator: index {index_name:?} is NSW; promote-on-write only applies to BTree indices"
4531 )));
4532 }
4533 for loc in idx.lookup_eq(key) {
4534 if let RowLocator::Cold {
4535 segment_id,
4536 page_offset,
4537 } = *loc
4538 {
4539 return Ok(Some((segment_id, page_offset)));
4540 }
4541 }
4542 Ok(None)
4543 }
4544}
4545
4546fn index_key_as_u64(key: &IndexKey) -> Option<u64> {
4552 match key {
4553 IndexKey::Int(n) => Some(n.cast_unsigned()),
4559 IndexKey::Text(_) | IndexKey::Bool(_) => None,
4560 }
4561}
4562
/// Errors reported by the storage layer. Marked `#[non_exhaustive]`, so
/// downstream matches need a wildcard arm.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum StorageError {
    /// A table with this name already exists.
    DuplicateTable {
        name: String,
    },
    /// No table with this name exists.
    TableNotFound {
        name: String,
    },
    /// A row had `actual` values where `expected` columns were required.
    ArityMismatch {
        expected: usize,
        actual: usize,
    },
    /// The value for `column` (at `position`) had type `actual` instead of
    /// `expected`.
    TypeMismatch {
        column: String,
        expected: DataType,
        actual: DataType,
        position: usize,
    },
    /// A NULL value was supplied for a NOT NULL column.
    NullInNotNull {
        column: String,
    },
    /// An index with this name already exists.
    DuplicateIndex {
        name: String,
    },
    /// No column with this name exists.
    ColumnNotFound {
        column: String,
    },
    /// Free-form failure detail. Displayed as "corrupt on-disk format", but
    /// the catalog code also uses it for validation failures such as unknown
    /// tables/indices, duplicate functions/triggers and bad arguments.
    Corrupt(String),
    /// No index with this name exists.
    IndexNotFound {
        name: String,
    },
    /// The requested operation is not supported; the payload says what.
    Unsupported(String),
}
4606
4607impl fmt::Display for StorageError {
4608 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4609 match self {
4610 Self::DuplicateTable { name } => write!(f, "table already exists: {name}"),
4611 Self::TableNotFound { name } => write!(f, "table not found: {name}"),
4612 Self::ArityMismatch { expected, actual } => write!(
4613 f,
4614 "row arity mismatch: expected {expected} columns, got {actual}"
4615 ),
4616 Self::TypeMismatch {
4617 column,
4618 expected,
4619 actual,
4620 position,
4621 } => write!(
4622 f,
4623 "type mismatch in column {column:?} (position {position}): expected {expected}, got {actual}"
4624 ),
4625 Self::NullInNotNull { column } => {
4626 write!(f, "NULL value in NOT NULL column {column:?}")
4627 }
4628 Self::DuplicateIndex { name } => write!(f, "index already exists: {name}"),
4629 Self::ColumnNotFound { column } => write!(f, "column not found: {column}"),
4630 Self::Corrupt(detail) => write!(f, "corrupt on-disk format: {detail}"),
4631 Self::IndexNotFound { name } => write!(f, "index not found: {name}"),
4632 Self::Unsupported(detail) => write!(f, "unsupported: {detail}"),
4633 }
4634 }
4635}
4636
4637impl ColumnSchema {
4638 pub fn new(name: impl Into<String>, ty: DataType, nullable: bool) -> Self {
4639 Self {
4640 name: name.into(),
4641 ty,
4642 nullable,
4643 default: None,
4644 runtime_default: None,
4645 auto_increment: false,
4646 }
4647 }
4648
4649 #[must_use]
4653 pub fn with_default(mut self, default: Value) -> Self {
4654 self.default = Some(default);
4655 self
4656 }
4657
4658 #[must_use]
4663 pub fn with_runtime_default(mut self, expr: impl Into<String>) -> Self {
4664 self.runtime_default = Some(expr.into());
4665 self
4666 }
4667
4668 #[must_use]
4670 pub const fn with_auto_increment(mut self) -> Self {
4671 self.auto_increment = true;
4672 self
4673 }
4674}
4675
4676impl TableSchema {
4677 pub fn new(name: impl Into<String>, columns: Vec<ColumnSchema>) -> Self {
4678 Self {
4679 name: name.into(),
4680 columns,
4681 hot_tier_bytes: None,
4682 foreign_keys: Vec::new(),
4683 uniqueness_constraints: Vec::new(),
4684 checks: Vec::new(),
4685 }
4686 }
4687}
4688
/// Eight-byte magic that `Catalog::serialize` writes at the very start of its
/// output.
const FILE_MAGIC: &[u8; 8] = b"SPGDB001";
/// Format version byte that `Catalog::serialize` writes directly after the
/// magic.
const FILE_VERSION: u8 = 25;
/// NOTE(review): presumably the oldest format version the deserializer still
/// accepts — no reader is visible in this part of the file; confirm.
const MIN_SUPPORTED_FILE_VERSION: u8 = 8;

/// Tag byte for integer index keys — presumably the prefix that
/// `write_index_key` emits for `IndexKey::Int`; confirm against that helper.
const INDEX_KEY_TAG_INT: u8 = 0;
/// Tag byte for text index keys (same caveat as `INDEX_KEY_TAG_INT`).
const INDEX_KEY_TAG_TEXT: u8 = 1;
/// Tag byte for boolean index keys (same caveat as `INDEX_KEY_TAG_INT`).
const INDEX_KEY_TAG_BOOL: u8 = 2;
4797
4798impl Catalog {
/// Encodes the whole catalog into one byte buffer.
///
/// Layout, in write order: `FILE_MAGIC`, the `FILE_VERSION` byte, a `u32`
/// table count followed by every table, then a `u32`-counted list of
/// functions and a `u32`-counted list of triggers. `Catalog::deserialize`
/// reads the same layout back, so the statement order here *is* the file
/// format and must not be rearranged.
///
/// # Panics
///
/// Panics when a count or position does not fit the fixed-width integer it
/// is stored as (see the individual `expect` messages).
pub fn serialize(&self) -> Vec<u8> {
    let mut out = Vec::with_capacity(64);
    // File header: magic + version.
    out.extend_from_slice(FILE_MAGIC);
    out.push(FILE_VERSION);
    write_u32(
        &mut out,
        u32::try_from(self.tables.len()).expect("≤ 4G tables"),
    );
    for t in &self.tables {
        // --- Table header: name and column definitions. ---
        write_str(&mut out, &t.schema.name);
        write_u16(
            &mut out,
            u16::try_from(t.schema.columns.len()).expect("≤ 65k columns/table"),
        );
        for c in &t.schema.columns {
            write_str(&mut out, &c.name);
            write_data_type(&mut out, c.ty);
            out.push(u8::from(c.nullable));
            // Optional static default: presence byte, then the tagged value.
            match &c.default {
                None => out.push(0),
                Some(v) => {
                    out.push(1);
                    write_value(&mut out, v);
                }
            }
            out.push(u8::from(c.auto_increment));
        }
        // --- Rows: u32 count, then each row in the dense (bitmap + bodies) encoding. ---
        write_u32(
            &mut out,
            u32::try_from(t.rows.len()).expect("≤ 4G rows/table"),
        );
        for row in &t.rows {
            out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
        }
        // --- Indices: u16 count, then name, indexed column position, kind tag + payload. ---
        write_u16(
            &mut out,
            u16::try_from(t.indices.len()).expect("≤ 65k indices/table"),
        );
        for idx in &t.indices {
            write_str(&mut out, &idx.name);
            write_u16(
                &mut out,
                u16::try_from(idx.column_position).expect("≤ 65k columns/table"),
            );
            match &idx.kind {
                // Kind tag 0: B-tree — key → locator list entries.
                IndexKind::BTree(map) => {
                    out.push(0);
                    write_u32(
                        &mut out,
                        u32::try_from(map.len()).expect("≤ 4G index entries/index"),
                    );
                    for (key, locators) in map {
                        write_index_key(&mut out, key);
                        write_u32(
                            &mut out,
                            u32::try_from(locators.len()).expect("≤ 4G locators/key"),
                        );
                        for loc in locators {
                            loc.write_le(&mut out);
                        }
                    }
                }
                // Kind tag 1: NSW graph — `m` as u16, then the graph body.
                IndexKind::Nsw(g) => {
                    out.push(1);
                    write_u16(&mut out, u16::try_from(g.m).expect("≤ 65k NSW neighbours"));
                    write_nsw_graph(&mut out, g);
                }
                // Kind tag 2: BRIN — only the column's data type is stored.
                IndexKind::Brin { column_type } => {
                    out.push(2);
                    write_data_type(&mut out, *column_type);
                }
                // Kind tag 3: GIN — word → locator list posting lists.
                IndexKind::Gin(map) => {
                    out.push(3);
                    write_u32(
                        &mut out,
                        u32::try_from(map.len()).expect("≤ 4G GIN posting lists"),
                    );
                    for (word, locators) in map {
                        write_str(&mut out, word);
                        write_u32(
                            &mut out,
                            u32::try_from(locators.len()).expect("≤ 4G locators/posting list"),
                        );
                        for loc in locators {
                            loc.write_le(&mut out);
                        }
                    }
                }
                // Kind tag 4: trigram GIN — same wire shape as tag 3;
                // `deserialize_indices` rejects it for file versions below 24.
                IndexKind::GinTrgm(map) => {
                    out.push(4);
                    write_u32(
                        &mut out,
                        u32::try_from(map.len())
                            .expect("≤ 4G trigram-GIN posting lists"),
                    );
                    for (tri, locators) in map {
                        write_str(&mut out, tri);
                        write_u32(
                            &mut out,
                            u32::try_from(locators.len())
                                .expect("≤ 4G locators/posting list"),
                        );
                        for loc in locators {
                            loc.write_le(&mut out);
                        }
                    }
                }
            }
            // Per-index appendix (read back only for file versions >= 12):
            // INCLUDE column positions, optional partial predicate, optional expression.
            write_u16(
                &mut out,
                u16::try_from(idx.included_columns.len()).expect("≤ 65k INCLUDE columns/index"),
            );
            for col_pos in &idx.included_columns {
                write_u16(
                    &mut out,
                    u16::try_from(*col_pos).expect("≤ 65k columns/table"),
                );
            }
            match &idx.partial_predicate {
                None => out.push(0),
                Some(pred) => {
                    out.push(1);
                    write_str(&mut out, pred);
                }
            }
            match &idx.expression {
                None => out.push(0),
                Some(expr) => {
                    out.push(1);
                    write_str(&mut out, expr);
                }
            }
            // Uniqueness flag and extra column positions (read back for versions >= 16).
            out.push(u8::from(idx.is_unique));
            write_u16(
                &mut out,
                u16::try_from(idx.extra_column_positions.len())
                    .expect("≤ 65k extra cols / index"),
            );
            for cp in &idx.extra_column_positions {
                write_u16(&mut out, u16::try_from(*cp).expect("≤ 65k columns/table"));
            }
        }
        // --- Optional `hot_tier_bytes` (read back for versions >= 11): presence byte + LE value. ---
        match t.schema.hot_tier_bytes {
            None => out.push(0),
            Some(n) => {
                out.push(1);
                out.extend_from_slice(&n.to_le_bytes());
            }
        }
        // --- Foreign keys (read back for versions >= 13). ---
        write_u16(
            &mut out,
            u16::try_from(t.schema.foreign_keys.len()).expect("≤ 65k FKs/table"),
        );
        for fk in &t.schema.foreign_keys {
            match &fk.name {
                None => out.push(0),
                Some(n) => {
                    out.push(1);
                    write_str(&mut out, n);
                }
            }
            write_u16(
                &mut out,
                u16::try_from(fk.local_columns.len()).expect("≤ 65k FK columns"),
            );
            for &p in &fk.local_columns {
                write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
            }
            write_str(&mut out, &fk.parent_table);
            write_u16(
                &mut out,
                u16::try_from(fk.parent_columns.len()).expect("≤ 65k FK parent columns"),
            );
            for &p in &fk.parent_columns {
                write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
            }
            out.push(fk.on_delete.tag());
            out.push(fk.on_update.tag());
        }
        // --- Uniqueness constraints (read back for versions >= 15). ---
        write_u16(
            &mut out,
            u16::try_from(t.schema.uniqueness_constraints.len())
                .expect("≤ 65k uniqueness constraints/table"),
        );
        for uc in &t.schema.uniqueness_constraints {
            out.push(u8::from(uc.is_primary_key));
            write_u16(
                &mut out,
                u16::try_from(uc.columns.len()).expect("≤ 65k cols in uniqueness constraint"),
            );
            for &p in &uc.columns {
                write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
            }
            // Trailing flag that the reader consumes only for versions >= 23.
            out.push(u8::from(uc.nulls_not_distinct));
        }
        // --- Runtime defaults: only columns that have one, as (position, expression) pairs.
        // Collected first because the count has to be written before the entries.
        let mut rt_defaults: Vec<(usize, &str)> = Vec::new();
        for (i, c) in t.schema.columns.iter().enumerate() {
            if let Some(e) = &c.runtime_default {
                rt_defaults.push((i, e.as_str()));
            }
        }
        write_u16(
            &mut out,
            u16::try_from(rt_defaults.len()).expect("≤ 65k runtime defaults/table"),
        );
        for (pos, expr) in rt_defaults {
            write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
            write_str(&mut out, expr);
        }
        // --- CHECK constraints (read back for versions >= 23). ---
        write_u16(
            &mut out,
            u16::try_from(t.schema.checks.len()).expect("≤ 65k CHECK constraints/table"),
        );
        for c in &t.schema.checks {
            write_str(&mut out, c.as_str());
        }
    }
    // --- Functions (read back for versions >= 22); the body uses the long-string encoding. ---
    write_u32(
        &mut out,
        u32::try_from(self.functions.len()).expect("≤ 4G functions"),
    );
    for fd in self.functions.values() {
        write_str(&mut out, &fd.name);
        write_str(&mut out, &fd.args_repr);
        write_str(&mut out, &fd.returns);
        write_str(&mut out, &fd.language);
        write_str_long(&mut out, &fd.body);
    }
    // --- Triggers (read back for versions >= 22). ---
    write_u32(
        &mut out,
        u32::try_from(self.triggers.len()).expect("≤ 4G triggers"),
    );
    for td in &self.triggers {
        write_str(&mut out, &td.name);
        write_str(&mut out, &td.table);
        write_str(&mut out, &td.timing);
        write_u16(
            &mut out,
            u16::try_from(td.events.len()).expect("≤ 65k events / trigger"),
        );
        for ev in &td.events {
            write_str(&mut out, ev);
        }
        write_str(&mut out, &td.for_each);
        write_str(&mut out, &td.function);
        // `update_columns` is consumed by the reader for versions >= 23.
        write_u16(
            &mut out,
            u16::try_from(td.update_columns.len()).expect("≤ 65k cols / trigger"),
        );
        for c in &td.update_columns {
            write_str(&mut out, c);
        }
        // `enabled` is consumed by the reader for versions >= 25.
        out.push(u8::from(td.enabled));
    }
    out
}
5166
5167 pub fn deserialize(buf: &[u8]) -> Result<Self, StorageError> {
5170 let mut cur = Cursor::new(buf);
5171 let magic = cur.take(8)?;
5172 if magic != FILE_MAGIC {
5173 return Err(StorageError::Corrupt(format!(
5174 "bad magic: expected SPGDB001, got {magic:?}"
5175 )));
5176 }
5177 let version = cur.read_u8()?;
5178 if !(MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION).contains(&version) {
5179 return Err(StorageError::Corrupt(format!(
5180 "unsupported file version: {version} (supported: {MIN_SUPPORTED_FILE_VERSION}..={FILE_VERSION})"
5181 )));
5182 }
5183 let table_count = cur.read_u32()? as usize;
5184 let mut cat = Self::new();
5185 for _ in 0..table_count {
5186 deserialize_table(&mut cur, &mut cat, version)?;
5187 }
5188 if version >= 22 {
5192 let fn_count = cur.read_u32()? as usize;
5193 for _ in 0..fn_count {
5194 let name = cur.read_str()?;
5195 let args_repr = cur.read_str()?;
5196 let returns = cur.read_str()?;
5197 let language = cur.read_str()?;
5198 let body = cur.read_str_long()?;
5199 cat.functions.insert(
5200 name.clone(),
5201 FunctionDef {
5202 name,
5203 args_repr,
5204 returns,
5205 language,
5206 body,
5207 },
5208 );
5209 }
5210 let trg_count = cur.read_u32()? as usize;
5211 for _ in 0..trg_count {
5212 let name = cur.read_str()?;
5213 let table = cur.read_str()?;
5214 let timing = cur.read_str()?;
5215 let ev_count = cur.read_u16()? as usize;
5216 let mut events = Vec::with_capacity(ev_count);
5217 for _ in 0..ev_count {
5218 events.push(cur.read_str()?);
5219 }
5220 let for_each = cur.read_str()?;
5221 let function = cur.read_str()?;
5222 let update_columns = if version >= 23 {
5226 let n = cur.read_u16()? as usize;
5227 let mut cols = Vec::with_capacity(n);
5228 for _ in 0..n {
5229 cols.push(cur.read_str()?);
5230 }
5231 cols
5232 } else {
5233 Vec::new()
5234 };
5235 let enabled = if version >= 25 {
5239 cur.read_u8()? != 0
5240 } else {
5241 true
5242 };
5243 cat.triggers.push(TriggerDef {
5244 name,
5245 table,
5246 timing,
5247 events,
5248 for_each,
5249 function,
5250 update_columns,
5251 enabled,
5252 });
5253 }
5254 }
5255 if cur.pos < buf.len() {
5256 return Err(StorageError::Corrupt(format!(
5257 "trailing bytes: {} unread",
5258 buf.len() - cur.pos
5259 )));
5260 }
5261 Ok(cat)
5262 }
5263}
5264
5265fn deserialize_table(
5270 cur: &mut Cursor<'_>,
5271 cat: &mut Catalog,
5272 version: u8,
5273) -> Result<(), StorageError> {
5274 let table_name = cur.read_str()?;
5275 let name = table_name.clone();
5276 let col_count = cur.read_u16()? as usize;
5277 let mut cols = Vec::with_capacity(col_count);
5278 for _ in 0..col_count {
5279 let c_name = cur.read_str()?;
5280 let ty = cur.read_data_type()?;
5281 let nullable = cur.read_u8()? != 0;
5282 let default = match cur.read_u8()? {
5283 0 => None,
5284 1 => Some(cur.read_value()?),
5285 other => {
5286 return Err(StorageError::Corrupt(format!(
5287 "unknown default tag: {other}"
5288 )));
5289 }
5290 };
5291 let auto_increment = cur.read_u8()? != 0;
5292 cols.push(ColumnSchema {
5296 name: c_name,
5297 ty,
5298 nullable,
5299 default,
5300 runtime_default: None,
5301 auto_increment,
5302 });
5303 }
5304 let n_cols = cols.len();
5305 cat.create_table(TableSchema::new(name, cols))?;
5306 let t = cat.tables.last_mut().expect("create_table just pushed");
5310 deserialize_rows(cur, t, n_cols)?;
5311 deserialize_indices(cur, t, version)?;
5312 if version >= 11 {
5318 let has = cur.read_u8()?;
5319 let hot_tier_bytes = match has {
5320 0 => None,
5321 1 => Some(cur.read_u64()?),
5322 other => {
5323 return Err(StorageError::Corrupt(format!(
5324 "hot_tier_bytes appendix: unknown has-value byte {other}"
5325 )));
5326 }
5327 };
5328 t.schema_mut().hot_tier_bytes = hot_tier_bytes;
5329 }
5330 if version >= 13 {
5333 let fk_count = cur.read_u16()? as usize;
5334 let mut fks = Vec::with_capacity(fk_count);
5335 for _ in 0..fk_count {
5336 let name = match cur.read_u8()? {
5337 0 => None,
5338 1 => Some(cur.read_str()?),
5339 other => {
5340 return Err(StorageError::Corrupt(format!(
5341 "FK appendix: unknown has-name byte {other}"
5342 )));
5343 }
5344 };
5345 let local_arity = cur.read_u16()? as usize;
5346 let mut local_columns = Vec::with_capacity(local_arity);
5347 for _ in 0..local_arity {
5348 local_columns.push(cur.read_u16()? as usize);
5349 }
5350 let parent_table = cur.read_str()?;
5351 let parent_arity = cur.read_u16()? as usize;
5352 if parent_arity != local_arity {
5353 return Err(StorageError::Corrupt(format!(
5354 "FK arity mismatch in catalog: local {local_arity} vs parent {parent_arity}"
5355 )));
5356 }
5357 let mut parent_columns = Vec::with_capacity(parent_arity);
5358 for _ in 0..parent_arity {
5359 parent_columns.push(cur.read_u16()? as usize);
5360 }
5361 let on_delete = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
5362 StorageError::Corrupt("FK appendix: unknown on_delete tag".into())
5363 })?;
5364 let on_update = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
5365 StorageError::Corrupt("FK appendix: unknown on_update tag".into())
5366 })?;
5367 fks.push(ForeignKeyConstraint {
5368 name,
5369 local_columns,
5370 parent_table,
5371 parent_columns,
5372 on_delete,
5373 on_update,
5374 });
5375 }
5376 t.schema_mut().foreign_keys = fks;
5377 }
5378 if version >= 15 {
5381 let uc_count = cur.read_u16()? as usize;
5382 let mut ucs = Vec::with_capacity(uc_count);
5383 for _ in 0..uc_count {
5384 let is_pk = cur.read_u8()? != 0;
5385 let arity = cur.read_u16()? as usize;
5386 let mut cols = Vec::with_capacity(arity);
5387 for _ in 0..arity {
5388 cols.push(cur.read_u16()? as usize);
5389 }
5390 let nulls_not_distinct = if version >= 23 {
5394 cur.read_u8()? != 0
5395 } else {
5396 false
5397 };
5398 ucs.push(UniquenessConstraint {
5399 is_primary_key: is_pk,
5400 columns: cols,
5401 nulls_not_distinct,
5402 });
5403 }
5404 t.schema_mut().uniqueness_constraints = ucs;
5405 let rt_count = cur.read_u16()? as usize;
5407 for _ in 0..rt_count {
5408 let pos = cur.read_u16()? as usize;
5409 let expr = cur.read_str()?;
5410 if let Some(col) = t.schema_mut().columns.get_mut(pos) {
5411 col.runtime_default = Some(expr);
5412 }
5413 }
5414 }
5415 if version >= 23 {
5418 let check_count = cur.read_u16()? as usize;
5419 let mut checks = Vec::with_capacity(check_count);
5420 for _ in 0..check_count {
5421 checks.push(cur.read_str()?);
5422 }
5423 t.schema_mut().checks = checks;
5424 }
5425 let _ = table_name;
5426 Ok(())
5427}
5428
5429fn deserialize_rows(
5430 cur: &mut Cursor<'_>,
5431 t: &mut Table,
5432 _n_cols: usize,
5433) -> Result<(), StorageError> {
5434 let row_count = cur.read_u32()? as usize;
5435 let mut hot_bytes: u64 = 0;
5440 for _ in 0..row_count {
5441 let tail = &cur.buf[cur.pos..];
5442 let (row, consumed) = decode_row_body_dense(tail, &t.schema)?;
5443 cur.pos += consumed;
5444 hot_bytes = hot_bytes.saturating_add(row_body_encoded_len(&row, &t.schema) as u64);
5450 t.rows.push_mut(row);
5451 }
5452 t.hot_bytes = hot_bytes;
5453 Ok(())
5454}
5455
5456fn deserialize_indices(
5457 cur: &mut Cursor<'_>,
5458 t: &mut Table,
5459 version: u8,
5460) -> Result<(), StorageError> {
5461 let index_count = cur.read_u16()? as usize;
5462 for _ in 0..index_count {
5463 let idx_name = cur.read_str()?;
5464 let col_pos = cur.read_u16()? as usize;
5465 let column_name = t
5466 .schema
5467 .columns
5468 .get(col_pos)
5469 .ok_or_else(|| {
5470 StorageError::Corrupt(format!(
5471 "index {idx_name:?} points at non-existent column position {col_pos}"
5472 ))
5473 })?
5474 .name
5475 .clone();
5476 let kind_tag = cur.read_u8()?;
5477 match kind_tag {
5478 0 => {
5479 if version >= 9 {
5480 let map = read_btree_map(cur)?;
5485 t.restore_btree_index(idx_name, &column_name, map)?;
5486 } else {
5487 t.add_index(idx_name, &column_name)?;
5492 }
5493 }
5494 1 => {
5495 let m = cur.read_u16()? as usize;
5496 let graph = cur.read_nsw_graph(m)?;
5497 t.restore_nsw_index(idx_name, &column_name, graph)?;
5498 }
5499 2 => {
5500 let column_type = cur.read_data_type()?;
5504 t.restore_brin_index(idx_name, &column_name, column_type)?;
5505 }
5506 3 => {
5507 let map = read_gin_map(cur)?;
5512 t.restore_gin_index(idx_name, &column_name, map)?;
5513 }
5514 4 => {
5515 if version < 24 {
5519 return Err(StorageError::Corrupt(format!(
5520 "trigram-GIN index tag 4 found in catalog FILE_VERSION {version}; \
5521 FILE_VERSION 24+ required (v7.15.0 introduced this tag)"
5522 )));
5523 }
5524 let map = read_gin_map(cur)?;
5525 t.restore_gin_trgm_index(idx_name, &column_name, map)?;
5526 }
5527 other => {
5528 return Err(StorageError::Corrupt(format!(
5529 "unknown index kind tag: {other}"
5530 )));
5531 }
5532 }
5533 if version >= 12 {
5536 let num_included = cur.read_u16()? as usize;
5537 if num_included > 0 {
5538 let mut included: Vec<usize> = Vec::with_capacity(num_included);
5539 for _ in 0..num_included {
5540 let cp = cur.read_u16()? as usize;
5541 if cp >= t.schema.columns.len() {
5542 return Err(StorageError::Corrupt(format!(
5543 "INCLUDE column position {cp} out of range \
5544 ({} schema columns)",
5545 t.schema.columns.len()
5546 )));
5547 }
5548 included.push(cp);
5549 }
5550 if let Some(last) = t.indices.last_mut() {
5551 last.included_columns = included;
5552 }
5553 }
5554 match cur.read_u8()? {
5556 0 => {}
5557 1 => {
5558 let pred = cur.read_str()?;
5559 if let Some(last) = t.indices.last_mut() {
5560 last.partial_predicate = Some(pred);
5561 }
5562 }
5563 other => {
5564 return Err(StorageError::Corrupt(format!(
5565 "partial_predicate tag: unknown byte {other}"
5566 )));
5567 }
5568 }
5569 match cur.read_u8()? {
5571 0 => {}
5572 1 => {
5573 let expr = cur.read_str()?;
5574 if let Some(last) = t.indices.last_mut() {
5575 last.expression = Some(expr);
5576 }
5577 }
5578 other => {
5579 return Err(StorageError::Corrupt(format!(
5580 "expression tag: unknown byte {other}"
5581 )));
5582 }
5583 }
5584 if version >= 16 {
5587 match cur.read_u8()? {
5588 0 => {}
5589 1 => {
5590 if let Some(last) = t.indices.last_mut() {
5591 last.is_unique = true;
5592 }
5593 }
5594 other => {
5595 return Err(StorageError::Corrupt(format!(
5596 "is_unique tag: unknown byte {other}"
5597 )));
5598 }
5599 }
5600 let n = cur.read_u16()? as usize;
5602 if n > 0 {
5603 let mut extras: Vec<usize> = Vec::with_capacity(n);
5604 for _ in 0..n {
5605 let cp = cur.read_u16()? as usize;
5606 if cp >= t.schema.columns.len() {
5607 return Err(StorageError::Corrupt(format!(
5608 "extra column position {cp} out of range \
5609 ({} schema columns)",
5610 t.schema.columns.len()
5611 )));
5612 }
5613 extras.push(cp);
5614 }
5615 if let Some(last) = t.indices.last_mut() {
5616 last.extra_column_positions = extras;
5617 }
5618 }
5619 }
5620 }
5621 }
5622 Ok(())
5623}
5624
5625fn read_btree_map(
5629 cur: &mut Cursor<'_>,
5630) -> Result<PersistentBTreeMap<IndexKey, Vec<RowLocator>>, StorageError> {
5631 let entry_count = cur.read_u32()? as usize;
5632 let mut map = PersistentBTreeMap::new();
5633 for _ in 0..entry_count {
5634 let key = cur.read_index_key()?;
5635 let locator_count = cur.read_u32()? as usize;
5636 let mut locators = Vec::with_capacity(locator_count);
5637 for _ in 0..locator_count {
5638 let tail = &cur.buf[cur.pos..];
5639 let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
5640 StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
5641 })?;
5642 cur.pos += consumed;
5643 locators.push(loc);
5644 }
5645 map.insert_mut(key, locators);
5646 }
5647 Ok(map)
5648}
5649
5650fn read_gin_map(
5654 cur: &mut Cursor<'_>,
5655) -> Result<PersistentBTreeMap<String, Vec<RowLocator>>, StorageError> {
5656 let entry_count = cur.read_u32()? as usize;
5657 let mut map = PersistentBTreeMap::new();
5658 for _ in 0..entry_count {
5659 let word = cur.read_str()?;
5660 let locator_count = cur.read_u32()? as usize;
5661 let mut locators = Vec::with_capacity(locator_count);
5662 for _ in 0..locator_count {
5663 let tail = &cur.buf[cur.pos..];
5664 let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
5665 StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
5666 })?;
5667 cur.pos += consumed;
5668 locators.push(loc);
5669 }
5670 map.insert_mut(word, locators);
5671 }
5672 Ok(map)
5673}
5674
5675fn write_nsw_graph(out: &mut Vec<u8>, g: &NswGraph) {
5691 let entry = g.entry.map_or(u32::MAX, |e| {
5692 u32::try_from(e).expect("NSW entry fits in u32")
5693 });
5694 write_u16(
5695 out,
5696 u16::try_from(g.m_max_0).expect("HNSW m_max_0 fits in u16"),
5697 );
5698 out.extend_from_slice(&entry.to_le_bytes());
5699 out.push(g.entry_level);
5700 let node_count = g.levels.len();
5701 write_u32(
5702 out,
5703 u32::try_from(node_count).expect("HNSW node count fits in u32"),
5704 );
5705 for &lvl in &g.levels {
5706 out.push(lvl);
5707 }
5708 let layer_count = u8::try_from(g.layers.len()).expect("HNSW layer count ≤ 255");
5709 out.push(layer_count);
5710 for layer in &g.layers {
5711 write_u32(
5712 out,
5713 u32::try_from(layer.len()).expect("HNSW per-layer node count fits in u32"),
5714 );
5715 for neighbors in layer {
5716 write_u16(
5717 out,
5718 u16::try_from(neighbors.len()).expect("HNSW neighbour list fits in u16"),
5719 );
5720 for &peer in neighbors {
5724 write_u32(out, peer);
5725 }
5726 }
5727 }
5728}
5729
5730fn write_data_type(out: &mut Vec<u8>, t: DataType) {
5731 match t {
5732 DataType::Int => out.push(1),
5733 DataType::BigInt => out.push(2),
5734 DataType::Float => out.push(3),
5735 DataType::Text => out.push(4),
5736 DataType::Bool => out.push(5),
5737 DataType::Vector { dim, encoding } => match encoding {
5738 VecEncoding::F32 => {
5742 out.push(6);
5743 out.extend_from_slice(&dim.to_le_bytes());
5744 }
5745 VecEncoding::F16 => {
5748 out.push(15);
5749 out.extend_from_slice(&dim.to_le_bytes());
5750 }
5751 VecEncoding::Sq8 => {
5757 out.push(14);
5758 out.extend_from_slice(&dim.to_le_bytes());
5759 }
5760 },
5761 DataType::SmallInt => out.push(7),
5762 DataType::Varchar(max) => {
5763 out.push(8);
5764 out.extend_from_slice(&max.to_le_bytes());
5765 }
5766 DataType::Char(size) => {
5767 out.push(9);
5768 out.extend_from_slice(&size.to_le_bytes());
5769 }
5770 DataType::Numeric { precision, scale } => {
5771 out.push(10);
5772 out.push(precision);
5773 out.push(scale);
5774 }
5775 DataType::Date => out.push(11),
5776 DataType::Timestamp => out.push(12),
5777 DataType::Timestamptz => out.push(17),
5781 DataType::Interval => {
5786 unreachable!("DataType::Interval has no on-disk encoding in v2.11")
5787 }
5788 DataType::Json => out.push(13),
5789 DataType::Jsonb => out.push(16),
5792 DataType::Bytes => out.push(18),
5794 DataType::TextArray => out.push(19),
5797 DataType::IntArray => out.push(20),
5800 DataType::BigIntArray => out.push(21),
5803 DataType::TsVector => out.push(22),
5806 DataType::TsQuery => out.push(23),
5809 }
5810}
5811
5812impl Cursor<'_> {
5813 fn read_data_type(&mut self) -> Result<DataType, StorageError> {
5814 let tag = self.read_u8()?;
5815 match tag {
5816 1 => Ok(DataType::Int),
5817 2 => Ok(DataType::BigInt),
5818 3 => Ok(DataType::Float),
5819 4 => Ok(DataType::Text),
5820 5 => Ok(DataType::Bool),
5821 6 => Ok(DataType::Vector {
5822 dim: self.read_u32()?,
5823 encoding: VecEncoding::F32,
5824 }),
5825 7 => Ok(DataType::SmallInt),
5826 8 => Ok(DataType::Varchar(self.read_u32()?)),
5827 9 => Ok(DataType::Char(self.read_u32()?)),
5828 10 => {
5829 let precision = self.read_u8()?;
5830 let scale = self.read_u8()?;
5831 Ok(DataType::Numeric { precision, scale })
5832 }
5833 11 => Ok(DataType::Date),
5834 12 => Ok(DataType::Timestamp),
5835 13 => Ok(DataType::Json),
5836 14 => Ok(DataType::Vector {
5837 dim: self.read_u32()?,
5838 encoding: VecEncoding::Sq8,
5839 }),
5840 15 => Ok(DataType::Vector {
5844 dim: self.read_u32()?,
5845 encoding: VecEncoding::F16,
5846 }),
5847 16 => Ok(DataType::Jsonb),
5851 17 => Ok(DataType::Timestamptz),
5855 18 => Ok(DataType::Bytes),
5857 19 => Ok(DataType::TextArray),
5859 20 => Ok(DataType::IntArray),
5861 21 => Ok(DataType::BigIntArray),
5862 22 => Ok(DataType::TsVector),
5865 23 => Ok(DataType::TsQuery),
5866 other => Err(StorageError::Corrupt(format!(
5867 "unknown data type tag: {other}"
5868 ))),
5869 }
5870 }
5871}
5872
5873pub fn row_body_encoded_len(row: &Row, schema: &TableSchema) -> usize {
5879 debug_assert_eq!(
5880 row.values.len(),
5881 schema.columns.len(),
5882 "row_body_encoded_len: row arity must match schema"
5883 );
5884 let bitmap_bytes = schema.columns.len().div_ceil(8);
5885 let mut n = bitmap_bytes;
5886 for (col_idx, v) in row.values.iter().enumerate() {
5887 if matches!(v, Value::Null) {
5888 continue;
5889 }
5890 n += value_body_encoded_len(v, schema.columns[col_idx].ty);
5891 }
5892 n
5893}
5894
5895fn value_body_encoded_len(v: &Value, _ty: DataType) -> usize {
5901 match v {
5902 Value::SmallInt(_) => 2,
5903 Value::Int(_) | Value::Date(_) => 4,
5905 Value::BigInt(_) | Value::Float(_) | Value::Timestamp(_) => 8,
5907 Value::Bool(_) => 1,
5908 Value::Text(s) | Value::Json(s) => 2 + s.len(),
5910 Value::Vector(vec) => 4 + 4 * vec.len(),
5912 Value::Sq8Vector(q) => 4 + 4 + 4 + q.bytes.len(),
5919 Value::HalfVector(h) => 4 + h.bytes.len(),
5922 Value::Numeric { .. } => 16 + 1,
5924 Value::Bytes(b) => 2 + b.len(),
5930 Value::TextArray(items) => {
5933 let mut n = 2; for item in items {
5935 n += 1; if let Some(s) = item {
5937 n += 2 + s.len();
5938 }
5939 }
5940 n
5941 }
5942 Value::IntArray(items) => {
5945 2 + items
5946 .iter()
5947 .map(|x| if x.is_some() { 5 } else { 1 })
5948 .sum::<usize>()
5949 }
5950 Value::BigIntArray(items) => {
5951 2 + items
5952 .iter()
5953 .map(|x| if x.is_some() { 9 } else { 1 })
5954 .sum::<usize>()
5955 }
5956 Value::TsVector(lexs) => {
5960 let mut n = 2;
5961 for l in lexs {
5962 n += 2 + l.word.len() + 2 + 2 * l.positions.len() + 1;
5963 }
5964 n
5965 }
5966 Value::TsQuery(ast) => tsquery_encoded_len(ast),
5969 Value::Null => 0,
5971 Value::Interval { .. } => {
5973 unreachable!("Value::Interval has no on-disk encoding")
5974 }
5975 }
5976}
5977
5978pub fn encode_row_body_dense(row: &Row, schema: &TableSchema) -> Vec<u8> {
5989 debug_assert_eq!(
5990 row.values.len(),
5991 schema.columns.len(),
5992 "dense encode: row arity must match schema"
5993 );
5994 let bitmap_bytes = schema.columns.len().div_ceil(8);
5995 let mut out = Vec::with_capacity(bitmap_bytes + schema.columns.len() * 8);
5998 let bitmap_offset = out.len();
5999 out.resize(bitmap_offset + bitmap_bytes, 0);
6000 for (i, v) in row.values.iter().enumerate() {
6001 if matches!(v, Value::Null) {
6002 out[bitmap_offset + i / 8] |= 1 << (i % 8);
6003 }
6004 }
6005 for (col_idx, v) in row.values.iter().enumerate() {
6006 if matches!(v, Value::Null) {
6007 continue;
6008 }
6009 write_value_body(&mut out, v, schema.columns[col_idx].ty);
6010 }
6011 out
6012}
6013
6014pub fn decode_row_body_dense(
6020 bytes: &[u8],
6021 schema: &TableSchema,
6022) -> Result<(Row, usize), StorageError> {
6023 let mut cur = Cursor::new(bytes);
6024 let bitmap_bytes = schema.columns.len().div_ceil(8);
6025 let mut bitmap_buf = [0u8; 32];
6026 if bitmap_bytes > bitmap_buf.len() {
6027 return Err(StorageError::Corrupt(format!(
6028 "row NULL bitmap {bitmap_bytes} B exceeds 32 B cap"
6029 )));
6030 }
6031 let slice = cur.take(bitmap_bytes)?;
6032 bitmap_buf[..bitmap_bytes].copy_from_slice(slice);
6033 let mut values = Vec::with_capacity(schema.columns.len());
6034 for (col_idx, col) in schema.columns.iter().enumerate() {
6035 if (bitmap_buf[col_idx / 8] >> (col_idx % 8)) & 1 == 1 {
6036 values.push(Value::Null);
6037 } else {
6038 values.push(cur.read_value_body(col.ty)?);
6039 }
6040 }
6041 Ok((Row { values }, cur.pos))
6042}
6043
6044fn write_value_body(out: &mut Vec<u8>, v: &Value, ty: DataType) {
6053 match (v, ty) {
6054 (Value::SmallInt(n), DataType::SmallInt) => out.extend_from_slice(&n.to_le_bytes()),
6055 (Value::Int(n), DataType::Int) => out.extend_from_slice(&n.to_le_bytes()),
6056 (Value::BigInt(n), DataType::BigInt) => out.extend_from_slice(&n.to_le_bytes()),
6057 (Value::Float(x), DataType::Float) => out.extend_from_slice(&x.to_le_bytes()),
6058 (Value::Bool(b), DataType::Bool) => out.push(u8::from(*b)),
6059 (Value::Text(s), DataType::Text | DataType::Varchar(_) | DataType::Char(_)) => {
6060 write_str(out, s);
6061 }
6062 (
6063 Value::Vector(v),
6064 DataType::Vector {
6065 encoding: VecEncoding::F32,
6066 ..
6067 },
6068 ) => {
6069 let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
6070 out.extend_from_slice(&dim.to_le_bytes());
6071 for x in v {
6072 out.extend_from_slice(&x.to_le_bytes());
6073 }
6074 }
6075 (
6081 Value::Sq8Vector(q),
6082 DataType::Vector {
6083 encoding: VecEncoding::Sq8,
6084 ..
6085 },
6086 ) => {
6087 let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
6088 out.extend_from_slice(&dim.to_le_bytes());
6089 out.extend_from_slice(&q.min.to_le_bytes());
6090 out.extend_from_slice(&q.max.to_le_bytes());
6091 out.extend_from_slice(&q.bytes);
6092 }
6093 (
6097 Value::HalfVector(h),
6098 DataType::Vector {
6099 encoding: VecEncoding::F16,
6100 ..
6101 },
6102 ) => {
6103 let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
6104 out.extend_from_slice(&dim.to_le_bytes());
6105 out.extend_from_slice(&h.bytes);
6106 }
6107 (Value::Numeric { scaled, .. }, DataType::Numeric { scale, .. }) => {
6108 out.extend_from_slice(&scaled.to_le_bytes());
6109 out.push(scale);
6110 }
6111 (Value::Date(d), DataType::Date) => out.extend_from_slice(&d.to_le_bytes()),
6112 (Value::Timestamp(t), DataType::Timestamp | DataType::Timestamptz) => {
6113 out.extend_from_slice(&t.to_le_bytes())
6114 }
6115 (Value::Json(s), DataType::Json | DataType::Jsonb) => write_str(out, s),
6119 (Value::Bytes(b), DataType::Bytes) => {
6122 let len = u16::try_from(b.len()).expect("BYTEA cell ≤ 64 KiB");
6123 out.extend_from_slice(&len.to_le_bytes());
6124 out.extend_from_slice(b);
6125 }
6126 (Value::TextArray(items), DataType::TextArray) => {
6129 let count = u16::try_from(items.len()).expect("TEXT[] ≤ 65k elements");
6130 out.extend_from_slice(&count.to_le_bytes());
6131 for item in items {
6132 match item {
6133 None => out.push(1),
6134 Some(s) => {
6135 out.push(0);
6136 let len = u16::try_from(s.len()).expect("TEXT[] element ≤ 64 KiB");
6137 out.extend_from_slice(&len.to_le_bytes());
6138 out.extend_from_slice(s.as_bytes());
6139 }
6140 }
6141 }
6142 }
6143 (Value::IntArray(items), DataType::IntArray) => {
6146 let count = u16::try_from(items.len()).expect("INT[] ≤ 65k elements");
6147 out.extend_from_slice(&count.to_le_bytes());
6148 for item in items {
6149 match item {
6150 None => out.push(1),
6151 Some(n) => {
6152 out.push(0);
6153 out.extend_from_slice(&n.to_le_bytes());
6154 }
6155 }
6156 }
6157 }
6158 (Value::BigIntArray(items), DataType::BigIntArray) => {
6161 let count = u16::try_from(items.len()).expect("BIGINT[] ≤ 65k elements");
6162 out.extend_from_slice(&count.to_le_bytes());
6163 for item in items {
6164 match item {
6165 None => out.push(1),
6166 Some(n) => {
6167 out.push(0);
6168 out.extend_from_slice(&n.to_le_bytes());
6169 }
6170 }
6171 }
6172 }
6173 (Value::TsVector(lexs), DataType::TsVector) => write_tsvector_body(out, lexs),
6176 (Value::TsQuery(ast), DataType::TsQuery) => write_tsquery_body(out, ast),
6178 (other, ty) => unreachable!(
6182 "schema-driven encode received mismatched value/type pair: \
6183 value tag={:?}, column type={:?}",
6184 other.data_type(),
6185 ty
6186 ),
6187 }
6188}
6189
6190fn write_value(out: &mut Vec<u8>, v: &Value) {
6191 match v {
6192 Value::Null => out.push(0),
6193 Value::SmallInt(n) => {
6194 out.push(7);
6195 out.extend_from_slice(&n.to_le_bytes());
6196 }
6197 Value::Int(n) => {
6198 out.push(1);
6199 out.extend_from_slice(&n.to_le_bytes());
6200 }
6201 Value::BigInt(n) => {
6202 out.push(2);
6203 out.extend_from_slice(&n.to_le_bytes());
6204 }
6205 Value::Float(x) => {
6206 out.push(3);
6207 out.extend_from_slice(&x.to_le_bytes());
6208 }
6209 Value::Text(s) | Value::Json(s) => {
6214 out.push(4);
6215 write_str(out, s);
6216 }
6217 Value::Bool(b) => {
6218 out.push(5);
6219 out.push(u8::from(*b));
6220 }
6221 Value::Vector(v) => {
6222 out.push(6);
6223 let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
6224 out.extend_from_slice(&dim.to_le_bytes());
6225 for x in v {
6226 out.extend_from_slice(&x.to_le_bytes());
6227 }
6228 }
6229 Value::Sq8Vector(q) => {
6234 out.push(11);
6235 let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
6236 out.extend_from_slice(&dim.to_le_bytes());
6237 out.extend_from_slice(&q.min.to_le_bytes());
6238 out.extend_from_slice(&q.max.to_le_bytes());
6239 out.extend_from_slice(&q.bytes);
6240 }
6241 Value::HalfVector(h) => {
6246 out.push(12);
6247 let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
6248 out.extend_from_slice(&dim.to_le_bytes());
6249 out.extend_from_slice(&h.bytes);
6250 }
6251 Value::Numeric { scaled, scale } => {
6252 out.push(8);
6253 out.extend_from_slice(&scaled.to_le_bytes());
6254 out.push(*scale);
6255 }
6256 Value::Date(d) => {
6257 out.push(9);
6258 out.extend_from_slice(&d.to_le_bytes());
6259 }
6260 Value::Timestamp(t) => {
6261 out.push(10);
6262 out.extend_from_slice(&t.to_le_bytes());
6263 }
6264 Value::Interval { .. } => {
6268 unreachable!(
6269 "Value::Interval has no on-disk encoding; engine must reject it before write"
6270 )
6271 }
6272 Value::Bytes(b) => {
6277 out.push(14);
6278 let len = u16::try_from(b.len()).expect("BYTEA value ≤ 64 KiB");
6279 out.extend_from_slice(&len.to_le_bytes());
6280 out.extend_from_slice(b);
6281 }
6282 Value::TextArray(items) => {
6285 out.push(15);
6286 let count = u16::try_from(items.len()).expect("TEXT[] ≤ 65k elements");
6287 out.extend_from_slice(&count.to_le_bytes());
6288 for item in items {
6289 match item {
6290 None => out.push(1),
6291 Some(s) => {
6292 out.push(0);
6293 let len = u16::try_from(s.len()).expect("TEXT[] element ≤ 64 KiB");
6294 out.extend_from_slice(&len.to_le_bytes());
6295 out.extend_from_slice(s.as_bytes());
6296 }
6297 }
6298 }
6299 }
6300 Value::IntArray(items) => {
6303 out.push(16);
6304 let count = u16::try_from(items.len()).expect("INT[] ≤ 65k elements");
6305 out.extend_from_slice(&count.to_le_bytes());
6306 for item in items {
6307 match item {
6308 None => out.push(1),
6309 Some(n) => {
6310 out.push(0);
6311 out.extend_from_slice(&n.to_le_bytes());
6312 }
6313 }
6314 }
6315 }
6316 Value::BigIntArray(items) => {
6319 out.push(17);
6320 let count = u16::try_from(items.len()).expect("BIGINT[] ≤ 65k elements");
6321 out.extend_from_slice(&count.to_le_bytes());
6322 for item in items {
6323 match item {
6324 None => out.push(1),
6325 Some(n) => {
6326 out.push(0);
6327 out.extend_from_slice(&n.to_le_bytes());
6328 }
6329 }
6330 }
6331 }
6332 Value::TsVector(lexs) => {
6335 out.push(18);
6336 write_tsvector_body(out, lexs);
6337 }
6338 Value::TsQuery(ast) => {
6341 out.push(19);
6342 write_tsquery_body(out, ast);
6343 }
6344 }
6345}
6346
6347fn write_tsvector_body(out: &mut Vec<u8>, lexs: &[TsLexeme]) {
6350 let count = u16::try_from(lexs.len()).expect("tsvector ≤ 65k lexemes");
6351 out.extend_from_slice(&count.to_le_bytes());
6352 for l in lexs {
6353 let wlen = u16::try_from(l.word.len()).expect("tsvector word ≤ 64 KiB");
6354 out.extend_from_slice(&wlen.to_le_bytes());
6355 out.extend_from_slice(l.word.as_bytes());
6356 let plen = u16::try_from(l.positions.len()).expect("tsvector pos count ≤ 65k");
6357 out.extend_from_slice(&plen.to_le_bytes());
6358 for p in &l.positions {
6359 out.extend_from_slice(&p.to_le_bytes());
6360 }
6361 out.push(l.weight);
6362 }
6363}
6364
6365fn write_tsquery_body(out: &mut Vec<u8>, ast: &TsQueryAst) {
6369 match ast {
6370 TsQueryAst::Term { word, weight_mask } => {
6371 out.push(0);
6372 let len = u16::try_from(word.len()).expect("tsquery term ≤ 64 KiB");
6373 out.extend_from_slice(&len.to_le_bytes());
6374 out.extend_from_slice(word.as_bytes());
6375 out.push(*weight_mask);
6376 }
6377 TsQueryAst::And(a, b) => {
6378 out.push(1);
6379 write_tsquery_body(out, a);
6380 write_tsquery_body(out, b);
6381 }
6382 TsQueryAst::Or(a, b) => {
6383 out.push(2);
6384 write_tsquery_body(out, a);
6385 write_tsquery_body(out, b);
6386 }
6387 TsQueryAst::Not(x) => {
6388 out.push(3);
6389 write_tsquery_body(out, x);
6390 }
6391 TsQueryAst::Phrase {
6392 left,
6393 right,
6394 distance,
6395 } => {
6396 out.push(4);
6397 out.extend_from_slice(&distance.to_le_bytes());
6398 write_tsquery_body(out, left);
6399 write_tsquery_body(out, right);
6400 }
6401 }
6402}
6403
6404fn tsquery_encoded_len(ast: &TsQueryAst) -> usize {
6406 match ast {
6407 TsQueryAst::Term { word, .. } => 1 + 2 + word.len() + 1,
6408 TsQueryAst::And(a, b) | TsQueryAst::Or(a, b) => {
6409 1 + tsquery_encoded_len(a) + tsquery_encoded_len(b)
6410 }
6411 TsQueryAst::Not(x) => 1 + tsquery_encoded_len(x),
6412 TsQueryAst::Phrase { left, right, .. } => {
6413 1 + 2 + tsquery_encoded_len(left) + tsquery_encoded_len(right)
6414 }
6415 }
6416}
6417
/// Appends `n` to `out` as two little-endian bytes.
fn write_u16(out: &mut Vec<u8>, n: u16) {
    let encoded: [u8; 2] = n.to_le_bytes();
    out.extend_from_slice(&encoded);
}
/// Appends `n` to `out` as four little-endian bytes.
fn write_u32(out: &mut Vec<u8>, n: u32) {
    let encoded: [u8; 4] = n.to_le_bytes();
    out.extend_from_slice(&encoded);
}
6424fn write_str(out: &mut Vec<u8>, s: &str) {
6425 let len = u16::try_from(s.len()).expect("identifier / text fits in u16");
6426 write_u16(out, len);
6427 out.extend_from_slice(s.as_bytes());
6428}
6429
6430fn write_str_long(out: &mut Vec<u8>, s: &str) {
6435 let len = u32::try_from(s.len()).expect("function body fits in u32");
6436 write_u32(out, len);
6437 out.extend_from_slice(s.as_bytes());
6438}
6439
6440fn write_index_key(out: &mut Vec<u8>, key: &IndexKey) {
6444 match key {
6445 IndexKey::Int(n) => {
6446 out.push(INDEX_KEY_TAG_INT);
6447 out.extend_from_slice(&n.to_le_bytes());
6448 }
6449 IndexKey::Text(s) => {
6450 out.push(INDEX_KEY_TAG_TEXT);
6451 write_str(out, s);
6452 }
6453 IndexKey::Bool(b) => {
6454 out.push(INDEX_KEY_TAG_BOOL);
6455 out.push(u8::from(*b));
6456 }
6457 }
6458}
6459
/// Forward-only reader over a borrowed byte buffer.
struct Cursor<'a> {
    /// The bytes being decoded.
    buf: &'a [u8],
    /// Offset into `buf` of the next unread byte.
    pos: usize,
}
6464
6465impl<'a> Cursor<'a> {
6466 const fn new(buf: &'a [u8]) -> Self {
6467 Self { buf, pos: 0 }
6468 }
6469
6470 fn take(&mut self, n: usize) -> Result<&'a [u8], StorageError> {
6471 let end = self
6472 .pos
6473 .checked_add(n)
6474 .ok_or_else(|| StorageError::Corrupt(format!("length overflow taking {n} bytes")))?;
6475 if end > self.buf.len() {
6476 return Err(StorageError::Corrupt(format!(
6477 "unexpected EOF at offset {} (wanted {n} more bytes)",
6478 self.pos
6479 )));
6480 }
6481 let s = &self.buf[self.pos..end];
6482 self.pos = end;
6483 Ok(s)
6484 }
6485
6486 fn read_u8(&mut self) -> Result<u8, StorageError> {
6487 Ok(self.take(1)?[0])
6488 }
6489 fn read_u16(&mut self) -> Result<u16, StorageError> {
6490 let s = self.take(2)?;
6491 Ok(u16::from_le_bytes([s[0], s[1]]))
6492 }
6493 fn read_u32(&mut self) -> Result<u32, StorageError> {
6494 let s = self.take(4)?;
6495 Ok(u32::from_le_bytes([s[0], s[1], s[2], s[3]]))
6496 }
6497 fn read_i32(&mut self) -> Result<i32, StorageError> {
6498 let s = self.take(4)?;
6499 Ok(i32::from_le_bytes([s[0], s[1], s[2], s[3]]))
6500 }
6501 fn read_u64(&mut self) -> Result<u64, StorageError> {
6504 let s = self.take(8)?;
6505 Ok(u64::from_le_bytes([
6506 s[0], s[1], s[2], s[3], s[4], s[5], s[6], s[7],
6507 ]))
6508 }
6509 fn read_i64(&mut self) -> Result<i64, StorageError> {
6510 let s = self.take(8)?;
6511 let arr: [u8; 8] = s.try_into().expect("checked");
6512 Ok(i64::from_le_bytes(arr))
6513 }
6514 fn read_f64(&mut self) -> Result<f64, StorageError> {
6515 let s = self.take(8)?;
6516 let arr: [u8; 8] = s.try_into().expect("checked");
6517 Ok(f64::from_le_bytes(arr))
6518 }
6519 fn read_f32(&mut self) -> Result<f32, StorageError> {
6520 let s = self.take(4)?;
6521 Ok(f32::from_le_bytes([s[0], s[1], s[2], s[3]]))
6522 }
6523 fn read_str(&mut self) -> Result<String, StorageError> {
6524 let len = self.read_u16()? as usize;
6525 let bytes = self.take(len)?;
6526 core::str::from_utf8(bytes)
6527 .map(String::from)
6528 .map_err(|_| StorageError::Corrupt("invalid UTF-8 in identifier or text".into()))
6529 }
6530
6531 fn read_str_long(&mut self) -> Result<String, StorageError> {
6535 let len = self.read_u32()? as usize;
6536 let bytes = self.take(len)?;
6537 core::str::from_utf8(bytes)
6538 .map(String::from)
6539 .map_err(|_| StorageError::Corrupt("invalid UTF-8 in long-string payload".into()))
6540 }
6541
6542 fn read_index_key(&mut self) -> Result<IndexKey, StorageError> {
6546 let tag = self.read_u8()?;
6547 match tag {
6548 INDEX_KEY_TAG_INT => Ok(IndexKey::Int(self.read_i64()?)),
6549 INDEX_KEY_TAG_TEXT => Ok(IndexKey::Text(self.read_str()?)),
6550 INDEX_KEY_TAG_BOOL => Ok(IndexKey::Bool(self.read_u8()? != 0)),
6551 other => Err(StorageError::Corrupt(format!(
6552 "unknown index key tag: {other}"
6553 ))),
6554 }
6555 }
6556 fn read_value_body(&mut self, ty: DataType) -> Result<Value, StorageError> {
6562 match ty {
6563 DataType::SmallInt => {
6564 let s = self.take(2)?;
6565 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
6566 }
6567 DataType::Int => Ok(Value::Int(self.read_i32()?)),
6568 DataType::BigInt => Ok(Value::BigInt(self.read_i64()?)),
6569 DataType::Float => Ok(Value::Float(self.read_f64()?)),
6570 DataType::Bool => Ok(Value::Bool(self.read_u8()? != 0)),
6571 DataType::Text | DataType::Varchar(_) | DataType::Char(_) => {
6572 Ok(Value::Text(self.read_str()?))
6573 }
6574 DataType::Vector {
6575 encoding: VecEncoding::F32,
6576 ..
6577 } => {
6578 let dim = self.read_u32()? as usize;
6579 let mut v = Vec::with_capacity(dim);
6580 for _ in 0..dim {
6581 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
6582 v.push(f32::from_le_bytes(bytes));
6583 }
6584 Ok(Value::Vector(v))
6585 }
6586 DataType::Vector {
6587 encoding: VecEncoding::Sq8,
6588 ..
6589 } => {
6590 let dim = self.read_u32()? as usize;
6591 let min = self.read_f32()?;
6592 let max = self.read_f32()?;
6593 let bytes = self.take(dim)?.to_vec();
6594 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
6595 }
6596 DataType::Vector {
6597 encoding: VecEncoding::F16,
6598 ..
6599 } => {
6600 let dim = self.read_u32()? as usize;
6601 let bytes = self.take(dim * 2)?.to_vec();
6602 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
6603 }
6604 DataType::Numeric { .. } => {
6605 let s = self.take(16)?;
6606 let arr: [u8; 16] = s.try_into().expect("checked");
6607 let scaled = i128::from_le_bytes(arr);
6608 let scale = self.read_u8()?;
6609 Ok(Value::Numeric { scaled, scale })
6610 }
6611 DataType::Date => Ok(Value::Date(self.read_i32()?)),
6612 DataType::Timestamp => Ok(Value::Timestamp(self.read_i64()?)),
6613 DataType::Timestamptz => Ok(Value::Timestamp(self.read_i64()?)),
6614 DataType::Jsonb => Ok(Value::Json(self.read_str()?)),
6615 DataType::Interval => {
6616 Err(StorageError::Corrupt(
6621 "INTERVAL column found on disk — runtime-only type, v3.0.2 rejects it".into(),
6622 ))
6623 }
6624 DataType::Json => Ok(Value::Json(self.read_str()?)),
6625 DataType::Bytes => {
6628 let len = self.read_u16()? as usize;
6629 let bytes = self.take(len)?.to_vec();
6630 Ok(Value::Bytes(bytes))
6631 }
6632 DataType::TextArray => {
6634 let count = self.read_u16()? as usize;
6635 let mut items: Vec<Option<String>> = Vec::with_capacity(count);
6636 for _ in 0..count {
6637 match self.read_u8()? {
6638 0 => items.push(Some(self.read_str()?)),
6639 1 => items.push(None),
6640 other => {
6641 return Err(StorageError::Corrupt(format!(
6642 "TEXT[] null flag: unknown byte {other}"
6643 )));
6644 }
6645 }
6646 }
6647 Ok(Value::TextArray(items))
6648 }
6649 DataType::IntArray => {
6651 let count = self.read_u16()? as usize;
6652 let mut items: Vec<Option<i32>> = Vec::with_capacity(count);
6653 for _ in 0..count {
6654 match self.read_u8()? {
6655 0 => items.push(Some(self.read_i32()?)),
6656 1 => items.push(None),
6657 other => {
6658 return Err(StorageError::Corrupt(format!(
6659 "INT[] null flag: unknown byte {other}"
6660 )));
6661 }
6662 }
6663 }
6664 Ok(Value::IntArray(items))
6665 }
6666 DataType::BigIntArray => {
6668 let count = self.read_u16()? as usize;
6669 let mut items: Vec<Option<i64>> = Vec::with_capacity(count);
6670 for _ in 0..count {
6671 match self.read_u8()? {
6672 0 => items.push(Some(self.read_i64()?)),
6673 1 => items.push(None),
6674 other => {
6675 return Err(StorageError::Corrupt(format!(
6676 "BIGINT[] null flag: unknown byte {other}"
6677 )));
6678 }
6679 }
6680 }
6681 Ok(Value::BigIntArray(items))
6682 }
6683 DataType::TsVector => Ok(Value::TsVector(self.read_tsvector_body()?)),
6687 DataType::TsQuery => Ok(Value::TsQuery(self.read_tsquery_body()?)),
6688 }
6689 }
6690
6691 fn read_tsvector_body(&mut self) -> Result<Vec<TsLexeme>, StorageError> {
6693 let count = self.read_u16()? as usize;
6694 let mut out = Vec::with_capacity(count);
6695 for _ in 0..count {
6696 let word = self.read_str()?;
6697 let pos_count = self.read_u16()? as usize;
6698 let mut positions = Vec::with_capacity(pos_count);
6699 for _ in 0..pos_count {
6700 positions.push(self.read_u16()?);
6701 }
6702 let weight = self.read_u8()?;
6703 out.push(TsLexeme {
6704 word,
6705 positions,
6706 weight,
6707 });
6708 }
6709 Ok(out)
6710 }
6711
6712 fn read_tsquery_body(&mut self) -> Result<TsQueryAst, StorageError> {
6714 let tag = self.read_u8()?;
6715 match tag {
6716 0 => {
6717 let word = self.read_str()?;
6718 let weight_mask = self.read_u8()?;
6719 Ok(TsQueryAst::Term { word, weight_mask })
6720 }
6721 1 => {
6722 let a = self.read_tsquery_body()?;
6723 let b = self.read_tsquery_body()?;
6724 Ok(TsQueryAst::And(Box::new(a), Box::new(b)))
6725 }
6726 2 => {
6727 let a = self.read_tsquery_body()?;
6728 let b = self.read_tsquery_body()?;
6729 Ok(TsQueryAst::Or(Box::new(a), Box::new(b)))
6730 }
6731 3 => {
6732 let x = self.read_tsquery_body()?;
6733 Ok(TsQueryAst::Not(Box::new(x)))
6734 }
6735 4 => {
6736 let distance = self.read_u16()?;
6737 let left = self.read_tsquery_body()?;
6738 let right = self.read_tsquery_body()?;
6739 Ok(TsQueryAst::Phrase {
6740 left: Box::new(left),
6741 right: Box::new(right),
6742 distance,
6743 })
6744 }
6745 other => Err(StorageError::Corrupt(format!(
6746 "tsquery: unknown node tag {other}"
6747 ))),
6748 }
6749 }
6750
6751 fn read_value(&mut self) -> Result<Value, StorageError> {
6752 let tag = self.read_u8()?;
6753 match tag {
6754 0 => Ok(Value::Null),
6755 1 => Ok(Value::Int(self.read_i32()?)),
6756 2 => Ok(Value::BigInt(self.read_i64()?)),
6757 3 => Ok(Value::Float(self.read_f64()?)),
6758 4 => Ok(Value::Text(self.read_str()?)),
6759 5 => Ok(Value::Bool(self.read_u8()? != 0)),
6760 6 => {
6761 let dim = self.read_u32()? as usize;
6762 let mut v = Vec::with_capacity(dim);
6763 for _ in 0..dim {
6764 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
6765 v.push(f32::from_le_bytes(bytes));
6766 }
6767 Ok(Value::Vector(v))
6768 }
6769 7 => {
6770 let s = self.take(2)?;
6771 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
6772 }
6773 8 => {
6774 let s = self.take(16)?;
6775 let arr: [u8; 16] = s.try_into().expect("checked");
6776 let scaled = i128::from_le_bytes(arr);
6777 let scale = self.read_u8()?;
6778 Ok(Value::Numeric { scaled, scale })
6779 }
6780 9 => Ok(Value::Date(self.read_i32()?)),
6781 10 => Ok(Value::Timestamp(self.read_i64()?)),
6782 11 => {
6787 let dim = self.read_u32()? as usize;
6788 let min = self.read_f32()?;
6789 let max = self.read_f32()?;
6790 let bytes = self.take(dim)?.to_vec();
6791 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
6792 }
6793 12 => {
6796 let dim = self.read_u32()? as usize;
6797 let bytes = self.take(dim * 2)?.to_vec();
6798 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
6799 }
6800 14 => {
6802 let len = self.read_u16()? as usize;
6803 let bytes = self.take(len)?.to_vec();
6804 Ok(Value::Bytes(bytes))
6805 }
6806 15 => {
6809 let count = self.read_u16()? as usize;
6810 let mut items: Vec<Option<String>> = Vec::with_capacity(count);
6811 for _ in 0..count {
6812 match self.read_u8()? {
6813 0 => items.push(Some(self.read_str()?)),
6814 1 => items.push(None),
6815 other => {
6816 return Err(StorageError::Corrupt(format!(
6817 "TEXT[] null flag in value tag: unknown byte {other}"
6818 )));
6819 }
6820 }
6821 }
6822 Ok(Value::TextArray(items))
6823 }
6824 16 => {
6826 let count = self.read_u16()? as usize;
6827 let mut items: Vec<Option<i32>> = Vec::with_capacity(count);
6828 for _ in 0..count {
6829 match self.read_u8()? {
6830 0 => items.push(Some(self.read_i32()?)),
6831 1 => items.push(None),
6832 other => {
6833 return Err(StorageError::Corrupt(format!(
6834 "INT[] null flag in value tag: unknown byte {other}"
6835 )));
6836 }
6837 }
6838 }
6839 Ok(Value::IntArray(items))
6840 }
6841 17 => {
6842 let count = self.read_u16()? as usize;
6843 let mut items: Vec<Option<i64>> = Vec::with_capacity(count);
6844 for _ in 0..count {
6845 match self.read_u8()? {
6846 0 => items.push(Some(self.read_i64()?)),
6847 1 => items.push(None),
6848 other => {
6849 return Err(StorageError::Corrupt(format!(
6850 "BIGINT[] null flag in value tag: unknown byte {other}"
6851 )));
6852 }
6853 }
6854 }
6855 Ok(Value::BigIntArray(items))
6856 }
6857 18 => Ok(Value::TsVector(self.read_tsvector_body()?)),
6860 19 => Ok(Value::TsQuery(self.read_tsquery_body()?)),
6862 other => Err(StorageError::Corrupt(format!("unknown value tag: {other}"))),
6863 }
6864 }
6865
6866 fn read_nsw_graph(&mut self, m: usize) -> Result<NswGraph, StorageError> {
6870 let m_max_0 = self.read_u16()? as usize;
6871 let entry_raw = self.read_u32()?;
6872 let entry = if entry_raw == u32::MAX {
6873 None
6874 } else {
6875 Some(entry_raw as usize)
6876 };
6877 let entry_level = self.read_u8()?;
6878 let node_count = self.read_u32()? as usize;
6879 let mut levels: PersistentVec<u8> = PersistentVec::new();
6884 for _ in 0..node_count {
6885 levels.push_mut(self.read_u8()?);
6886 }
6887 let layer_count = self.read_u8()? as usize;
6888 let mut layers: Vec<PersistentVec<Vec<u32>>> = Vec::with_capacity(layer_count);
6889 for _ in 0..layer_count {
6890 let n = self.read_u32()? as usize;
6891 let mut per_layer: PersistentVec<Vec<u32>> = PersistentVec::new();
6892 for _ in 0..n {
6893 let cnt = self.read_u16()? as usize;
6894 let mut row: Vec<u32> = Vec::with_capacity(cnt);
6895 for _ in 0..cnt {
6896 row.push(self.read_u32()?);
6897 }
6898 per_layer.push_mut(row);
6899 }
6900 layers.push(per_layer);
6901 }
6902 Ok(NswGraph {
6903 m,
6904 m_max_0,
6905 entry,
6906 entry_level,
6907 levels,
6908 layers,
6909 })
6910 }
6911}
6912
6913#[cfg(test)]
6914mod tests {
6915 use super::*;
6916 use alloc::string::ToString;
6917 use alloc::vec;
6918
#[cfg(target_arch = "aarch64")]
#[test]
fn neon_l2_matches_scalar() {
    // Multiplier of the 64-bit LCG that generates the pseudo-random inputs.
    const LCG_MUL: u64 = 6_364_136_223_846_793_005;

    for d in [4usize, 8, 12, 16, 64, 128, 256, 384, 512, 768, 1024, 1536] {
        // Seed depends only on the dimension, so every run sees the same data.
        let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
        #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
        let mut draw = || {
            state = state.wrapping_mul(LCG_MUL).wrapping_add(1);
            (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0
        };

        let mut a = Vec::with_capacity(d);
        let mut b = Vec::with_capacity(d);
        for _ in 0..d {
            // Draws alternate between the two vectors: first `a`, then `b`.
            a.push(draw());
            b.push(draw());
        }

        let scalar = l2_distance_sq_scalar(&a, &b);
        // NOTE(review): presumably sound because this test only builds on
        // aarch64 and both slices have length `d` — confirm against the
        // callee's safety contract.
        let neon = unsafe { l2_distance_sq_neon(&a, &b) };
        let diff = (scalar - neon).abs();
        let tol = (scalar.abs().max(1e-6)) * 1e-4;
        assert!(
            diff <= tol,
            "dim={d}: scalar={scalar} neon={neon} diff={}",
            diff
        );
    }
}
6955
#[cfg(target_arch = "aarch64")]
#[test]
fn neon_inner_product_matches_scalar() {
    // Multiplier of the 64-bit LCG that generates the pseudo-random inputs.
    const LCG_MUL: u64 = 6_364_136_223_846_793_005;

    for d in [4usize, 8, 12, 16, 64, 128, 256, 512, 1024] {
        // Seed depends only on the dimension, so every run sees the same data.
        let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
        #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
        let mut draw = || {
            state = state.wrapping_mul(LCG_MUL).wrapping_add(1);
            (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0
        };

        let mut a = Vec::with_capacity(d);
        let mut b = Vec::with_capacity(d);
        for _ in 0..d {
            // Draws alternate between the two vectors: first `a`, then `b`.
            a.push(draw());
            b.push(draw());
        }

        let scalar = inner_product_scalar(&a, &b);
        // NOTE(review): presumably sound because this test only builds on
        // aarch64 and both slices have length `d` — confirm against the
        // callee's safety contract.
        let neon = unsafe { inner_product_neon(&a, &b) };
        let diff = (scalar - neon).abs();
        // Relative tolerance plus a per-element absolute allowance.
        #[allow(clippy::cast_precision_loss)]
        let tol = (scalar.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
        assert!(
            diff <= tol,
            "IP dim={d}: scalar={scalar} neon={neon} diff={}",
            diff
        );
    }
}
6992
#[cfg(target_arch = "aarch64")]
#[allow(clippy::similar_names)]
#[test]
fn neon_cosine_dot_norms_matches_scalar() {
    // Checks that the NEON kernel agrees with the scalar reference on all
    // three outputs (dot product and both norm terms). Each output gets a
    // tolerance derived from its own scalar magnitude; previously the second
    // norm was checked against a tolerance computed from the first.
    let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
    for &d in &dims {
        // Seed depends only on the dimension, so every run sees the same data.
        let mut state: u64 = (d as u64).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        let mut a = Vec::with_capacity(d);
        let mut b = Vec::with_capacity(d);
        for _ in 0..d {
            state = state
                .wrapping_mul(6_364_136_223_846_793_005)
                .wrapping_add(1);
            #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
            let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
            state = state
                .wrapping_mul(6_364_136_223_846_793_005)
                .wrapping_add(1);
            #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
            let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
            a.push(x);
            b.push(y);
        }
        let (dot_s, na_s, nb_s) = cosine_dot_norms_scalar(&a, &b);
        // NOTE(review): presumably sound because this test only builds on
        // aarch64 and both slices have length `d` — confirm against the
        // callee's safety contract.
        let (dot_n, na_n, nb_n) = unsafe { cosine_dot_norms_neon(&a, &b) };

        // Per-element absolute allowance shared by all three tolerances.
        #[allow(clippy::cast_precision_loss)]
        let abs_slack = (d as f32) * 1e-6;
        let tol_dot = (dot_s.abs().max(1e-6)) * 1e-4 + abs_slack;
        let tol_na = (na_s.abs().max(1e-6)) * 1e-4 + abs_slack;
        let tol_nb = (nb_s.abs().max(1e-6)) * 1e-4 + abs_slack;
        assert!(
            (dot_s - dot_n).abs() <= tol_dot,
            "cosine dot dim={d}: scalar={dot_s} neon={dot_n}"
        );
        assert!(
            (na_s - na_n).abs() <= tol_na,
            "cosine na dim={d}: scalar={na_s} neon={na_n}"
        );
        assert!(
            (nb_s - nb_n).abs() <= tol_nb,
            "cosine nb dim={d}: scalar={nb_s} neon={nb_n}"
        );
    }
}
7036
7037 fn make_users_schema() -> TableSchema {
7038 TableSchema::new(
7039 "users",
7040 vec![
7041 ColumnSchema::new("id", DataType::Int, false),
7042 ColumnSchema::new("name", DataType::Text, false),
7043 ColumnSchema::new("score", DataType::Float, true),
7044 ],
7045 )
7046 }
7047
#[test]
fn value_type_tag_matches_variant() {
    // Every typed variant reports its own data type.
    let typed = [
        (Value::Int(1), DataType::Int),
        (Value::BigInt(1), DataType::BigInt),
        (Value::Float(1.0), DataType::Float),
        (Value::Text("x".into()), DataType::Text),
        (Value::Bool(true), DataType::Bool),
    ];
    for (value, expected) in typed {
        assert_eq!(value.data_type(), Some(expected));
    }

    // NULL has no data type and is the only value reporting `is_null`.
    let null = Value::Null;
    assert_eq!(null.data_type(), None);
    assert!(null.is_null());
    assert!(!Value::Int(0).is_null());
}
7059
#[test]
fn sq8_value_reports_sq8_data_type() {
    // Five inputs, so the reported dimension must be 5.
    let samples: [f32; 5] = [0.0, 0.25, 0.5, 0.75, 1.0];
    let cell = Value::Sq8Vector(crate::quantize::quantize(&samples));
    let expected = DataType::Vector {
        dim: 5,
        encoding: VecEncoding::Sq8,
    };
    assert_eq!(cell.data_type(), Some(expected));
}
7076
#[test]
fn datatype_display_matches_pg_keyword() {
    let cases = [
        (DataType::Int, "INT"),
        (DataType::BigInt, "BIGINT"),
        (DataType::Float, "FLOAT"),
        (DataType::Text, "TEXT"),
        (DataType::Bool, "BOOL"),
    ];
    for (ty, keyword) in cases {
        assert_eq!(ty.to_string(), keyword);
    }
}
7085
#[test]
fn row_len_and_emptiness() {
    let populated = Row::new(vec![Value::Int(1), Value::Null]);
    assert_eq!(populated.len(), 2);
    assert!(!populated.is_empty());

    let blank = Row::new(Vec::new());
    assert!(blank.is_empty());
}
7093
#[test]
fn table_schema_column_position() {
    let schema = make_users_schema();
    let cases = [("id", Some(0)), ("score", Some(2)), ("missing", None)];
    for (column, expected) in cases {
        assert_eq!(schema.column_position(column), expected);
    }
}
7101
#[test]
fn catalog_create_table_then_lookup() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();

    assert_eq!(catalog.table_count(), 1);
    // Only the table that was created is resolvable by name.
    let found = catalog.get("users");
    assert!(found.is_some());
    let absent = catalog.get("nope");
    assert!(absent.is_none());
}
7110
#[test]
fn catalog_duplicate_table_is_rejected() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();

    // Creating the same table a second time must fail and name the table.
    let err = catalog.create_table(make_users_schema()).unwrap_err();
    let names_users = matches!(err, StorageError::DuplicateTable { ref name } if name == "users");
    assert!(names_users);
}
7118
#[test]
fn table_insert_happy_path_appends_row() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    let row = Row::new(vec![
        Value::Int(1),
        Value::Text("alice".into()),
        Value::Float(99.5),
    ]);
    users.insert(row).unwrap();

    assert_eq!(users.row_count(), 1);
    let stored_name = &users.rows()[0].values[1];
    assert_eq!(*stored_name, Value::Text("alice".into()));
}
7133
#[test]
fn table_insert_arity_mismatch() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    // One value offered for a three-column table.
    let short_row = Row::new(vec![Value::Int(1)]);
    let err = users.insert(short_row).unwrap_err();
    let reports_counts = matches!(
        err,
        StorageError::ArityMismatch {
            expected: 3,
            actual: 1
        }
    );
    assert!(reports_counts);
    // The rejected row must not have been stored.
    assert_eq!(users.row_count(), 0);
}
7149
#[test]
fn table_insert_type_mismatch_reports_column() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    // The second value is an INT where the schema declares TEXT.
    let bad_row = Row::new(vec![Value::Int(1), Value::Int(42), Value::Float(0.0)]);
    let err = users.insert(bad_row).unwrap_err();

    if let StorageError::TypeMismatch {
        ref column,
        expected,
        actual,
        position,
    } = err
    {
        assert_eq!(column, "name");
        assert_eq!(expected, DataType::Text);
        assert_eq!(actual, DataType::Int);
        assert_eq!(position, 1);
    } else {
        panic!("unexpected: {err:?}");
    }
    // The rejected row must not have been stored.
    assert_eq!(users.row_count(), 0);
}
7178
#[test]
fn table_insert_null_into_not_null_rejected() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    // `name` is declared non-nullable, so a NULL there must be refused.
    let row = Row::new(vec![Value::Int(1), Value::Null, Value::Float(1.0)]);
    let err = users.insert(row).unwrap_err();
    let names_column = matches!(err, StorageError::NullInNotNull { ref column } if column == "name");
    assert!(names_column);
}
7193
#[test]
fn table_insert_null_into_nullable_ok() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    // `score` is declared nullable, so a NULL there is accepted.
    let row = Row::new(vec![
        Value::Int(1),
        Value::Text("bob".into()),
        Value::Null,
    ]);
    users.insert(row).unwrap();
    assert_eq!(users.row_count(), 1);
}
7207
#[test]
fn catalog_get_mut_independent_per_table() {
    let mut catalog = Catalog::new();
    // Two tables with identical single-column schemas.
    for table_name in ["a", "b"] {
        let columns = vec![ColumnSchema::new("v", DataType::Int, false)];
        catalog
            .create_table(TableSchema::new(table_name, columns))
            .unwrap();
    }

    // Insert into `a` only; `b` must stay empty.
    let table_a = catalog.get_mut("a").unwrap();
    table_a.insert(Row::new(vec![Value::Int(1)])).unwrap();

    assert_eq!(catalog.get("a").unwrap().row_count(), 1);
    assert_eq!(catalog.get("b").unwrap().row_count(), 0);
}
7228
7229 fn assert_round_trip(cat: &Catalog) {
7232 let bytes = cat.serialize();
7233 let restored = Catalog::deserialize(&bytes).expect("deserialize");
7234 assert_eq!(restored.table_count(), cat.table_count());
7237 for (a, b) in cat.tables.iter().zip(restored.tables.iter()) {
7238 assert_eq!(a.schema, b.schema);
7239 assert_eq!(a.rows, b.rows);
7240 }
7241 }
7242
#[test]
fn serialize_empty_catalog_round_trips() {
    // A catalog with no tables must survive serialisation unchanged.
    let empty = Catalog::new();
    assert_round_trip(&empty);
}
7247
#[test]
fn serialize_single_empty_table_round_trips() {
    // One table, zero rows: only the schema has to round-trip.
    let mut catalog = Catalog::new();
    let schema = make_users_schema();
    catalog.create_table(schema).unwrap();
    assert_round_trip(&catalog);
}
7254
#[test]
fn nsw_clone_is_o1() {
    // Build a 1500-row table with a 3-dimensional vector column, index it,
    // then check that cloning the graph shares storage instead of copying.
    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let schema = TableSchema::new(
        "docs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vector_ty, true),
        ],
    );
    let mut catalog = Catalog::new();
    catalog.create_table(schema).unwrap();

    let docs = catalog.get_mut("docs").unwrap();
    for i in 0..1500_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.01;
        let embedding = alloc::vec![base, base + 0.05, base + 0.1];
        let row = Row::new(alloc::vec![Value::Int(i), Value::Vector(embedding)]);
        docs.insert(row).unwrap();
    }
    docs.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let graph = match &catalog.get("docs").unwrap().indices()[0].kind {
        IndexKind::Nsw(graph) => graph,
        // Variants are listed explicitly rather than with a wildcard.
        IndexKind::BTree(_)
        | IndexKind::Brin { .. }
        | IndexKind::Gin(_)
        | IndexKind::GinTrgm(_) => {
            panic!("expected NSW")
        }
    };
    assert_eq!(graph.levels.len(), 1500, "one level slot per inserted row");
    let layer_total = graph.layers.len();
    assert!(
        layer_total >= 2,
        "1500 nodes should populate at least two HNSW layers, got {}",
        layer_total
    );

    let copy = graph.clone();

    assert!(
        graph.levels.shares_storage_with(&copy.levels),
        "levels PV not shared after clone — clone copied elements (O(N))"
    );
    assert_eq!(layer_total, copy.layers.len());
    for (l, original_layer) in graph.layers.iter().enumerate() {
        let cloned_layer = &copy.layers[l];
        assert!(
            original_layer.shares_storage_with(cloned_layer),
            "layer {l} PV not shared after clone — clone copied elements (O(N))"
        );
    }
}
7322
#[test]
fn sq8_catalog_serialise_roundtrip_preserves_cells_and_index() {
    // An SQ8 vector table with an NSW index must come back from
    // serialise/deserialise with the same column type, the same stored cell
    // and the same query hits.
    let vector_ty = DataType::Vector {
        dim: 8,
        encoding: VecEncoding::Sq8,
    };
    let schema = TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vector_ty, false),
        ],
    );
    let mut catalog = Catalog::new();
    catalog.create_table(schema).unwrap();

    let vecs = catalog.get_mut("vecs").unwrap();
    for i in 0..32_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.03;
        let mut components: Vec<f32> = Vec::with_capacity(8);
        for j in 0..8_i32 {
            #[allow(clippy::cast_precision_loss)]
            let off = (j as f32) * 0.01;
            components.push(base + off);
        }
        let cell = Value::Sq8Vector(quantize::quantize(&components));
        vecs.insert(Row::new(alloc::vec![Value::Int(i), cell]))
            .unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];

    // Snapshot of the state before the round trip.
    let original = catalog.get("vecs").unwrap();
    let before_cell = original.rows()[5].values[1].clone();
    let before_ty = original.schema().columns[1].ty;
    let before_hits = nsw_query(original, "v_idx", &query, 5, NswMetric::L2);

    let encoded = catalog.serialize();
    let restored = Catalog::deserialize(&encoded).expect("deserialize ok");
    let reloaded = restored.get("vecs").unwrap();

    assert_eq!(reloaded.schema().columns[1].ty, before_ty);
    assert_eq!(reloaded.rows()[5].values[1], before_cell);
    let after_hits = nsw_query(reloaded, "v_idx", &query, 5, NswMetric::L2);
    assert_eq!(before_hits, after_hits);
}
7385
#[test]
fn half_catalog_serialise_roundtrip_preserves_cells_and_index() {
    use crate::halfvec;

    // A half-precision vector table with an NSW index must come back from
    // serialise/deserialise with the same column type, the same stored cell
    // and the same query hits.
    let vector_ty = DataType::Vector {
        dim: 8,
        encoding: VecEncoding::F16,
    };
    let schema = TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vector_ty, false),
        ],
    );
    let mut catalog = Catalog::new();
    catalog.create_table(schema).unwrap();

    let vecs = catalog.get_mut("vecs").unwrap();
    for i in 0..32_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.03;
        let mut components: Vec<f32> = Vec::with_capacity(8);
        for j in 0..8_i32 {
            #[allow(clippy::cast_precision_loss)]
            let off = (j as f32) * 0.01;
            components.push(base + off);
        }
        let cell = Value::HalfVector(halfvec::HalfVector::from_f32_slice(&components));
        vecs.insert(Row::new(alloc::vec![Value::Int(i), cell]))
            .unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];

    // Snapshot of the state before the round trip.
    let original = catalog.get("vecs").unwrap();
    let before_cell = original.rows()[5].values[1].clone();
    let before_ty = original.schema().columns[1].ty;
    let before_hits = nsw_query(original, "v_idx", &query, 5, NswMetric::L2);

    let encoded = catalog.serialize();
    let restored = Catalog::deserialize(&encoded).expect("deserialize ok");
    let reloaded = restored.get("vecs").unwrap();

    assert_eq!(reloaded.schema().columns[1].ty, before_ty);
    assert_eq!(reloaded.rows()[5].values[1], before_cell);
    let after_hits = nsw_query(reloaded, "v_idx", &query, 5, NswMetric::L2);
    assert_eq!(before_hits, after_hits);
}
7446
/// Builds an index over 512 pseudo-random 32-dimensional vectors stored with
/// the F16 encoding and requires that, across 32 queries, at least 95% of the
/// exact (brute-force f32) top-10 neighbours are returned by `nsw_query`.
#[test]
#[allow(clippy::similar_names)]
fn hnsw_half_recall_at_10_matches_f32_groundtruth() {
    use crate::halfvec;

    /// Advances the 64-bit state and maps its upper 32 bits into `[-1.0, 1.0]`.
    fn next(state: &mut u64) -> f32 {
        let stepped = state
            .wrapping_add(0x9E37_79B9_7F4A_7C15)
            .wrapping_mul(0xBF58_476D_1CE4_E5B9);
        *state = stepped;
        #[allow(clippy::cast_precision_loss)]
        let u = ((stepped >> 32) as u32 as f32) / (u32::MAX as f32);
        2.0 * u - 1.0
    }

    let dim: u32 = 32;
    let n: usize = 512;
    let dim_us = dim as usize;
    let mut seed: u64 = 0xF16_F16_F16_F16_u64;

    // The corpus is drawn first, then the queries, from the same stream.
    let mut corpus: Vec<Vec<f32>> = Vec::with_capacity(n);
    for _ in 0..n {
        let mut point: Vec<f32> = Vec::with_capacity(dim_us);
        for _ in 0..dim_us {
            point.push(next(&mut seed));
        }
        corpus.push(point);
    }
    let mut queries: Vec<Vec<f32>> = Vec::with_capacity(32);
    for _ in 0..32 {
        let mut point: Vec<f32> = Vec::with_capacity(dim_us);
        for _ in 0..dim_us {
            point.push(next(&mut seed));
        }
        queries.push(point);
    }

    // Brute-force ground truth: the ten closest corpus rows for each query.
    let mut exact_top10: Vec<Vec<usize>> = Vec::with_capacity(queries.len());
    for q in &queries {
        let mut scored: Vec<(f32, usize)> = Vec::with_capacity(corpus.len());
        for (i, v) in corpus.iter().enumerate() {
            scored.push((l2_distance_sq(v, q), i));
        }
        scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
        scored.truncate(10);
        exact_top10.push(scored.into_iter().map(|(_, i)| i).collect());
    }

    let mut catalog = Catalog::new();
    let id_col = ColumnSchema::new("id", DataType::Int, false);
    let vec_col = ColumnSchema::new(
        "v",
        DataType::Vector {
            dim,
            encoding: VecEncoding::F16,
        },
        false,
    );
    catalog
        .create_table(TableSchema::new("vecs", alloc::vec![id_col, vec_col]))
        .unwrap();

    let writable = catalog.get_mut("vecs").unwrap();
    for (row_no, point) in corpus.iter().enumerate() {
        let id_cell = Value::Int(i32::try_from(row_no).unwrap());
        let vec_cell = Value::HalfVector(halfvec::HalfVector::from_f32_slice(point));
        writable
            .insert(Row::new(alloc::vec![id_cell, vec_cell]))
            .unwrap();
    }
    writable
        .add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let table = catalog.get("vecs").unwrap();
    let mut total_overlap = 0_usize;
    for (q, exact) in queries.iter().zip(exact_top10.iter()) {
        let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
        total_overlap += hits.iter().filter(|h| exact.contains(*h)).count();
    }

    #[allow(clippy::cast_precision_loss)]
    let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
    assert!(
        recall >= 0.95,
        "HALF HNSW recall@10 = {recall:.3}, below floor 0.95 — \
         check halfvec dispatch in `cell_to_query_metric_distance`"
    );
}
7530
/// Same recall check as the F16 variant, but the vectors are stored through
/// `quantize::quantize` in an SQ8 column: at least 95% of the exact f32
/// top-10 neighbours must be returned over 32 queries.
#[test]
fn hnsw_sq8_recall_at_10_above_0_95_vs_f32_groundtruth() {
    use crate::quantize;

    /// Steps the 64-bit state and turns its high word into a value in `[-1.0, 1.0]`.
    fn next(state: &mut u64) -> f32 {
        let advanced = state
            .wrapping_add(0x9E37_79B9_7F4A_7C15)
            .wrapping_mul(0xBF58_476D_1CE4_E5B9);
        *state = advanced;
        #[allow(clippy::cast_precision_loss)]
        let u = ((advanced >> 32) as u32 as f32) / (u32::MAX as f32);
        2.0 * u - 1.0
    }

    let dim: u32 = 32;
    let n: usize = 512;
    let dim_us = dim as usize;
    let mut seed: u64 = 0xCAFE_BABE_DEAD_BEEFu64;

    // One closure owns the generator so corpus and queries share a stream;
    // the corpus is drawn before the queries.
    let mut draw_point = || -> Vec<f32> { (0..dim_us).map(|_| next(&mut seed)).collect() };
    let corpus: Vec<Vec<f32>> = (0..n).map(|_| draw_point()).collect();
    let queries: Vec<Vec<f32>> = (0..32).map(|_| draw_point()).collect();

    // Exact ten nearest corpus rows for a query, by squared L2 distance.
    let nearest_ten = |q: &Vec<f32>| -> Vec<usize> {
        let mut scored: Vec<(f32, usize)> = Vec::with_capacity(corpus.len());
        for (i, v) in corpus.iter().enumerate() {
            scored.push((l2_distance_sq(v, q), i));
        }
        scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
        scored.iter().take(10).map(|&(_, i)| i).collect()
    };
    let exact_top10: Vec<Vec<usize>> = queries.iter().map(|q| nearest_ten(q)).collect();

    let mut catalog = Catalog::new();
    let columns = alloc::vec![
        ColumnSchema::new("id", DataType::Int, false),
        ColumnSchema::new(
            "v",
            DataType::Vector {
                dim,
                encoding: VecEncoding::Sq8,
            },
            false,
        ),
    ];
    catalog
        .create_table(TableSchema::new("vecs", columns))
        .unwrap();

    let writable = catalog.get_mut("vecs").unwrap();
    for (row_no, point) in corpus.iter().enumerate() {
        let id_cell = Value::Int(i32::try_from(row_no).unwrap());
        let vec_cell = Value::Sq8Vector(quantize::quantize(point));
        writable
            .insert(Row::new(alloc::vec![id_cell, vec_cell]))
            .unwrap();
    }
    writable
        .add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let table = catalog.get("vecs").unwrap();
    let mut total_overlap = 0_usize;
    for (q, exact) in queries.iter().zip(exact_top10.iter()) {
        let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
        for h in hits.iter().filter(|h| exact.contains(*h)) {
            let _ = h;
            total_overlap += 1;
        }
    }

    #[allow(clippy::cast_precision_loss)]
    let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
    assert!(
        recall >= 0.95,
        "SQ8 HNSW recall@10 = {recall:.3}, below floor 0.95 — \
         check `sq8_rerank` is wired in `nsw_search` for SQ8 columns"
    );
}
7619
/// Builds an NSW index over six 3-dimensional vectors and checks that every
/// compared graph field (`m`, `m_max_0`, `entry`, `entry_level`, `levels`,
/// `layers`) is identical after a serialize/deserialize cycle.
#[test]
fn nsw_index_topology_persists_through_round_trip() {
    // Clones the graph of the first index on "docs"; any other index kind is
    // a test failure. The arms stay exhaustive so new kinds must be handled.
    let nsw_graph_of = |c: &Catalog| match &c.get("docs").unwrap().indices()[0].kind {
        IndexKind::Nsw(g) => g.clone(),
        IndexKind::BTree(_)
        | IndexKind::Brin { .. }
        | IndexKind::Gin(_)
        | IndexKind::GinTrgm(_) => {
            panic!("expected NSW")
        }
    };

    let mut catalog = Catalog::new();
    let columns = alloc::vec![
        ColumnSchema::new("id", DataType::Int, false),
        ColumnSchema::new(
            "v",
            DataType::Vector {
                dim: 3,
                encoding: VecEncoding::F32,
            },
            true,
        ),
    ];
    catalog
        .create_table(TableSchema::new("docs", columns))
        .unwrap();

    let docs = catalog.get_mut("docs").unwrap();
    for doc_id in 0..6_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (doc_id as f32) * 0.1;
        let embedding = Value::Vector(alloc::vec![base, base + 0.05, base + 0.1]);
        docs.insert(Row::new(alloc::vec![Value::Int(doc_id), embedding]))
            .unwrap();
    }
    docs.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let original = nsw_graph_of(&catalog);
    let bytes = catalog.serialize();
    let restored = Catalog::deserialize(&bytes).expect("deserialize");
    let restored_graph = nsw_graph_of(&restored);

    assert_eq!(restored_graph.m, original.m);
    assert_eq!(restored_graph.m_max_0, original.m_max_0);
    assert_eq!(restored_graph.entry, original.entry);
    assert_eq!(restored_graph.entry_level, original.entry_level);
    assert_eq!(restored_graph.levels, original.levels);
    assert_eq!(restored_graph.layers, original.layers);
}
7676
/// Calling `nsw_assign_level` twice with the same argument must give the same
/// level, for each of the first 32 inputs.
#[test]
fn hnsw_level_assignment_is_deterministic() {
    (0..32usize).for_each(|node| {
        let first = nsw_assign_level(node);
        let second = nsw_assign_level(node);
        assert_eq!(first, second);
    });
}
7685
/// Of the first 200 inputs, more than 150 must be assigned level 0 by
/// `nsw_assign_level`.
#[test]
fn hnsw_layer_0_dominates_population() {
    let mut on_zero = 0usize;
    for node in 0..200usize {
        if nsw_assign_level(node) == 0 {
            on_zero += 1;
        }
    }
    assert!(on_zero > 150, "level-0 nodes too few: {on_zero}");
}
7695
/// On a ten-point 3-dimensional dataset, the first result of `nsw_search`
/// under the L2 metric must be the same row a brute-force scan ranks closest,
/// for three different query points.
#[test]
fn hnsw_search_matches_brute_force_for_l2_top1() {
    let mut catalog = Catalog::new();
    let columns = alloc::vec![
        ColumnSchema::new("id", DataType::Int, false),
        ColumnSchema::new(
            "v",
            DataType::Vector {
                dim: 3,
                encoding: VecEncoding::F32,
            },
            true,
        ),
    ];
    catalog
        .create_table(TableSchema::new("vecs", columns))
        .unwrap();

    let dataset: alloc::vec::Vec<(i32, [f32; 3])> = alloc::vec![
        (1, [0.0, 0.0, 0.0]),
        (2, [1.0, 0.0, 0.0]),
        (3, [0.0, 1.0, 0.0]),
        (4, [0.0, 0.0, 1.0]),
        (5, [1.0, 1.0, 0.0]),
        (6, [1.0, 0.0, 1.0]),
        (7, [0.0, 1.0, 1.0]),
        (8, [1.0, 1.0, 1.0]),
        (9, [0.5, 0.5, 0.5]),
        (10, [0.2, 0.8, 0.5]),
    ];

    let writable = catalog.get_mut("vecs").unwrap();
    for entry in &dataset {
        let (id, [x, y, z]) = *entry;
        let cells = alloc::vec![Value::Int(id), Value::Vector(alloc::vec![x, y, z])];
        writable.insert(Row::new(cells)).unwrap();
    }
    writable
        .add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let table = catalog.get("vecs").unwrap();
    let idx_pos = table
        .indices()
        .iter()
        .position(|index| index.name == "v_idx")
        .unwrap();

    for query in [[0.4, 0.4, 0.4], [0.9, 0.1, 0.0], [0.0, 0.9, 0.9]] {
        let hnsw_top = nsw_search(table, idx_pos, &query, 1, 16, NswMetric::L2);

        // Exhaustive scan; a non-vector cell is ranked last via infinity.
        let row_count = table.rows.len();
        let mut brute: alloc::vec::Vec<(f32, usize)> = alloc::vec::Vec::with_capacity(row_count);
        for i in 0..row_count {
            let dist = match &table.rows[i].values[1] {
                Value::Vector(v) => l2_distance_sq(v, &query),
                _ => f32::INFINITY,
            };
            brute.push((dist, i));
        }
        brute.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));

        assert!(!hnsw_top.is_empty(), "HNSW returned no results");
        assert_eq!(
            hnsw_top[0].1, brute[0].1,
            "HNSW top-1 != brute-force top-1 for {query:?}"
        );
    }
}
7764
/// A `users` table holding two rows (one with a NULL third cell) is handed to
/// `assert_round_trip`.
#[test]
fn serialize_table_with_rows_round_trips() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();

    let rows = [
        Row::new(vec![
            Value::Int(1),
            Value::Text("alice".into()),
            Value::Float(95.5),
        ]),
        Row::new(vec![
            Value::Int(2),
            Value::Text("bob".into()),
            Value::Null,
        ]),
    ];
    let users = catalog.get_mut("users").unwrap();
    for row in rows {
        users.insert(row).unwrap();
    }

    assert_round_trip(&catalog);
}
7784
/// A catalog with two tables (`users`, plus `flags` holding one row) is
/// handed to `assert_round_trip`.
#[test]
fn serialize_multiple_tables_round_trips() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();

    let flag_columns = vec![
        ColumnSchema::new("id", DataType::BigInt, false),
        ColumnSchema::new("active", DataType::Bool, false),
    ];
    catalog
        .create_table(TableSchema::new("flags", flag_columns))
        .unwrap();

    let flags = catalog.get_mut("flags").unwrap();
    let only_row = Row::new(vec![Value::BigInt(7), Value::Bool(true)]);
    flags.insert(only_row).unwrap();

    assert_round_trip(&catalog);
}
7803
/// A buffer that starts with `BADMAGIC` instead of the real magic must be
/// rejected as `StorageError::Corrupt`.
#[test]
fn deserialize_rejects_bad_magic() {
    let mut bytes: Vec<u8> = Vec::new();
    bytes.extend_from_slice(b"BADMAGIC");
    bytes.push(FILE_VERSION);
    // Little-endian u32 zero, i.e. four zero bytes.
    bytes.extend_from_slice(&[0u8; 4]);

    let outcome = Catalog::deserialize(&bytes);
    let err = outcome.unwrap_err();
    assert!(matches!(err, StorageError::Corrupt(_)));
}
7812
/// A correct magic followed by version byte 99 must be rejected with a
/// `Corrupt` error whose message mentions "version".
#[test]
fn deserialize_rejects_unsupported_version() {
    let mut bytes: Vec<u8> = Vec::new();
    bytes.extend_from_slice(FILE_MAGIC);
    bytes.push(99);
    // Little-endian u32 zero, i.e. four zero bytes.
    bytes.extend_from_slice(&[0u8; 4]);

    let outcome = Catalog::deserialize(&bytes);
    let err = outcome.unwrap_err();
    assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("version")));
}
7821
/// Dropping the final byte of a valid serialized catalog must make
/// `Catalog::deserialize` fail with `StorageError::Corrupt`.
#[test]
fn deserialize_rejects_truncated_file() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();

    let bytes = catalog.serialize();
    let (truncated, _dropped) = bytes.split_at(bytes.len() - 1);

    let outcome = Catalog::deserialize(truncated);
    assert!(matches!(outcome, Err(StorageError::Corrupt(_))));
}
7834
/// One extra byte appended to a valid (empty) catalog must be rejected with a
/// `Corrupt` error whose message mentions "trailing".
#[test]
fn deserialize_rejects_trailing_garbage() {
    let mut bytes = Catalog::new().serialize();
    bytes.extend_from_slice(&[0xFF]);

    let outcome = Catalog::deserialize(&bytes);
    assert!(matches!(
        outcome,
        Err(StorageError::Corrupt(ref s)) if s.contains("trailing")
    ));
}
7845
7846 fn populated_users() -> Catalog {
7849 let mut cat = Catalog::new();
7850 cat.create_table(make_users_schema()).unwrap();
7851 let t = cat.get_mut("users").unwrap();
7852 for (id, name, score) in [
7853 (1, "alice", Some(90.0)),
7854 (2, "bob", None),
7855 (3, "alice", Some(70.0)), ] {
7857 t.insert(Row::new(vec![
7858 Value::Int(id),
7859 Value::Text(name.into()),
7860 score.map_or(Value::Null, Value::Float),
7861 ]))
7862 .unwrap();
7863 }
7864 cat
7865 }
7866
/// An index created after rows exist must already map id 2 to hot row 1 and
/// return nothing for an id that was never inserted.
#[test]
fn add_index_builds_from_existing_rows() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    let users = catalog.get("users").unwrap();
    let by_id = users.index_on(0).expect("index_on(0)");

    let present = IndexKey::Int(2);
    assert_eq!(by_id.lookup_eq(&present), &[RowLocator::Hot(1)]);

    let absent = IndexKey::Int(99);
    assert_eq!(by_id.lookup_eq(&absent), &[] as &[RowLocator]);
}
7879
/// Reusing an index name on the same table must fail with
/// `StorageError::DuplicateIndex` carrying that name.
#[test]
fn add_index_dup_name_rejected() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();

    users.add_index("ix".into(), "id").unwrap();
    let second_attempt = users.add_index("ix".into(), "name");

    let err = second_attempt.unwrap_err();
    assert!(matches!(err, StorageError::DuplicateIndex { ref name } if name == "ix"));
}
7888
/// Indexing a column that does not exist must fail with
/// `StorageError::ColumnNotFound` naming the missing column.
#[test]
fn add_index_unknown_column_rejected() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();

    let attempt = users.add_index("ix".into(), "ghost");

    let err = attempt.unwrap_err();
    assert!(matches!(err, StorageError::ColumnNotFound { ref column } if column == "ghost"));
}
7899
/// A row inserted after the index exists must be reachable through it, and
/// the entries built from the earlier rows must still be there.
#[test]
fn insert_after_create_index_updates_it() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let dave = Row::new(vec![
        Value::Int(4),
        Value::Text("dave".into()),
        Value::Null,
    ]);
    users.insert(dave).unwrap();

    let by_name = users.index_on(1).unwrap();

    let dave_key = IndexKey::Text("dave".into());
    assert_eq!(by_name.lookup_eq(&dave_key), &[RowLocator::Hot(3)]);

    let alice_key = IndexKey::Text("alice".into());
    assert_eq!(
        by_name.lookup_eq(&alice_key),
        &[RowLocator::Hot(0), RowLocator::Hot(2)]
    );
}
7922
/// An index on the `score` column must return no locators when probed with
/// the integer key 90.
#[test]
fn null_or_float_values_are_not_indexed() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("by_score".into(), "score").unwrap();

    let by_score = users.index_on(2).unwrap();
    let probe = IndexKey::Int(90);
    assert_eq!(by_score.lookup_eq(&probe), &[] as &[RowLocator]);
}
7935
/// `Value::Vector` with three components must report a `Vector` data type of
/// dimension 3 with the F32 encoding.
#[test]
fn vector_value_data_type_carries_dim() {
    let expected = Some(DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    });
    let value = Value::Vector(vec![1.0, 2.0, 3.0]);
    assert_eq!(value.data_type(), expected);
}
7949
/// Inserting a three-component vector into a `dim: 3` vector column succeeds.
#[test]
fn vector_column_insert_matching_dim_ok() {
    let mut catalog = Catalog::new();
    let vector_col = ColumnSchema::new(
        "v",
        DataType::Vector {
            dim: 3,
            encoding: VecEncoding::F32,
        },
        false,
    );
    catalog
        .create_table(TableSchema::new("emb", vec![vector_col]))
        .unwrap();

    let emb = catalog.get_mut("emb").unwrap();
    let row = Row::new(vec![Value::Vector(vec![1.0, 2.0, 3.0])]);
    emb.insert(row).unwrap();
}
7970
/// Inserting a two-component vector into a `dim: 3` vector column must fail
/// with `StorageError::TypeMismatch`.
#[test]
fn vector_column_insert_dim_mismatch_rejected() {
    let mut catalog = Catalog::new();
    let vector_col = ColumnSchema::new(
        "v",
        DataType::Vector {
            dim: 3,
            encoding: VecEncoding::F32,
        },
        false,
    );
    catalog
        .create_table(TableSchema::new("emb", vec![vector_col]))
        .unwrap();

    let emb = catalog.get_mut("emb").unwrap();
    let too_short = Row::new(vec![Value::Vector(vec![1.0, 2.0])]);
    let attempt = emb.insert(too_short);

    let err = attempt.unwrap_err();
    assert!(matches!(err, StorageError::TypeMismatch { .. }));
}
7993
/// A four-component f32 vector cell and its column type must both be
/// unchanged after the catalog is serialized and deserialized.
#[test]
fn vector_value_survives_catalog_round_trip() {
    let mut catalog = Catalog::new();
    let columns = vec![
        ColumnSchema::new("id", DataType::Int, false),
        ColumnSchema::new(
            "v",
            DataType::Vector {
                dim: 4,
                encoding: VecEncoding::F32,
            },
            false,
        ),
    ];
    catalog
        .create_table(TableSchema::new("emb", columns))
        .unwrap();

    let emb = catalog.get_mut("emb").unwrap();
    let row = Row::new(vec![
        Value::Int(1),
        Value::Vector(vec![0.5, -1.25, 3.0, 7.0]),
    ]);
    emb.insert(row).unwrap();

    let bytes = catalog.serialize();
    let restored = Catalog::deserialize(&bytes).expect("round-trip");
    let table = restored.get("emb").unwrap();

    let expected_ty = DataType::Vector {
        dim: 4,
        encoding: VecEncoding::F32,
    };
    assert_eq!(table.schema().columns[1].ty, expected_ty);

    let expected_cell = Value::Vector(vec![0.5, -1.25, 3.0, 7.0]);
    assert_eq!(table.rows()[0].values[1], expected_cell);
}
8033
/// After a serialize/deserialize cycle the `by_name` index must still exist
/// on column 1 and still map "alice" to hot rows 0 and 2.
#[test]
fn index_survives_serialize_deserialize_round_trip() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let bytes = catalog.serialize();
    let restored = Catalog::deserialize(&bytes).unwrap();

    let restored_users = restored.get("users").unwrap();
    let by_name = restored_users
        .index_on(1)
        .expect("index_on(1) after restore");
    assert_eq!(by_name.name, "by_name");

    let alice_key = IndexKey::Text("alice".into());
    assert_eq!(
        by_name.lookup_eq(&alice_key),
        &[RowLocator::Hot(0), RowLocator::Hot(2)]
    );
}
8054
8055 fn bigint_pk_users_schema() -> TableSchema {
8060 TableSchema::new(
8061 "users",
8062 vec![
8063 ColumnSchema::new("id", DataType::BigInt, false),
8064 ColumnSchema::new("name", DataType::Text, false),
8065 ],
8066 )
8067 }
8068
8069 fn make_user_row(id: i64, name: &str) -> Row {
8070 Row::new(vec![Value::BigInt(id), Value::Text(name.into())])
8071 }
8072
/// With only hot rows and no cold segments loaded, `lookup_by_pk` must return
/// the row whose id matches the key.
#[test]
fn lookup_by_pk_finds_row_via_hot_index() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();

    let seed_rows = [(1i64, "alice"), (2, "bob"), (3, "carol")];
    let users = catalog.get_mut("users").unwrap();
    for (id, name) in seed_rows {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let key = IndexKey::Int(2);
    let got = catalog.lookup_by_pk("users", "by_id", &key).unwrap();
    assert_eq!(got, make_user_row(2, "bob"));
    assert_eq!(catalog.cold_segment_count(), 0);
}
8089
/// `lookup_by_pk` must return `None` for a missing key, an unknown table and
/// an unknown index name.
#[test]
fn lookup_by_pk_returns_none_when_key_missing() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    // (table, index, key) combinations that must all miss, in the same order
    // as the three cases named above.
    let misses = [
        ("users", "by_id", 999),
        ("other_table", "by_id", 1),
        ("users", "no_such_index", 1),
    ];
    for (table, index, key) in misses {
        let probe = IndexKey::Int(key);
        assert!(catalog.lookup_by_pk(table, index, &probe).is_none());
    }
}
8111
/// Rows that exist only in a loaded segment, registered in the index as
/// `RowLocator::Cold`, must be returned by `lookup_by_pk`; an unregistered
/// key must still miss.
#[test]
fn lookup_by_pk_resolves_cold_locator_via_loaded_segment() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let cold_rows: Vec<(i64, &str)> =
        vec![(100, "ivy"), (200, "joe"), (300, "kim"), (400, "lin")];

    // Encode each cold row as a (key, body) pair for the segment.
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(cold_rows.len());
    for (id, name) in &cold_rows {
        let row = make_user_row(*id, name);
        seg_rows.push(((*id).cast_unsigned(), encode_row_body_dense(&row, &schema)));
    }
    let (seg_bytes, _meta) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();
    assert_eq!(seg_id, 0);
    assert_eq!(cat.cold_segment_count(), 1);

    // Every key gets a Cold locator with page offset 0 in that segment.
    let mut pairs: Vec<(IndexKey, RowLocator)> = Vec::with_capacity(cold_rows.len());
    for (id, _) in &cold_rows {
        let locator = RowLocator::Cold {
            segment_id: seg_id,
            page_offset: 0,
        };
        pairs.push((IndexKey::Int(*id), locator));
    }
    let registered = cat
        .get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();
    assert_eq!(registered, 4);

    for (id, name) in &cold_rows {
        let key = IndexKey::Int(*id);
        let got = cat
            .lookup_by_pk("users", "by_id", &key)
            .unwrap_or_else(|| panic!("cold key {id} not found"));
        assert_eq!(got, make_user_row(*id, name));
    }

    let unknown = IndexKey::Int(999);
    assert!(cat.lookup_by_pk("users", "by_id", &unknown).is_none());
}
8169
/// With two hot rows and two segment-backed rows behind the same index,
/// `lookup_by_pk` must find all four and miss a key that is in neither tier.
#[test]
fn lookup_by_pk_mixes_hot_and_cold_tiers() {
    let hot_rows = [(1i64, "alice"), (2, "bob")];
    let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe")];

    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for (id, name) in hot_rows {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(cold_rows.len());
    for (id, name) in &cold_rows {
        let row = make_user_row(*id, name);
        seg_rows.push(((*id).cast_unsigned(), encode_row_body_dense(&row, &schema)));
    }
    let (seg_bytes, _) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();

    let mut pairs: Vec<(IndexKey, RowLocator)> = Vec::with_capacity(cold_rows.len());
    for (id, _) in &cold_rows {
        let locator = RowLocator::Cold {
            segment_id: seg_id,
            page_offset: 0,
        };
        pairs.push((IndexKey::Int(*id), locator));
    }
    cat.get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();

    // Hot keys first, then cold keys.
    let expectations = [(1i64, "alice"), (2, "bob"), (100, "ivy"), (200, "joe")];
    for (key, name) in expectations {
        let probe = IndexKey::Int(key);
        assert_eq!(
            cat.lookup_by_pk("users", "by_id", &probe).unwrap(),
            make_user_row(key, name)
        );
    }

    let neither = IndexKey::Int(50);
    assert!(cat.lookup_by_pk("users", "by_id", &neither).is_none());
}
8240
/// Registering cold locators against an NSW index must fail with a `Corrupt`
/// error whose message contains "not BTree".
#[test]
fn register_cold_locators_rejects_nsw_index() {
    let mut catalog = Catalog::new();
    let columns = vec![
        ColumnSchema::new("id", DataType::Int, false),
        ColumnSchema::new(
            "v",
            DataType::Vector {
                dim: 4,
                encoding: VecEncoding::F32,
            },
            false,
        ),
    ];
    catalog
        .create_table(TableSchema::new("vecs", columns))
        .unwrap();

    let vecs = catalog.get_mut("vecs").unwrap();
    let row = Row::new(vec![
        Value::Int(1),
        Value::Vector(vec![1.0, 0.0, 0.0, 0.0]),
    ]);
    vecs.insert(row).unwrap();
    vecs.add_nsw_index("by_v".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let locator = RowLocator::Cold {
        segment_id: 0,
        page_offset: 0,
    };
    let attempt = vecs.register_cold_locators("by_v", vec![(IndexKey::Int(1), locator)]);

    let err = attempt.unwrap_err();
    assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("not BTree")));
}
8282
/// Ten zero bytes are not a valid segment: loading must fail with a `Corrupt`
/// error mentioning "segment" and must not register a segment.
#[test]
fn load_segment_bytes_rejects_garbage() {
    let mut catalog = Catalog::new();
    let garbage = vec![0u8; 10];

    let outcome = catalog.load_segment_bytes(garbage);

    let err = outcome.unwrap_err();
    assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("segment")));
    assert_eq!(catalog.cold_segment_count(), 0);
}
8291
/// Loading three segments in a row must hand back ids 0, 1 and 2 and leave
/// the catalog reporting three cold segments.
#[test]
fn load_segment_bytes_returns_sequential_ids() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let schema = catalog.get("users").unwrap().schema.clone();

    for batch in 0u32..3 {
        // Four rows per batch, keyed batch * 100 + 0..4.
        let first_id = u64::from(batch) * 100;
        let mut rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(4);
        for offset in 0u64..4 {
            let id = first_id + offset;
            let row = make_user_row(id.cast_signed(), "x");
            rows.push((id, encode_row_body_dense(&row, &schema)));
        }
        let (bytes, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
        let assigned = catalog.load_segment_bytes(bytes).unwrap();
        assert_eq!(assigned, batch);
    }

    assert_eq!(catalog.cold_segment_count(), 3);
}
8310
/// A stream written by `encode_as_v8` (version byte 8) must be accepted by
/// `Catalog::deserialize`, and every locator found for "alice" must be hot.
#[test]
fn v8_catalog_decodes_as_all_hot_under_v9_reader() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let v8_bytes = encode_as_v8(&catalog);
    let version_byte = v8_bytes[FILE_MAGIC.len()];
    assert_eq!(version_byte, 8, "version byte must be 8");

    let restored = Catalog::deserialize(&v8_bytes).expect("v9 reader accepts v8 stream");
    let by_name = restored
        .get("users")
        .unwrap()
        .index_on(1)
        .expect("index_on(1) after restore");

    let alice_key = IndexKey::Text("alice".into());
    assert_eq!(
        by_name.lookup_eq(&alice_key),
        &[RowLocator::Hot(0), RowLocator::Hot(2)]
    );
    for entry in by_name.lookup_eq(&alice_key).iter() {
        assert!(entry.is_hot(), "v8 → v9 read must yield Hot only");
    }
}
8355
8356 fn encode_as_v8(cat: &Catalog) -> Vec<u8> {
8361 let mut out = Vec::with_capacity(64);
8362 out.extend_from_slice(FILE_MAGIC);
8363 out.push(8u8);
8364 write_u32(&mut out, u32::try_from(cat.tables.len()).unwrap());
8365 for t in &cat.tables {
8366 write_str(&mut out, &t.schema.name);
8367 write_u16(&mut out, u16::try_from(t.schema.columns.len()).unwrap());
8368 for c in &t.schema.columns {
8369 write_str(&mut out, &c.name);
8370 write_data_type(&mut out, c.ty);
8371 out.push(u8::from(c.nullable));
8372 match &c.default {
8373 None => out.push(0),
8374 Some(v) => {
8375 out.push(1);
8376 write_value(&mut out, v);
8377 }
8378 }
8379 out.push(u8::from(c.auto_increment));
8380 }
8381 write_u32(&mut out, u32::try_from(t.rows.len()).unwrap());
8382 for row in &t.rows {
8383 out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
8384 }
8385 write_u16(&mut out, u16::try_from(t.indices.len()).unwrap());
8386 for idx in &t.indices {
8387 write_str(&mut out, &idx.name);
8388 write_u16(&mut out, u16::try_from(idx.column_position).unwrap());
8389 match &idx.kind {
8390 IndexKind::BTree(_) => out.push(0),
8393 IndexKind::Nsw(g) => {
8394 out.push(1);
8395 write_u16(&mut out, u16::try_from(g.m).unwrap());
8396 write_nsw_graph(&mut out, g);
8397 }
8398 IndexKind::Brin { .. } => panic!(
8401 "v8 catalog writer cannot serialise BRIN — \
8402 tests with BRIN indices must use the current writer"
8403 ),
8404 IndexKind::Gin(_) => panic!(
8405 "v8 catalog writer cannot serialise GIN — \
8406 tests with GIN indices must use the current writer"
8407 ),
8408 IndexKind::GinTrgm(_) => panic!(
8409 "v8 catalog writer cannot serialise trigram-GIN — \
8410 tests with trgm indices must use the current writer"
8411 ),
8412 }
8413 }
8414 }
8415 out
8416 }
8417
/// Serializing a catalog whose index mixes hot and cold locators and reading
/// it back must keep every locator as it was; once the segment bytes are
/// loaded again, `lookup_by_pk` must resolve both the hot and the cold rows.
#[test]
fn v9_catalog_round_trip_preserves_cold_locators() {
    let hot_rows = [(1i64, "alice"), (2, "bob")];
    let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe"), (300, "kim")];

    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for (id, name) in hot_rows {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(cold_rows.len());
    for (id, name) in &cold_rows {
        let row = make_user_row(*id, name);
        seg_rows.push(((*id).cast_unsigned(), encode_row_body_dense(&row, &schema)));
    }
    let (seg_bytes, _) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    // Keep a copy: the same bytes are loaded into the restored catalog below.
    let seg_id = cat.load_segment_bytes(seg_bytes.clone()).unwrap();

    let mut pairs: Vec<(IndexKey, RowLocator)> = Vec::with_capacity(cold_rows.len());
    for (id, _) in &cold_rows {
        let locator = RowLocator::Cold {
            segment_id: seg_id,
            page_offset: 0,
        };
        pairs.push((IndexKey::Int(*id), locator));
    }
    cat.get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();

    let bytes = cat.serialize();
    assert_eq!(bytes[FILE_MAGIC.len()], FILE_VERSION);
    let mut restored = Catalog::deserialize(&bytes).expect("v9 round-trip parses");

    let restored_seg_id = restored.load_segment_bytes(seg_bytes).unwrap();
    assert_eq!(restored_seg_id, seg_id);

    // Index level: locators come back exactly as they were stored.
    let by_id = restored.get("users").unwrap().index_on(0).unwrap();
    let hot_expectations = [(1i64, RowLocator::Hot(0)), (2, RowLocator::Hot(1))];
    for (id, expected) in hot_expectations {
        let key = IndexKey::Int(id);
        assert_eq!(by_id.lookup_eq(&key), &[expected]);
    }
    for (id, _) in &cold_rows {
        let key = IndexKey::Int(*id);
        assert_eq!(
            by_id.lookup_eq(&key),
            &[RowLocator::Cold {
                segment_id: seg_id,
                page_offset: 0,
            }]
        );
    }

    // Row level: both tiers resolve through `lookup_by_pk`.
    let bob_key = IndexKey::Int(2);
    assert_eq!(
        restored.lookup_by_pk("users", "by_id", &bob_key).unwrap(),
        make_user_row(2, "bob")
    );
    for (id, name) in &cold_rows {
        let key = IndexKey::Int(*id);
        assert_eq!(
            restored.lookup_by_pk("users", "by_id", &key).unwrap(),
            make_user_row(*id, name)
        );
    }
}
8508
/// For every sample row, `row_body_encoded_len` must report exactly the
/// length of the buffer produced by `encode_row_body_dense`.
#[test]
fn row_body_encoded_len_matches_actual_encode_for_all_types() {
    let vec3 = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let decimal = DataType::Numeric {
        precision: 18,
        scale: 2,
    };
    let columns = vec![
        ColumnSchema::new("a", DataType::SmallInt, true),
        ColumnSchema::new("b", DataType::Int, false),
        ColumnSchema::new("c", DataType::BigInt, false),
        ColumnSchema::new("d", DataType::Float, false),
        ColumnSchema::new("e", DataType::Bool, false),
        ColumnSchema::new("f", DataType::Text, false),
        ColumnSchema::new("g", vec3, false),
        ColumnSchema::new("h", decimal, false),
        ColumnSchema::new("i", DataType::Date, false),
        ColumnSchema::new("j", DataType::Timestamp, false),
    ];
    let schema = TableSchema::new("wide", columns);

    // Every numeric sample uses scale 2, matching column "h".
    let numeric = |scaled| Value::Numeric { scaled, scale: 2 };

    let cases = [
        // Typical positive values.
        Row::new(vec![
            Value::SmallInt(7),
            Value::Int(42),
            Value::BigInt(1_000_000),
            Value::Float(1.5),
            Value::Bool(true),
            Value::Text("hello".into()),
            Value::Vector(vec![1.0, 2.0, 3.0]),
            numeric(12345),
            Value::Date(20_000),
            Value::Timestamp(1_700_000_000_000_000),
        ]),
        // NULL in the nullable column, zero or empty everywhere else.
        Row::new(vec![
            Value::Null,
            Value::Int(0),
            Value::BigInt(0),
            Value::Float(0.0),
            Value::Bool(false),
            Value::Text(String::new()),
            Value::Vector(vec![]),
            numeric(0),
            Value::Date(0),
            Value::Timestamp(0),
        ]),
        // Negative values and a longer text payload.
        Row::new(vec![
            Value::SmallInt(-1),
            Value::Int(-1),
            Value::BigInt(-1),
            Value::Float(-0.5),
            Value::Bool(true),
            Value::Text("a much longer payload here".into()),
            Value::Vector(vec![0.1, 0.2, 0.3]),
            numeric(-999_999_999),
            Value::Date(-1),
            Value::Timestamp(-1),
        ]),
    ];

    for row in &cases {
        let actual = encode_row_body_dense(row, &schema).len();
        let fast = row_body_encoded_len(row, &schema);
        assert_eq!(actual, fast, "row {row:?}");
    }
}
8600
/// `hot_bytes` starts at zero and, after three inserts, equals the sum of the
/// rows' dense-encoded lengths; `hot_tier_bytes` on the catalog agrees.
#[test]
fn hot_bytes_grows_on_insert_and_matches_encoded_sum() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    assert_eq!(users.hot_bytes(), 0);

    let seed_rows = [(1i64, "alice"), (2, "bob"), (3, "carol")];
    let mut expected: u64 = 0;
    for (id, name) in seed_rows {
        let row = make_user_row(id, name);
        let encoded_len = encode_row_body_dense(&row, &users.schema).len() as u64;
        expected += encoded_len;
        users.insert(row).unwrap();
    }

    assert_eq!(users.hot_bytes(), expected);
    assert_eq!(catalog.hot_tier_bytes(), expected);
}
8616
/// Deleting the row at position 1 ("bob") must reduce `hot_bytes` by that
/// row's dense-encoded length.
#[test]
fn hot_bytes_shrinks_on_delete() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    let seed_rows = [(1i64, "alice"), (2, "bob"), (3, "carol")];
    for (id, name) in seed_rows {
        users.insert(make_user_row(id, name)).unwrap();
    }
    let before = users.hot_bytes();

    // Size of the row about to be removed, measured on an identical copy.
    let bob_row = make_user_row(2, "bob");
    let bob_bytes = encode_row_body_dense(&bob_row, &users.schema).len() as u64;

    let removed = users.delete_rows(&[1]);
    assert_eq!(removed, 1);
    assert_eq!(users.hot_bytes(), before - bob_bytes);
}
8633
/// Replacing a row with one carrying a longer name must move `hot_bytes` by
/// exactly the difference between the two encoded lengths.
#[test]
fn hot_bytes_diffs_on_update_for_variable_width_columns() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    let after_insert = users.hot_bytes();

    let old_row = make_user_row(1, "alice");
    let new_row = make_user_row(1, "alice-the-longer-name");
    let old_len = encode_row_body_dense(&old_row, &users.schema).len() as u64;
    let new_len = encode_row_body_dense(&new_row, &users.schema).len() as u64;

    users.update_row(0, new_row.values).unwrap();

    let after_update = users.hot_bytes();
    assert_eq!(after_update, after_insert - old_len + new_len);
    assert!(users.hot_bytes() > after_insert, "longer text grew the counter");
}
8650
/// The hot-byte counters of a restored catalog must equal the value the
/// original catalog reported before it was serialized.
#[test]
fn hot_bytes_round_trips_through_serialize_deserialize() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for i in 0..10 {
        let name = alloc::format!("name-{i}");
        users.insert(make_user_row(i, &name)).unwrap();
    }

    let pre = catalog.hot_tier_bytes();
    let bytes = catalog.serialize();
    let restored = Catalog::deserialize(&bytes).unwrap();

    assert_eq!(restored.hot_tier_bytes(), pre);
    let restored_users = restored.get("users").unwrap();
    assert_eq!(restored_users.hot_bytes(), pre);
}
8665
8666 #[test]
8673 fn freeze_oldest_to_cold_moves_rows_and_keeps_lookups_working() {
8674 let mut cat = Catalog::new();
8675 cat.create_table(bigint_pk_users_schema()).unwrap();
8676 let t = cat.get_mut("users").unwrap();
8677 for id in 0..10i64 {
8678 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8679 .unwrap();
8680 }
8681 t.add_index("by_id".into(), "id").unwrap();
8682 let total_bytes_before = t.hot_bytes();
8683
8684 let report = cat
8685 .freeze_oldest_to_cold("users", "by_id", 6)
8686 .expect("freeze succeeds");
8687 assert_eq!(report.frozen_rows, 6);
8688 assert_eq!(report.segment_id, 0);
8689 assert!(report.bytes_freed > 0);
8690 assert!(!report.segment_bytes.is_empty());
8691
8692 let t = cat.get("users").unwrap();
8693 assert_eq!(t.row_count(), 4, "4 hot rows remain (10 - 6 frozen)");
8694 assert_eq!(cat.cold_segment_count(), 1);
8695 assert_eq!(
8697 t.hot_bytes(),
8698 total_bytes_before - report.bytes_freed,
8699 "hot_bytes accounting matches FreezeReport"
8700 );
8701
8702 for id in 0..10i64 {
8705 let got = cat
8706 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
8707 .unwrap_or_else(|| panic!("PK {id} disappeared after freeze"));
8708 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
8709 }
8710 }
8711
8712 #[test]
8717 fn freeze_twice_preserves_prior_cold_locators() {
8718 let mut cat = Catalog::new();
8719 cat.create_table(bigint_pk_users_schema()).unwrap();
8720 let t = cat.get_mut("users").unwrap();
8721 for id in 0..12i64 {
8722 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8723 .unwrap();
8724 }
8725 t.add_index("by_id".into(), "id").unwrap();
8726
8727 cat.freeze_oldest_to_cold("users", "by_id", 4)
8728 .expect("first freeze ok");
8729 cat.freeze_oldest_to_cold("users", "by_id", 4)
8730 .expect("second freeze ok");
8731
8732 assert_eq!(cat.get("users").unwrap().row_count(), 4);
8733 assert_eq!(cat.cold_segment_count(), 2);
8734 for id in 0..12i64 {
8737 let got = cat
8738 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
8739 .unwrap_or_else(|| panic!("PK {id} not resolvable after two freezes"));
8740 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
8741 }
8742 }
8743
8744 #[test]
8747 fn freeze_oldest_to_cold_rejects_invalid_input() {
8748 let mut cat = Catalog::new();
8749 cat.create_table(bigint_pk_users_schema()).unwrap();
8750 let t = cat.get_mut("users").unwrap();
8751 for id in 0..3i64 {
8752 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8753 .unwrap();
8754 }
8755 t.add_index("by_id".into(), "id").unwrap();
8756
8757 assert!(matches!(
8759 cat.freeze_oldest_to_cold("users", "by_id", 0),
8760 Err(StorageError::Corrupt(_))
8761 ));
8762 assert!(matches!(
8764 cat.freeze_oldest_to_cold("missing", "by_id", 1),
8765 Err(StorageError::Corrupt(_))
8766 ));
8767 assert!(matches!(
8769 cat.freeze_oldest_to_cold("users", "no_such_index", 1),
8770 Err(StorageError::Corrupt(_))
8771 ));
8772 assert!(matches!(
8774 cat.freeze_oldest_to_cold("users", "by_id", 999),
8775 Err(StorageError::Corrupt(_))
8776 ));
8777 assert_eq!(cat.get("users").unwrap().row_count(), 3);
8779 assert_eq!(cat.cold_segment_count(), 0);
8780 }
8781
8782 #[test]
8785 fn freeze_oldest_to_cold_rejects_non_integer_pk() {
8786 let mut cat = Catalog::new();
8787 cat.create_table(TableSchema::new(
8788 "by_name",
8789 vec![
8790 ColumnSchema::new("name", DataType::Text, false),
8791 ColumnSchema::new("payload", DataType::BigInt, false),
8792 ],
8793 ))
8794 .unwrap();
8795 let t = cat.get_mut("by_name").unwrap();
8796 t.insert(Row::new(vec![Value::Text("a".into()), Value::BigInt(1)]))
8797 .unwrap();
8798 t.add_index("by_n".into(), "name").unwrap();
8799 let err = cat
8800 .freeze_oldest_to_cold("by_name", "by_n", 1)
8801 .expect_err("non-integer PK rejected");
8802 match err {
8803 StorageError::Corrupt(s) => assert!(
8804 s.contains("non-integer"),
8805 "error message names the constraint: {s}"
8806 ),
8807 other => panic!("expected Corrupt, got {other:?}"),
8808 }
8809 assert_eq!(cat.get("by_name").unwrap().row_count(), 1);
8811 assert_eq!(cat.cold_segment_count(), 0);
8812 }
8813
8814 #[test]
8819 fn freeze_keeps_remaining_hot_rows_addressable_via_secondary_index() {
8820 let mut cat = Catalog::new();
8821 cat.create_table(bigint_pk_users_schema()).unwrap();
8822 let t = cat.get_mut("users").unwrap();
8823 for id in 0..6i64 {
8824 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8825 .unwrap();
8826 }
8827 t.add_index("by_id".into(), "id").unwrap();
8828 t.add_index("by_name".into(), "name").unwrap();
8829
8830 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
8831
8832 let idx = cat.get("users").unwrap().index_on(1).unwrap();
8836 let got = idx.lookup_eq(&IndexKey::Text("u-4".into()));
8837 assert_eq!(got.len(), 1);
8838 assert!(got[0].is_hot(), "kept-hot rows still surface as Hot");
8839 match got[0] {
8840 RowLocator::Hot(i) => {
8841 assert_eq!(i, 1);
8844 }
8845 RowLocator::Cold { .. } => unreachable!(),
8846 }
8847 }
8848
8849 #[test]
8857 fn promote_cold_row_pulls_frozen_row_back_to_hot_tier() {
8858 let mut cat = Catalog::new();
8859 cat.create_table(bigint_pk_users_schema()).unwrap();
8860 let t = cat.get_mut("users").unwrap();
8861 for id in 0..6i64 {
8862 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8863 .unwrap();
8864 }
8865 t.add_index("by_id".into(), "id").unwrap();
8866 cat.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
8869 let hot_bytes_before = cat.get("users").unwrap().hot_bytes();
8870
8871 let new_idx = cat
8873 .promote_cold_row("users", "by_id", &IndexKey::Int(2))
8874 .expect("promote ok")
8875 .expect("PK 2 was cold");
8876 assert_eq!(
8877 new_idx, 2,
8878 "promoted row appended after the 2 surviving hot rows"
8879 );
8880
8881 let t = cat.get("users").unwrap();
8882 assert_eq!(t.row_count(), 3, "hot tier grew from 2 to 3");
8883 let row = make_user_row(2, "u-2");
8885 let row_len = encode_row_body_dense(&row, &t.schema).len() as u64;
8886 assert_eq!(t.hot_bytes(), hot_bytes_before + row_len);
8887
8888 let entries = t.index_on(0).unwrap().lookup_eq(&IndexKey::Int(2));
8891 assert_eq!(entries.len(), 1, "exactly one locator per key");
8892 assert!(entries[0].is_hot(), "promote retired the Cold locator");
8893 assert_eq!(
8895 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(2))
8896 .unwrap(),
8897 row
8898 );
8899 assert_eq!(
8902 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(0))
8903 .unwrap(),
8904 make_user_row(0, "u-0")
8905 );
8906 }
8907
8908 #[test]
8912 fn promote_cold_row_returns_none_when_key_is_not_cold() {
8913 let mut cat = Catalog::new();
8914 cat.create_table(bigint_pk_users_schema()).unwrap();
8915 let t = cat.get_mut("users").unwrap();
8916 t.insert(make_user_row(7, "alice")).unwrap();
8917 t.add_index("by_id".into(), "id").unwrap();
8918
8919 assert!(
8921 cat.promote_cold_row("users", "by_id", &IndexKey::Int(7))
8922 .unwrap()
8923 .is_none()
8924 );
8925 assert!(
8927 cat.promote_cold_row("users", "by_id", &IndexKey::Int(99))
8928 .unwrap()
8929 .is_none()
8930 );
8931 assert_eq!(cat.get("users").unwrap().row_count(), 1);
8933 assert_eq!(cat.cold_segment_count(), 0);
8934 }
8935
8936 #[test]
8941 fn shadow_cold_row_removes_cold_locators_and_drops_lookup() {
8942 let mut cat = Catalog::new();
8943 cat.create_table(bigint_pk_users_schema()).unwrap();
8944 let t = cat.get_mut("users").unwrap();
8945 for id in 0..5i64 {
8946 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8947 .unwrap();
8948 }
8949 t.add_index("by_id".into(), "id").unwrap();
8950 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
8951
8952 assert!(
8954 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(1))
8955 .is_some(),
8956 "frozen PK resolves before shadow"
8957 );
8958 let removed = cat
8959 .shadow_cold_row("users", "by_id", &IndexKey::Int(1))
8960 .unwrap();
8961 assert_eq!(removed, 1, "exactly one cold locator retired");
8962
8963 assert!(
8966 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(1))
8967 .is_none(),
8968 "shadowed key no longer resolves"
8969 );
8970 assert_eq!(
8972 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(0))
8973 .unwrap(),
8974 make_user_row(0, "u-0")
8975 );
8976 assert_eq!(
8977 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(2))
8978 .unwrap(),
8979 make_user_row(2, "u-2")
8980 );
8981 }
8982
8983 #[test]
8988 fn shadow_cold_row_returns_zero_when_key_is_not_cold() {
8989 let mut cat = Catalog::new();
8990 cat.create_table(bigint_pk_users_schema()).unwrap();
8991 let t = cat.get_mut("users").unwrap();
8992 t.insert(make_user_row(1, "alice")).unwrap();
8993 t.add_index("by_id".into(), "id").unwrap();
8994 assert_eq!(
8995 cat.shadow_cold_row("users", "by_id", &IndexKey::Int(1))
8996 .unwrap(),
8997 0,
8998 "hot-only key drops no cold locators"
8999 );
9000 assert_eq!(
9001 cat.shadow_cold_row("users", "by_id", &IndexKey::Int(999))
9002 .unwrap(),
9003 0,
9004 "absent key drops no cold locators"
9005 );
9006 assert_eq!(cat.get("users").unwrap().row_count(), 1);
9007 }
9008
9009 #[test]
9011 fn promote_and_shadow_reject_invalid_inputs() {
9012 let mut cat = Catalog::new();
9013 cat.create_table(bigint_pk_users_schema()).unwrap();
9014 let t = cat.get_mut("users").unwrap();
9015 t.insert(make_user_row(1, "alice")).unwrap();
9016 t.add_index("by_id".into(), "id").unwrap();
9017
9018 assert!(matches!(
9020 cat.promote_cold_row("missing", "by_id", &IndexKey::Int(1)),
9021 Err(StorageError::Corrupt(_))
9022 ));
9023 assert!(matches!(
9024 cat.shadow_cold_row("missing", "by_id", &IndexKey::Int(1)),
9025 Err(StorageError::Corrupt(_))
9026 ));
9027 assert!(matches!(
9029 cat.promote_cold_row("users", "no_such_index", &IndexKey::Int(1)),
9030 Err(StorageError::Corrupt(_))
9031 ));
9032 assert!(matches!(
9033 cat.shadow_cold_row("users", "no_such_index", &IndexKey::Int(1)),
9034 Err(StorageError::Corrupt(_))
9035 ));
9036 }
9037
9038 #[test]
9045 fn commit_freeze_slices_single_slice_matches_freeze_oldest() {
9046 let mut a = Catalog::new();
9047 let mut b = Catalog::new();
9048 for cat in [&mut a, &mut b] {
9049 cat.create_table(bigint_pk_users_schema()).unwrap();
9050 let t = cat.get_mut("users").unwrap();
9051 for id in 0..10i64 {
9052 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
9053 .unwrap();
9054 }
9055 t.add_index("by_id".into(), "id").unwrap();
9056 }
9057 let single = a.freeze_oldest_to_cold("users", "by_id", 6).unwrap();
9058 let slice = b
9059 .prepare_freeze_slice("users", "by_id", 0..6)
9060 .expect("prepare");
9061 let parallel = b
9062 .commit_freeze_slices("users", "by_id", alloc::vec![slice])
9063 .expect("commit");
9064 assert_eq!(single.segment_id, parallel.segment_id);
9065 assert_eq!(single.frozen_rows, parallel.frozen_rows);
9066 assert_eq!(single.bytes_freed, parallel.bytes_freed);
9067 assert_eq!(single.segment_bytes, parallel.segment_bytes);
9068 for id in 0..10i64 {
9070 assert_eq!(
9071 a.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
9072 b.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
9073 "PK {id} differs after single vs slice freeze"
9074 );
9075 }
9076 }
9077
9078 #[test]
9083 fn commit_freeze_slices_two_slices_match_single_slice() {
9084 let mut a = Catalog::new();
9085 let mut b = Catalog::new();
9086 for cat in [&mut a, &mut b] {
9087 cat.create_table(bigint_pk_users_schema()).unwrap();
9088 let t = cat.get_mut("users").unwrap();
9089 for id in [3, 7, 1, 9, 5, 0, 8, 4, 2, 6].iter().copied() {
9092 t.insert(make_user_row(id as i64, &alloc::format!("u-{id}")))
9093 .unwrap();
9094 }
9095 t.add_index("by_id".into(), "id").unwrap();
9096 }
9097 let single = a
9098 .prepare_freeze_slice("users", "by_id", 0..8)
9099 .expect("prepare");
9100 let one = a
9101 .commit_freeze_slices("users", "by_id", alloc::vec![single])
9102 .expect("commit one");
9103 let s1 = b
9104 .prepare_freeze_slice("users", "by_id", 0..4)
9105 .expect("prepare s1");
9106 let s2 = b
9107 .prepare_freeze_slice("users", "by_id", 4..8)
9108 .expect("prepare s2");
9109 let two = b
9110 .commit_freeze_slices("users", "by_id", alloc::vec![s1, s2])
9111 .expect("commit two");
9112 assert_eq!(one.segment_bytes, two.segment_bytes);
9113 assert_eq!(one.frozen_rows, two.frozen_rows);
9114 for id in 0..10i64 {
9117 assert_eq!(
9118 a.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
9119 b.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
9120 "PK {id} differs after one-slice vs two-slice freeze"
9121 );
9122 }
9123 }
9124
9125 #[test]
9127 fn commit_freeze_slices_rejects_gap() {
9128 let mut cat = Catalog::new();
9129 cat.create_table(bigint_pk_users_schema()).unwrap();
9130 let t = cat.get_mut("users").unwrap();
9131 for id in 0..6i64 {
9132 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
9133 .unwrap();
9134 }
9135 t.add_index("by_id".into(), "id").unwrap();
9136 let s1 = cat.prepare_freeze_slice("users", "by_id", 0..2).unwrap();
9137 let s2 = cat.prepare_freeze_slice("users", "by_id", 3..5).unwrap();
9138 assert!(matches!(
9139 cat.commit_freeze_slices("users", "by_id", alloc::vec![s1, s2]),
9140 Err(StorageError::Corrupt(_))
9141 ));
9142 assert_eq!(cat.cold_segment_count(), 0);
9144 assert_eq!(cat.get("users").unwrap().row_count(), 6);
9145 }
9146
9147 #[test]
9149 fn commit_freeze_slices_empty_is_noop() {
9150 let mut cat = Catalog::new();
9151 cat.create_table(bigint_pk_users_schema()).unwrap();
9152 let t = cat.get_mut("users").unwrap();
9153 for id in 0..3i64 {
9154 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
9155 .unwrap();
9156 }
9157 t.add_index("by_id".into(), "id").unwrap();
9158 let report = cat
9159 .commit_freeze_slices("users", "by_id", Vec::new())
9160 .unwrap();
9161 assert_eq!(report.frozen_rows, 0);
9162 assert_eq!(cat.cold_segment_count(), 0);
9163 assert_eq!(cat.get("users").unwrap().row_count(), 3);
9164 }
9165
9166 #[test]
9173 fn compact_merges_small_segments_storage_unit() {
9174 let mut cat = Catalog::new();
9175 cat.create_table(bigint_pk_users_schema()).unwrap();
9176 let t = cat.get_mut("users").unwrap();
9177 for id in 0..8i64 {
9178 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
9179 .unwrap();
9180 }
9181 t.add_index("by_id".into(), "id").unwrap();
9182 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
9184 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
9185 assert_eq!(cat.cold_segment_count(), 2);
9186 assert_eq!(cat.cold_segment_slot_count(), 2);
9187
9188 let max_seg_bytes = cat
9191 .cold_segment_ids_global()
9192 .iter()
9193 .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
9194 .max()
9195 .unwrap();
9196 let target = max_seg_bytes + 1;
9197
9198 let report = cat
9199 .compact_cold_segments("users", "by_id", target)
9200 .expect("compact succeeds");
9201 assert_eq!(report.sources.len(), 2);
9202 let merged_id = report.merged_segment_id.expect("merge happened");
9203 assert_eq!(report.merged_rows, 6);
9204 assert_eq!(report.deleted_rows_pruned, 0);
9205 assert!(!report.merged_segment_bytes.is_empty());
9206
9207 assert_eq!(cat.cold_segment_count(), 1);
9210 assert_eq!(cat.cold_segment_slot_count(), 3);
9211 assert_eq!(cat.cold_segment_ids_global(), alloc::vec![merged_id]);
9212
9213 for id in 0..8i64 {
9216 let got = cat
9217 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
9218 .unwrap_or_else(|| panic!("PK {id} lost after compaction"));
9219 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
9220 }
9221 }
9222
9223 #[test]
9227 fn compact_drops_shadowed_cold_rows() {
9228 let mut cat = Catalog::new();
9229 cat.create_table(bigint_pk_users_schema()).unwrap();
9230 let t = cat.get_mut("users").unwrap();
9231 for id in 0..6i64 {
9232 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
9233 .unwrap();
9234 }
9235 t.add_index("by_id".into(), "id").unwrap();
9236 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
9237 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
9238 assert_eq!(
9240 cat.shadow_cold_row("users", "by_id", &IndexKey::Int(1))
9241 .unwrap(),
9242 1
9243 );
9244 assert_eq!(
9245 cat.shadow_cold_row("users", "by_id", &IndexKey::Int(4))
9246 .unwrap(),
9247 1
9248 );
9249
9250 let max_seg_bytes = cat
9251 .cold_segment_ids_global()
9252 .iter()
9253 .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
9254 .max()
9255 .unwrap();
9256 let report = cat
9257 .compact_cold_segments("users", "by_id", max_seg_bytes + 1)
9258 .expect("compact succeeds");
9259 assert_eq!(report.sources.len(), 2);
9260 assert_eq!(report.merged_rows, 4, "6 frozen − 2 shadowed = 4 live");
9261 assert_eq!(report.deleted_rows_pruned, 2);
9262
9263 for shadowed in [1i64, 4i64] {
9265 assert!(
9266 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(shadowed))
9267 .is_none(),
9268 "shadowed PK {shadowed} must remain invisible after compact"
9269 );
9270 }
9271 for live in [0i64, 2, 3, 5] {
9273 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(live))
9274 .unwrap_or_else(|| panic!("live PK {live} lost after compact"));
9275 }
9276 }
9277
9278 #[test]
9281 fn compact_is_noop_below_two_candidates() {
9282 let mut cat = Catalog::new();
9283 cat.create_table(bigint_pk_users_schema()).unwrap();
9284 let t = cat.get_mut("users").unwrap();
9285 for id in 0..6i64 {
9286 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
9287 .unwrap();
9288 }
9289 t.add_index("by_id".into(), "id").unwrap();
9290 let report = cat
9292 .compact_cold_segments("users", "by_id", 1 << 30)
9293 .expect("noop ok");
9294 assert!(report.merged_segment_id.is_none());
9295 assert!(report.sources.is_empty());
9296
9297 cat.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
9299 let report = cat
9300 .compact_cold_segments("users", "by_id", 1 << 30)
9301 .expect("noop ok");
9302 assert!(report.merged_segment_id.is_none());
9303 assert_eq!(cat.cold_segment_count(), 1);
9304
9305 let report = cat
9308 .compact_cold_segments("users", "by_id", 1)
9309 .expect("noop ok");
9310 assert!(report.merged_segment_id.is_none());
9311 assert_eq!(cat.cold_segment_count(), 1);
9312 }
9313
9314 #[test]
9322 fn compact_swap_survives_catalog_roundtrip_via_load_at() {
9323 let mut cat = Catalog::new();
9324 cat.create_table(bigint_pk_users_schema()).unwrap();
9325 let t = cat.get_mut("users").unwrap();
9326 for id in 0..6i64 {
9327 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
9328 .unwrap();
9329 }
9330 t.add_index("by_id".into(), "id").unwrap();
9331 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
9332 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
9333 let max_seg_bytes = cat
9334 .cold_segment_ids_global()
9335 .iter()
9336 .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
9337 .max()
9338 .unwrap();
9339 let report = cat
9340 .compact_cold_segments("users", "by_id", max_seg_bytes + 1)
9341 .expect("compact ok");
9342 let merged_id = report.merged_segment_id.unwrap();
9343
9344 let cat_bytes = cat.serialize();
9349 let merged_bytes = report.merged_segment_bytes.clone();
9350
9351 let mut restored = Catalog::deserialize(&cat_bytes).expect("deserialize ok");
9352 restored
9353 .load_segment_bytes_at(merged_id, merged_bytes)
9354 .expect("reload merged ok");
9355
9356 for id in 0..6i64 {
9358 let got = restored
9359 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
9360 .unwrap_or_else(|| panic!("PK {id} lost across roundtrip"));
9361 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
9362 }
9363 assert_eq!(restored.cold_segment_count(), 1);
9366 }
9367
9368 #[test]
9371 fn load_segment_bytes_at_pads_and_rejects_collision() {
9372 let mut cat = Catalog::new();
9373 cat.create_table(bigint_pk_users_schema()).unwrap();
9374 let t = cat.get_mut("users").unwrap();
9375 for id in 0..4i64 {
9376 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
9377 .unwrap();
9378 }
9379 t.add_index("by_id".into(), "id").unwrap();
9380 let report = cat.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
9381 let bytes_seg0 = report.segment_bytes.clone();
9382
9383 cat.load_segment_bytes_at(5, bytes_seg0.clone())
9387 .expect("pad + load ok");
9388 assert_eq!(cat.cold_segment_slot_count(), 6);
9389 assert_eq!(cat.cold_segment_count(), 2);
9390
9391 assert!(matches!(
9393 cat.load_segment_bytes_at(5, bytes_seg0.clone()),
9394 Err(StorageError::Corrupt(_))
9395 ));
9396 assert!(matches!(
9398 cat.load_segment_bytes_at(0, bytes_seg0),
9399 Err(StorageError::Corrupt(_))
9400 ));
9401 }
9402
9403 #[test]
9407 fn promote_then_refreeze_does_not_leave_orphan_locators() {
9408 let mut cat = Catalog::new();
9409 cat.create_table(bigint_pk_users_schema()).unwrap();
9410 let t = cat.get_mut("users").unwrap();
9411 for id in 0..4i64 {
9412 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
9413 .unwrap();
9414 }
9415 t.add_index("by_id".into(), "id").unwrap();
9416
9417 cat.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
9419 let promoted = cat
9420 .promote_cold_row("users", "by_id", &IndexKey::Int(0))
9421 .unwrap();
9422 assert!(promoted.is_some());
9423 let entries_after_promote = cat
9424 .get("users")
9425 .unwrap()
9426 .index_on(0)
9427 .unwrap()
9428 .lookup_eq(&IndexKey::Int(0))
9429 .to_vec();
9430 assert_eq!(entries_after_promote.len(), 1);
9431 assert!(entries_after_promote[0].is_hot());
9432
9433 for id in [2i64, 3] {
9440 assert_eq!(
9441 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(id))
9442 .unwrap(),
9443 make_user_row(id, &alloc::format!("u-{id}"))
9444 );
9445 }
9446 }
9447}