1#![no_std]
7#![cfg_attr(target_arch = "aarch64", allow(unsafe_code))]
11
12extern crate alloc;
13
14pub mod bloom;
15pub mod halfvec;
16pub mod persistent;
17pub mod persistent_btree;
18pub mod quantize;
19pub mod row_locator;
20pub mod segment;
21pub mod trgm;
22
23pub use self::bloom::{BloomError, BloomFilter};
24pub use self::row_locator::{RowLocator, RowLocatorError};
25pub use self::segment::{
26 BRIN_SIDECAR_MAGIC, BrinSummary, OwnedSegment, SEGMENT_COMPRESS_ALGO_LZSS,
27 SEGMENT_COMPRESS_ALGO_NONE, SEGMENT_MAGIC, SEGMENT_MAGIC_V2, SEGMENT_PAGE_BYTES, SegmentError,
28 SegmentMeta, SegmentReader, derive_brin_summaries, encode_segment, wrap_v2_envelope,
29 wrap_v2_envelope_with_brin,
30};
31
32use alloc::boxed::Box;
33use alloc::collections::{BTreeMap, BTreeSet};
34use alloc::format;
35use alloc::string::{String, ToString};
36use alloc::sync::Arc;
37use alloc::vec::Vec;
38use core::fmt;
39
40use self::persistent::PersistentVec;
41use self::persistent_btree::PersistentBTreeMap;
42
/// Element encoding of a vector column.
///
/// Carried inside [`DataType::Vector`]; `F32` is the default variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum VecEncoding {
    /// Rendered as `F32`.
    #[default]
    F32,
    /// Rendered as `SQ8`.
    Sq8,
    /// Rendered as `HALF`.
    F16,
}

impl fmt::Display for VecEncoding {
    /// Writes the short upper-case label of the encoding.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::F32 => "F32",
            Self::Sq8 => "SQ8",
            Self::F16 => "HALF",
        };
        f.write_str(label)
    }
}
71
/// Declared type of a table column.
///
/// The `Display` impl renders each variant as an SQL-style type name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataType {
    /// Rendered as `SMALLINT`; carried by `Value::SmallInt(i16)`.
    SmallInt,
    /// Rendered as `INT`; carried by `Value::Int(i32)`.
    Int,
    /// Rendered as `BIGINT`; carried by `Value::BigInt(i64)`.
    BigInt,
    /// Rendered as `FLOAT`; carried by `Value::Float(f64)`.
    Float,
    /// Rendered as `TEXT`.
    Text,
    /// Rendered as `VARCHAR(n)`. NOTE(review): `n` is presumably the maximum
    /// length — confirm where it is enforced.
    Varchar(u32),
    /// Rendered as `CHAR(n)`. NOTE(review): `n` is presumably the fixed
    /// length — confirm where it is enforced.
    Char(u32),
    /// Rendered as `BOOL`.
    Bool,
    /// Rendered as `VECTOR(dim)`, with a ` USING SQ8` / ` USING HALF` suffix
    /// for the non-default encodings.
    Vector {
        /// Number of elements in the vector.
        dim: u32,
        /// Element encoding of the stored vector.
        encoding: VecEncoding,
    },
    /// Rendered as `NUMERIC(precision)` when `scale == 0`, otherwise
    /// `NUMERIC(precision, scale)`.
    Numeric {
        precision: u8,
        scale: u8,
    },
    /// Rendered as `DATE`.
    Date,
    /// Rendered as `TIMESTAMP`.
    Timestamp,
    /// Rendered as `TIMESTAMPTZ`.
    Timestamptz,
    /// Rendered as `INTERVAL`.
    Interval,
    /// Rendered as `JSON`.
    Json,
    /// Rendered as `JSONB`.
    Jsonb,
    /// Rendered as `BYTEA`.
    Bytes,
    /// Rendered as `TEXT[]`.
    TextArray,
    /// Rendered as `INT[]`.
    IntArray,
    /// Rendered as `BIGINT[]`.
    BigIntArray,
    /// Rendered as `TSVECTOR`.
    TsVector,
    /// Rendered as `TSQUERY`.
    TsQuery,
}
177
178impl fmt::Display for DataType {
179 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
180 match self {
181 Self::SmallInt => f.write_str("SMALLINT"),
182 Self::Int => f.write_str("INT"),
183 Self::BigInt => f.write_str("BIGINT"),
184 Self::Float => f.write_str("FLOAT"),
185 Self::Text => f.write_str("TEXT"),
186 Self::Varchar(n) => write!(f, "VARCHAR({n})"),
187 Self::Char(n) => write!(f, "CHAR({n})"),
188 Self::Bool => f.write_str("BOOL"),
189 Self::Vector { dim, encoding } => match encoding {
190 VecEncoding::F32 => write!(f, "VECTOR({dim})"),
191 VecEncoding::Sq8 => write!(f, "VECTOR({dim}) USING SQ8"),
192 VecEncoding::F16 => write!(f, "VECTOR({dim}) USING HALF"),
193 },
194 Self::Numeric { precision, scale } => {
195 if *scale == 0 {
196 write!(f, "NUMERIC({precision})")
197 } else {
198 write!(f, "NUMERIC({precision}, {scale})")
199 }
200 }
201 Self::Date => f.write_str("DATE"),
202 Self::Timestamp => f.write_str("TIMESTAMP"),
203 Self::Timestamptz => f.write_str("TIMESTAMPTZ"),
204 Self::Interval => f.write_str("INTERVAL"),
205 Self::Json => f.write_str("JSON"),
206 Self::Jsonb => f.write_str("JSONB"),
207 Self::Bytes => f.write_str("BYTEA"),
208 Self::TextArray => f.write_str("TEXT[]"),
209 Self::IntArray => f.write_str("INT[]"),
210 Self::BigIntArray => f.write_str("BIGINT[]"),
211 Self::TsVector => f.write_str("TSVECTOR"),
212 Self::TsQuery => f.write_str("TSQUERY"),
213 }
214 }
215}
216
/// One entry of a `Value::TsVector`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TsLexeme {
    /// The lexeme text; GIN indexes key their posting lists on this field.
    pub word: String,
    /// NOTE(review): presumably the positions of the word within the source
    /// document — confirm against the tsvector builder.
    pub positions: Vec<u16>,
    /// NOTE(review): presumably a weight class code — the encoding is not
    /// visible here.
    pub weight: u8,
}
228
/// Expression tree carried by `Value::TsQuery`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TsQueryAst {
    /// Leaf node naming a single word.
    Term {
        word: String,
        /// NOTE(review): presumably a bit mask of acceptable lexeme weights —
        /// confirm the bit layout with the query evaluator.
        weight_mask: u8,
    },
    /// Binary conjunction of two sub-queries.
    And(Box<TsQueryAst>, Box<TsQueryAst>),
    /// Binary disjunction of two sub-queries.
    Or(Box<TsQueryAst>, Box<TsQueryAst>),
    /// Negation of a sub-query.
    Not(Box<TsQueryAst>),
    /// Two sub-queries combined with a distance.
    Phrase {
        left: Box<TsQueryAst>,
        right: Box<TsQueryAst>,
        /// NOTE(review): presumably the required positional distance between
        /// `left` and `right` — confirm with the evaluator.
        distance: u16,
    },
}
252
/// A single cell value stored in a [`Row`].
///
/// Marked `#[non_exhaustive]`, so code outside this crate must keep a wildcard
/// arm when matching. `Value::data_type` maps each non-null variant to its
/// [`DataType`].
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum Value {
    SmallInt(i16),
    Int(i32),
    BigInt(i64),
    Float(f64),
    Text(String),
    Bool(bool),
    /// Vector cell with `f32` elements (reported as `VecEncoding::F32`).
    Vector(Vec<f32>),
    /// Vector cell reported as `VecEncoding::Sq8`.
    Sq8Vector(crate::quantize::Sq8Vector),
    /// Vector cell reported as `VecEncoding::F16`.
    HalfVector(crate::halfvec::HalfVector),
    /// Fixed-point number.
    /// NOTE(review): presumably the value is `scaled / 10^scale` — confirm.
    Numeric {
        scaled: i128,
        scale: u8,
    },
    /// NOTE(review): unit/epoch of the `i32` is not visible here — confirm.
    Date(i32),
    /// NOTE(review): unit/epoch of the `i64` is not visible here — confirm.
    Timestamp(i64),
    /// Interval split into a month count and a microsecond count.
    Interval {
        months: i32,
        micros: i64,
    },
    /// JSON payload held as a string.
    Json(String),
    Bytes(Vec<u8>),
    /// Text array; `None` elements are individual NULL entries.
    TextArray(Vec<Option<String>>),
    /// `i32` array; `None` elements are individual NULL entries.
    IntArray(Vec<Option<i32>>),
    /// `i64` array; `None` elements are individual NULL entries.
    BigIntArray(Vec<Option<i64>>),
    TsVector(Vec<TsLexeme>),
    TsQuery(TsQueryAst),
    /// SQL NULL; the only variant without a [`DataType`].
    Null,
}
329
330impl Value {
331 pub fn data_type(&self) -> Option<DataType> {
333 match self {
334 Self::SmallInt(_) => Some(DataType::SmallInt),
335 Self::Int(_) => Some(DataType::Int),
336 Self::BigInt(_) => Some(DataType::BigInt),
337 Self::Float(_) => Some(DataType::Float),
338 Self::Text(_) => Some(DataType::Text),
341 Self::Bool(_) => Some(DataType::Bool),
342 Self::Vector(v) => Some(DataType::Vector {
343 dim: u32::try_from(v.len()).expect("vector dim ≤ u32"),
344 encoding: VecEncoding::F32,
345 }),
346 Self::Sq8Vector(q) => Some(DataType::Vector {
347 dim: u32::try_from(q.bytes.len()).expect("vector dim ≤ u32"),
348 encoding: VecEncoding::Sq8,
349 }),
350 Self::HalfVector(h) => Some(DataType::Vector {
351 dim: u32::try_from(h.dim()).expect("vector dim ≤ u32"),
352 encoding: VecEncoding::F16,
353 }),
354 Self::Numeric { scale, .. } => Some(DataType::Numeric {
359 precision: 0,
360 scale: *scale,
361 }),
362 Self::Date(_) => Some(DataType::Date),
363 Self::Timestamp(_) => Some(DataType::Timestamp),
364 Self::Interval { .. } => Some(DataType::Interval),
365 Self::Json(_) => Some(DataType::Json),
366 Self::Bytes(_) => Some(DataType::Bytes),
367 Self::TextArray(_) => Some(DataType::TextArray),
368 Self::IntArray(_) => Some(DataType::IntArray),
369 Self::BigIntArray(_) => Some(DataType::BigIntArray),
370 Self::TsVector(_) => Some(DataType::TsVector),
371 Self::TsQuery(_) => Some(DataType::TsQuery),
372 Self::Null => None,
373 }
374 }
375
376 pub const fn is_null(&self) -> bool {
377 matches!(self, Self::Null)
378 }
379}
380
/// One table row: an ordered list of cell values.
///
/// `Table::insert` requires `values.len()` to equal the number of schema
/// columns, so cell `i` belongs to column `i`.
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    /// Cell values in column order.
    pub values: Vec<Value>,
}
387
388impl Row {
389 pub const fn new(values: Vec<Value>) -> Self {
390 Self { values }
391 }
392
393 pub fn len(&self) -> usize {
394 self.values.len()
395 }
396
397 pub fn is_empty(&self) -> bool {
398 self.values.is_empty()
399 }
400}
401
/// Definition of a single table column.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnSchema {
    /// Column name; `TableSchema::column_position` matches on it exactly.
    pub name: String,
    /// Declared type, checked against inserted values by `Table::insert`.
    pub ty: DataType,
    /// When `false`, `Table::insert` rejects `Value::Null` for this column.
    pub nullable: bool,
    /// NOTE(review): presumably a constant DEFAULT value applied by a higher
    /// layer — `Table::insert` does not consult it.
    pub default: Option<Value>,
    /// NOTE(review): presumably the source text of a DEFAULT expression that is
    /// evaluated at insert time by a higher layer — confirm.
    pub runtime_default: Option<String>,
    /// NOTE(review): presumably marks the column for automatic value
    /// assignment (see `Table::next_auto_value`) — confirm with the caller.
    pub auto_increment: bool,
}
425
/// Definition of a table: its name, columns and declared constraints.
#[derive(Debug, Clone, PartialEq)]
pub struct TableSchema {
    /// Table name.
    pub name: String,
    /// Columns in positional order; row cell `i` belongs to `columns[i]`.
    pub columns: Vec<ColumnSchema>,
    /// NOTE(review): presumably a byte budget for the in-memory ("hot") tier —
    /// nothing in this part of the file enforces it.
    pub hot_tier_bytes: Option<u64>,
    /// Foreign-key constraints declared on this table.
    pub foreign_keys: Vec<ForeignKeyConstraint>,
    /// Uniqueness constraints (including primary keys) declared on this table.
    pub uniqueness_constraints: Vec<UniquenessConstraint>,
    /// NOTE(review): presumably the source text of CHECK constraints —
    /// evaluation is not visible here.
    pub checks: Vec<String>,
}
460
/// A uniqueness constraint over one or more columns.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UniquenessConstraint {
    /// `true` when the constraint is the table's primary key.
    pub is_primary_key: bool,
    /// Positions of the constrained columns within `TableSchema::columns`.
    pub columns: Vec<usize>,
    /// NOTE(review): presumably selects "NULLS NOT DISTINCT" semantics (NULLs
    /// compare equal for uniqueness) — enforcement is not visible here.
    pub nulls_not_distinct: bool,
}
484
/// A foreign-key constraint from this table to a parent table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForeignKeyConstraint {
    /// Optional constraint name.
    pub name: Option<String>,
    /// Positions of the referencing columns in this table's schema.
    pub local_columns: Vec<usize>,
    /// Name of the referenced table.
    pub parent_table: String,
    /// Column positions in the referenced table.
    /// NOTE(review): presumably parallel to `local_columns` — confirm.
    pub parent_columns: Vec<usize>,
    /// Action recorded for deletes of the referenced row.
    pub on_delete: FkAction,
    /// Action recorded for updates of the referenced key.
    pub on_update: FkAction,
}
511
/// Referential action attached to a foreign key.
///
/// The explicit discriminants are the byte tags returned by [`FkAction::tag`]
/// and accepted by [`FkAction::from_tag`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FkAction {
    Restrict = 0,
    Cascade = 1,
    SetNull = 2,
    SetDefault = 3,
    NoAction = 4,
}

impl FkAction {
    /// Returns the one-byte tag of this action (`0..=4`).
    pub const fn tag(self) -> u8 {
        self as u8
    }

    /// Decodes a tag produced by [`FkAction::tag`]; unknown bytes yield `None`.
    pub const fn from_tag(b: u8) -> Option<Self> {
        match b {
            0 => Some(Self::Restrict),
            1 => Some(Self::Cascade),
            2 => Some(Self::SetNull),
            3 => Some(Self::SetDefault),
            4 => Some(Self::NoAction),
            _ => None,
        }
    }
}
544
545impl TableSchema {
546 pub fn column_position(&self, name: &str) -> Option<usize> {
547 self.columns.iter().position(|c| c.name == name)
548 }
549}
550
/// Totally ordered key used by B-tree indexes.
///
/// `IndexKey::from_value` widens all integer-like values into `Int`, so the
/// derived `Ord` compares them on a common `i64` domain.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum IndexKey {
    /// Integer-like key (small/regular/big integers, dates and timestamps).
    Int(i64),
    /// Text key.
    Text(String),
    /// Boolean key.
    Bool(bool),
}
561
562impl IndexKey {
563 pub fn from_value(v: &Value) -> Option<Self> {
564 match v {
565 Value::SmallInt(n) => Some(Self::Int(i64::from(*n))),
566 Value::Int(n) => Some(Self::Int(i64::from(*n))),
567 Value::BigInt(n) => Some(Self::Int(*n)),
568 Value::Text(s) => Some(Self::Text(s.clone())),
569 Value::Bool(b) => Some(Self::Bool(*b)),
570 Value::Date(d) => Some(Self::Int(i64::from(*d))),
573 Value::Timestamp(t) => Some(Self::Int(*t)),
574 Value::Null
579 | Value::Float(_)
580 | Value::Vector(_)
581 | Value::Sq8Vector(_)
582 | Value::HalfVector(_)
583 | Value::Numeric { .. }
584 | Value::Interval { .. }
585 | Value::Json(_)
586 | Value::Bytes(_)
587 | Value::TextArray(_)
588 | Value::IntArray(_)
589 | Value::BigIntArray(_)
590 | Value::TsVector(_)
591 | Value::TsQuery(_) => None,
592 }
593 }
594}
595
/// A secondary index attached to a [`Table`].
#[derive(Debug, Clone)]
pub struct Index {
    /// Index name; unique within a table (checked by the `add_*` / `restore_*`
    /// methods on `Table`).
    pub name: String,
    /// Position of the indexed column within the table schema; index
    /// maintenance reads `row.values[column_position]`.
    pub column_position: usize,
    /// The concrete index structure.
    pub kind: IndexKind,
    /// NOTE(review): presumably INCLUDE-style covering columns — they are not
    /// consulted by the maintenance code visible here.
    pub included_columns: Vec<usize>,
    /// NOTE(review): presumably the source text of a partial-index predicate;
    /// `Table::insert` does not evaluate it — confirm which layer does.
    pub partial_predicate: Option<String>,
    /// NOTE(review): presumably the source text of an indexed expression —
    /// evaluation is not visible here.
    pub expression: Option<String>,
    /// Whether the index was declared unique; the constructors in this file
    /// initialise it to `false`.
    pub is_unique: bool,
    /// NOTE(review): presumably the trailing columns of a multi-column index —
    /// key construction here uses `column_position` only.
    pub extra_column_positions: Vec<usize>,
}
644
/// Default value for the `m` parameter of an NSW graph (see `NswGraph::m`).
/// NOTE(review): presumably used when the index definition does not specify
/// `m` — the call site is not visible here.
pub const NSW_DEFAULT_M: usize = 16;
648
/// Result record of a freeze operation.
/// NOTE(review): the producing function is not visible in this part of the
/// file; field notes below are inferred from names and types — confirm.
#[derive(Debug, Clone)]
pub struct FreezeReport {
    /// Identifier of the segment that was produced.
    pub segment_id: u32,
    /// Number of rows reported as frozen.
    pub frozen_rows: usize,
    /// Number of bytes reported as freed.
    pub bytes_freed: u64,
    /// Encoded bytes of the produced segment.
    pub segment_bytes: Vec<u8>,
}
674
/// A range of rows together with per-row payloads.
/// NOTE(review): producer and consumer are not visible in this part of the
/// file; confirm the meaning of each tuple element before relying on it.
#[derive(Debug, Clone)]
pub struct FreezeSlice {
    /// Half-open range of row indices.
    pub row_range: core::ops::Range<usize>,
    /// One `(u64, bytes, key)` tuple per row.
    /// NOTE(review): presumably (row id, encoded row body, index key) — confirm.
    pub rows: Vec<(u64, Vec<u8>, IndexKey)>,
}
697
/// Result record of a compaction.
/// NOTE(review): the producing function is not visible in this part of the
/// file; field notes below are inferred from names and types — confirm.
#[derive(Debug, Clone)]
pub struct CompactReport {
    /// Identifiers of the source segments.
    pub sources: Vec<u32>,
    /// Identifier of the merged segment, if one was produced.
    pub merged_segment_id: Option<u32>,
    /// Encoded bytes of the merged segment.
    pub merged_segment_bytes: Vec<u8>,
    /// Number of rows in the merged output.
    pub merged_rows: usize,
    /// Number of deleted rows dropped during the merge.
    pub deleted_rows_pruned: usize,
    /// Estimated number of bytes reclaimed.
    pub bytes_reclaimed_estimate: u64,
}
733
/// The concrete structure backing an [`Index`].
#[derive(Debug, Clone)]
pub enum IndexKind {
    /// Ordered map from key to the locators of all rows holding that key.
    BTree(PersistentBTreeMap<IndexKey, Vec<RowLocator>>),
    /// Graph index over a vector column.
    Nsw(NswGraph),
    /// BRIN index; only the indexed column's type is stored here.
    /// NOTE(review): the block summaries presumably live with the segments —
    /// confirm.
    Brin {
        column_type: DataType,
    },
    /// Inverted index keyed by lexeme word (`TsLexeme::word`).
    Gin(PersistentBTreeMap<alloc::string::String, Vec<RowLocator>>),
    /// Inverted index keyed by the trigrams returned by
    /// `trgm::extract_trigrams`.
    GinTrgm(PersistentBTreeMap<alloc::string::String, Vec<RowLocator>>),
}
795
/// Layered graph state of an NSW vector index.
#[derive(Debug, Clone)]
pub struct NswGraph {
    /// Neighbour cap for every layer above layer 0 (see `cap_for_layer`).
    pub m: usize,
    /// Neighbour cap for layer 0; `NswGraph::new` sets it to `2 * m`
    /// (saturating).
    pub m_max_0: usize,
    /// NOTE(review): presumably the row index of the search entry point;
    /// `None` in a freshly created graph.
    pub entry: Option<usize>,
    /// NOTE(review): presumably the layer of the entry point; `0` initially.
    pub entry_level: u8,
    /// NOTE(review): presumably one level per row, indexed by row position —
    /// confirm against `nsw_insert_at`.
    pub levels: PersistentVec<u8>,
    /// One adjacency table per layer; a new graph starts with a single empty
    /// layer. NOTE(review): the inner `Vec<u32>` is presumably the neighbour
    /// list of one row — confirm.
    pub layers: Vec<PersistentVec<Vec<u32>>>,
}
840
841impl NswGraph {
842 fn new(m: usize) -> Self {
843 Self {
844 m,
845 m_max_0: m.saturating_mul(2),
846 entry: None,
847 entry_level: 0,
848 levels: PersistentVec::new(),
849 layers: alloc::vec![PersistentVec::new()],
850 }
851 }
852
853 pub const fn cap_for_layer(&self, layer: u8) -> usize {
855 if layer == 0 { self.m_max_0 } else { self.m }
856 }
857}
858
/// Deterministically assigns a graph level in `0..=7` to a row index.
///
/// The index is scrambled with a fixed multiply / xor-shift mixer; the level
/// is the number of consecutive all-zero low nibbles of the mixed value,
/// capped at 7. The same `row_idx` always yields the same level.
pub fn nsw_assign_level(row_idx: usize) -> u8 {
    const MAX_LEVEL: u8 = 7;

    let seeded = (row_idx as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
    let round1 = (seeded ^ (seeded >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    let round2 = (round1 ^ (round1 >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    let mixed = round2 ^ (round2 >> 31);

    // Each zero low nibble is four trailing zero bits. A fully zero word
    // reports 64 trailing zeros and is clamped by the cap below.
    let zero_nibbles = mixed.trailing_zeros() / 4;
    if zero_nibbles >= u32::from(MAX_LEVEL) {
        MAX_LEVEL
    } else {
        // Lossless: the value is below MAX_LEVEL here.
        zero_nibbles as u8
    }
}
885
886impl Index {
887 fn new_btree(name: String, column_position: usize) -> Self {
888 Self {
889 name,
890 column_position,
891 kind: IndexKind::BTree(PersistentBTreeMap::new()),
892 included_columns: Vec::new(),
893 partial_predicate: None,
894 expression: None,
895 is_unique: false,
896 extra_column_positions: Vec::new(),
897 }
898 }
899
900 fn new_nsw(name: String, column_position: usize, m: usize) -> Self {
901 Self {
902 name,
903 column_position,
904 kind: IndexKind::Nsw(NswGraph::new(m)),
905 included_columns: Vec::new(),
906 partial_predicate: None,
907 expression: None,
908 is_unique: false,
909 extra_column_positions: Vec::new(),
910 }
911 }
912
913 fn new_brin(name: String, column_position: usize, column_type: DataType) -> Self {
917 Self {
918 name,
919 column_position,
920 kind: IndexKind::Brin { column_type },
921 included_columns: Vec::new(),
922 partial_predicate: None,
923 expression: None,
924 is_unique: false,
925 extra_column_positions: Vec::new(),
926 }
927 }
928
929 fn new_gin(name: String, column_position: usize) -> Self {
934 Self {
935 name,
936 column_position,
937 kind: IndexKind::Gin(PersistentBTreeMap::new()),
938 included_columns: Vec::new(),
939 partial_predicate: None,
940 expression: None,
941 is_unique: false,
942 extra_column_positions: Vec::new(),
943 }
944 }
945
946 fn new_gin_trgm(name: String, column_position: usize) -> Self {
951 Self {
952 name,
953 column_position,
954 kind: IndexKind::GinTrgm(PersistentBTreeMap::new()),
955 included_columns: Vec::new(),
956 partial_predicate: None,
957 expression: None,
958 is_unique: false,
959 extra_column_positions: Vec::new(),
960 }
961 }
962
963 pub fn lookup_eq(&self, key: &IndexKey) -> &[RowLocator] {
972 match &self.kind {
973 IndexKind::BTree(m) => m.get(key).map_or(&[][..], Vec::as_slice),
974 IndexKind::Nsw(_)
978 | IndexKind::Brin { .. }
979 | IndexKind::Gin(_)
980 | IndexKind::GinTrgm(_) => &[][..],
981 }
982 }
983
984 pub fn gin_lookup_word(&self, word: &str) -> &[RowLocator] {
988 match &self.kind {
989 IndexKind::Gin(m) => m.get(&String::from(word)).map_or(&[][..], Vec::as_slice),
990 IndexKind::BTree(_)
991 | IndexKind::Nsw(_)
992 | IndexKind::Brin { .. }
993 | IndexKind::GinTrgm(_) => &[][..],
994 }
995 }
996
997 pub fn gin_trgm_lookup(&self, tri: &str) -> &[RowLocator] {
1002 match &self.kind {
1003 IndexKind::GinTrgm(m) => m.get(&String::from(tri)).map_or(&[][..], Vec::as_slice),
1004 IndexKind::BTree(_)
1005 | IndexKind::Nsw(_)
1006 | IndexKind::Brin { .. }
1007 | IndexKind::Gin(_) => &[][..],
1008 }
1009 }
1010
1011 pub const fn nsw(&self) -> Option<&NswGraph> {
1014 match &self.kind {
1015 IndexKind::Nsw(g) => Some(g),
1016 IndexKind::BTree(_)
1017 | IndexKind::Brin { .. }
1018 | IndexKind::Gin(_)
1019 | IndexKind::GinTrgm(_) => None,
1020 }
1021 }
1022
1023 pub const fn is_brin(&self) -> bool {
1028 matches!(self.kind, IndexKind::Brin { .. })
1029 }
1030
1031 pub const fn is_gin_trgm(&self) -> bool {
1035 matches!(self.kind, IndexKind::GinTrgm(_))
1036 }
1037
1038 pub const fn is_gin(&self) -> bool {
1042 matches!(self.kind, IndexKind::Gin(_))
1043 }
1044}
1045
/// An in-memory table: schema, hot rows, secondary indexes and tier counters.
#[derive(Debug, Clone)]
pub struct Table {
    /// Column and constraint definitions.
    schema: TableSchema,
    /// Rows held in memory, in insertion order; `RowLocator::Hot(i)` refers to
    /// position `i` of this vector.
    rows: PersistentVec<Row>,
    /// Secondary indexes, in creation order.
    indices: Vec<Index>,
    /// Running sum of `row_body_encoded_len` over inserted rows (saturating).
    hot_bytes: u64,
    /// Cached number of cold rows; written only through `set_cold_row_count`.
    cold_row_count: u64,
    /// `true` when `cold_row_count` has been flagged as out of date via
    /// `mark_cold_row_count_stale`.
    cold_row_count_stale: bool,
}
1087
1088impl Table {
    /// Creates an empty table for `schema`: no rows, no indexes, zeroed byte
    /// and cold-row counters, and the cold-row count marked as fresh.
    pub fn new(schema: TableSchema) -> Self {
        Self {
            schema,
            rows: PersistentVec::new(),
            indices: Vec::new(),
            hot_bytes: 0,
            cold_row_count: 0,
            cold_row_count_stale: false,
        }
    }
1099
    /// Returns the running byte total accumulated by `insert` (the sum of
    /// `row_body_encoded_len` for each inserted row, saturating).
    #[must_use]
    pub const fn hot_bytes(&self) -> u64 {
        self.hot_bytes
    }
1107
    /// Returns the cached cold-row count. Check `cold_row_count_stale` to see
    /// whether it has been flagged as out of date.
    #[must_use]
    pub const fn cold_row_count(&self) -> u64 {
        self.cold_row_count
    }
1114
    /// Stores `n` as the cached cold-row count and clears the stale flag.
    pub fn set_cold_row_count(&mut self, n: u64) {
        self.cold_row_count = n;
        self.cold_row_count_stale = false;
    }
1121
    /// Flags the cached cold-row count as out of date. The cached number itself
    /// is left untouched until `set_cold_row_count` is called.
    pub fn mark_cold_row_count_stale(&mut self) {
        self.cold_row_count_stale = true;
    }
1129
    /// Returns `true` when the cached cold-row count has been flagged stale.
    #[must_use]
    pub const fn cold_row_count_stale(&self) -> bool {
        self.cold_row_count_stale
    }
1137
1138 #[must_use]
1149 pub fn count_cold_locators(&self) -> u64 {
1150 let mut best: u64 = 0;
1151 for idx in &self.indices {
1152 if let IndexKind::BTree(map) = &idx.kind {
1153 let n: u64 = map
1154 .iter()
1155 .map(|(_, locs)| locs.iter().filter(|l| l.is_cold()).count() as u64)
1156 .sum();
1157 if n > best {
1158 best = n;
1159 }
1160 }
1161 }
1162 best
1163 }
1164
    /// Borrows the table's schema.
    pub const fn schema(&self) -> &TableSchema {
        &self.schema
    }
1168
    /// Mutably borrows the table's schema.
    ///
    /// Changes made through this reference are not validated against the
    /// stored rows or indexes by this accessor.
    pub const fn schema_mut(&mut self) -> &mut TableSchema {
        &mut self.schema
    }
1175
    /// Borrows the in-memory rows in insertion order.
    pub const fn rows(&self) -> &PersistentVec<Row> {
        &self.rows
    }
1182
    /// Number of in-memory rows (cold rows are not included).
    pub const fn row_count(&self) -> usize {
        self.rows.len()
    }
1186
    /// Mutably borrows the indexes as a slice: entries can be modified in
    /// place, but indexes cannot be added or removed through it.
    pub fn indices_mut(&mut self) -> &mut [Index] {
        &mut self.indices
    }
1194
    /// Borrows the table's indexes in creation order.
    pub fn indices(&self) -> &[Index] {
        &self.indices
    }
1198
1199 pub fn next_auto_value(&self, col_pos: usize) -> Option<i64> {
1205 let ty = self.schema.columns.get(col_pos)?.ty;
1206 if !matches!(ty, DataType::SmallInt | DataType::Int | DataType::BigInt) {
1207 return None;
1208 }
1209 let mut max: Option<i64> = None;
1210 for row in &self.rows {
1211 match row.values.get(col_pos) {
1212 Some(Value::SmallInt(n)) => {
1213 let v = i64::from(*n);
1214 max = Some(max.map_or(v, |m| m.max(v)));
1215 }
1216 Some(Value::Int(n)) => {
1217 let v = i64::from(*n);
1218 max = Some(max.map_or(v, |m| m.max(v)));
1219 }
1220 Some(Value::BigInt(n)) => {
1221 max = Some(max.map_or(*n, |m| m.max(*n)));
1222 }
1223 _ => {}
1224 }
1225 }
1226 Some(max.map_or(1, |m| m + 1))
1227 }
1228
1229 pub fn index_on(&self, column_position: usize) -> Option<&Index> {
1233 self.indices
1240 .iter()
1241 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::BTree(_)))
1242 .or_else(|| {
1243 self.indices.iter().find(|i| {
1244 i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_))
1245 })
1246 })
1247 }
1248
    /// Validates `row` against the schema, appends it to the in-memory rows
    /// and updates every index.
    ///
    /// # Errors
    ///
    /// * `StorageError::ArityMismatch` — the row's cell count differs from the
    ///   number of schema columns.
    /// * `StorageError::NullInNotNull` — a NULL cell targets a non-nullable
    ///   column.
    /// * `StorageError::TypeMismatch` — a non-NULL cell's type is not
    ///   compatible with the column type.
    ///
    /// All validation happens before any state is modified, so a failed insert
    /// leaves the table untouched.
    ///
    /// This method does not consult column defaults, uniqueness constraints,
    /// foreign keys, check constraints or partial-index predicates.
    pub fn insert(&mut self, row: Row) -> Result<(), StorageError> {
        if row.len() != self.schema.columns.len() {
            return Err(StorageError::ArityMismatch {
                expected: self.schema.columns.len(),
                actual: row.len(),
            });
        }
        for (i, (val, col)) in row.values.iter().zip(&self.schema.columns).enumerate() {
            if val.is_null() {
                if !col.nullable {
                    return Err(StorageError::NullInNotNull {
                        column: col.name.clone(),
                    });
                }
                // NULL in a nullable column needs no type check.
                continue;
            }
            let actual = val.data_type().expect("non-null");
            // Compatible when the types are equal, or for one of the accepted
            // cross-type pairs: Text into Varchar/Char/Json/Jsonb, Json/Jsonb
            // into Text, Json <-> Jsonb, Timestamp <-> Timestamptz, and
            // Numeric with an equal scale (precision is ignored).
            let compatible = actual == col.ty
                || matches!(
                    (actual, col.ty),
                    (
                        DataType::Text,
                        DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
                    ) | (DataType::Json | DataType::Jsonb, DataType::Text)
                        | (DataType::Json, DataType::Jsonb)
                        | (DataType::Jsonb, DataType::Json)
                        | (DataType::Timestamp, DataType::Timestamptz)
                        | (DataType::Timestamptz, DataType::Timestamp)
                )
                || matches!(
                    (actual, col.ty),
                    (
                        DataType::Numeric { scale: a, .. },
                        DataType::Numeric { scale: b, .. },
                    ) if a == b
                );
            if !compatible {
                return Err(StorageError::TypeMismatch {
                    column: col.name.clone(),
                    expected: col.ty,
                    actual,
                    position: i,
                });
            }
        }
        // Position the row will occupy once pushed below.
        let new_row_idx = self.rows.len();
        for idx in &mut self.indices {
            match &mut idx.kind {
                IndexKind::BTree(map) => {
                    // Cells without an index key (e.g. NULL) are not indexed.
                    if let Some(key) = IndexKey::from_value(&row.values[idx.column_position]) {
                        // Copy the existing posting list, append, and store it
                        // back under the same key.
                        let mut entries = map.get(&key).cloned().unwrap_or_default();
                        entries.push(RowLocator::Hot(new_row_idx));
                        map.insert_mut(key, entries);
                    }
                }
                IndexKind::Gin(map) => {
                    if let Value::TsVector(lexemes) = &row.values[idx.column_position] {
                        // One posting per lexeme, keyed by its word.
                        for lex in lexemes {
                            let mut entries = map.get(&lex.word).cloned().unwrap_or_default();
                            entries.push(RowLocator::Hot(new_row_idx));
                            map.insert_mut(lex.word.clone(), entries);
                        }
                    }
                }
                IndexKind::GinTrgm(map) => {
                    if let Value::Text(s) = &row.values[idx.column_position] {
                        // One posting per extracted trigram.
                        for tri in trgm::extract_trigrams(s) {
                            let mut entries = map.get(&tri).cloned().unwrap_or_default();
                            entries.push(RowLocator::Hot(new_row_idx));
                            map.insert_mut(tri, entries);
                        }
                    }
                }
                // NSW is wired after the row is stored (below); BRIN keeps no
                // per-row state here.
                IndexKind::Nsw(_) | IndexKind::Brin { .. } => {}
            }
        }
        // Account for the row's encoded size before it is moved into storage.
        self.hot_bytes = self
            .hot_bytes
            .saturating_add(row_body_encoded_len(&row, &self.schema) as u64);
        self.rows.push_mut(row);
        let new_row_idx = self.rows.len() - 1;
        // Collect the NSW index positions first so that `nsw_insert_at` can
        // take `self` mutably.
        let nsw_targets: Vec<usize> = self
            .indices
            .iter()
            .enumerate()
            .filter_map(|(i, idx)| {
                if matches!(idx.kind, IndexKind::Nsw(_)) {
                    Some(i)
                } else {
                    None
                }
            })
            .collect();
        for idx_pos in nsw_targets {
            nsw_insert_at(self, idx_pos, new_row_idx);
        }
        Ok(())
    }
1388
1389 pub fn add_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1393 if self.indices.iter().any(|i| i.name == name) {
1394 return Err(StorageError::DuplicateIndex { name });
1395 }
1396 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1397 StorageError::ColumnNotFound {
1398 column: column_name.into(),
1399 }
1400 })?;
1401 let mut idx = Index::new_btree(name, column_position);
1402 if let IndexKind::BTree(map) = &mut idx.kind {
1403 for (i, row) in self.rows.iter().enumerate() {
1404 if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
1405 let mut entries = map.get(&key).cloned().unwrap_or_default();
1406 entries.push(RowLocator::Hot(i));
1407 map.insert_mut(key, entries);
1408 }
1409 }
1410 }
1411 self.indices.push(idx);
1412 Ok(())
1413 }
1414
    /// Creates an NSW index named `name` on `column_name` with neighbour cap
    /// `m`.
    ///
    /// Delegates to `add_nsw_index_inner` without a pre-built graph; errors
    /// from that helper are returned unchanged.
    pub fn add_nsw_index(
        &mut self,
        name: String,
        column_name: &str,
        m: usize,
    ) -> Result<(), StorageError> {
        self.add_nsw_index_inner(name, column_name, m, None)
    }
1427
1428 pub fn rebuild_nsw_index(
1440 &mut self,
1441 name: &str,
1442 new_encoding: Option<VecEncoding>,
1443 ) -> Result<(), StorageError> {
1444 let idx_pos = self
1445 .indices
1446 .iter()
1447 .position(|i| i.name == name)
1448 .ok_or_else(|| StorageError::IndexNotFound {
1449 name: String::from(name),
1450 })?;
1451 let col_pos = self.indices[idx_pos].column_position;
1452 let m = match &self.indices[idx_pos].kind {
1453 IndexKind::Nsw(g) => g.m,
1454 IndexKind::BTree(_)
1455 | IndexKind::Brin { .. }
1456 | IndexKind::Gin(_)
1457 | IndexKind::GinTrgm(_) => {
1458 return Err(StorageError::Unsupported(format!(
1459 "ALTER INDEX REBUILD on non-NSW index {name:?} — only NSW indexes can rebuild"
1460 )));
1461 }
1462 };
1463 let col_name = self.schema.columns[col_pos].name.clone();
1464 if let Some(target) = new_encoding {
1467 let current = match self.schema.columns[col_pos].ty {
1468 DataType::Vector { encoding, .. } => encoding,
1469 ref other => {
1470 return Err(StorageError::Unsupported(format!(
1471 "ALTER INDEX REBUILD WITH (encoding=…) on non-vector column type {other:?}"
1472 )));
1473 }
1474 };
1475 if target != current {
1476 let DataType::Vector { dim, .. } = self.schema.columns[col_pos].ty else {
1477 unreachable!("checked above")
1478 };
1479 let n = self.rows.len();
1480 for i in 0..n {
1481 let row = self
1482 .rows
1483 .get_mut(i)
1484 .expect("row index in bounds (we iterated up to len())");
1485 let cell = core::mem::replace(&mut row.values[col_pos], Value::Null);
1486 let recoded = recode_vector_cell(cell, target)?;
1487 row.values[col_pos] = recoded;
1488 }
1489 self.schema.columns[col_pos].ty = DataType::Vector {
1490 dim,
1491 encoding: target,
1492 };
1493 }
1494 }
1495 self.indices.remove(idx_pos);
1497 self.add_nsw_index_inner(String::from(name), &col_name, m, None)?;
1498 Ok(())
1499 }
1500
    /// Attaches an NSW index named `name` on `column_name` using an already
    /// built `graph`.
    ///
    /// Delegates to `add_nsw_index_inner`, passing the graph's own `m` and the
    /// graph itself; errors from that helper are returned unchanged.
    pub fn restore_nsw_index(
        &mut self,
        name: String,
        column_name: &str,
        graph: NswGraph,
    ) -> Result<(), StorageError> {
        self.add_nsw_index_inner(name, column_name, graph.m, Some(graph))
    }
1513
1514 pub fn restore_btree_index(
1521 &mut self,
1522 name: String,
1523 column_name: &str,
1524 map: PersistentBTreeMap<IndexKey, Vec<RowLocator>>,
1525 ) -> Result<(), StorageError> {
1526 if self.indices.iter().any(|i| i.name == name) {
1527 return Err(StorageError::DuplicateIndex { name });
1528 }
1529 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1530 StorageError::ColumnNotFound {
1531 column: column_name.into(),
1532 }
1533 })?;
1534 self.indices.push(Index {
1535 name,
1536 column_position,
1537 kind: IndexKind::BTree(map),
1538 included_columns: Vec::new(),
1539 partial_predicate: None,
1540 expression: None,
1541 is_unique: false,
1542 extra_column_positions: Vec::new(),
1543 });
1544 Ok(())
1545 }
1546
1547 pub fn restore_brin_index(
1552 &mut self,
1553 name: String,
1554 column_name: &str,
1555 column_type: DataType,
1556 ) -> Result<(), StorageError> {
1557 if self.indices.iter().any(|i| i.name == name) {
1558 return Err(StorageError::DuplicateIndex { name });
1559 }
1560 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1561 StorageError::ColumnNotFound {
1562 column: column_name.into(),
1563 }
1564 })?;
1565 self.indices
1566 .push(Index::new_brin(name, column_position, column_type));
1567 Ok(())
1568 }
1569
1570 pub fn add_brin_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1574 if self.indices.iter().any(|i| i.name == name) {
1575 return Err(StorageError::DuplicateIndex { name });
1576 }
1577 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1578 StorageError::ColumnNotFound {
1579 column: column_name.into(),
1580 }
1581 })?;
1582 let column_type = self.schema.columns[column_position].ty;
1583 self.indices
1584 .push(Index::new_brin(name, column_position, column_type));
1585 Ok(())
1586 }
1587
1588 pub fn add_gin_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1593 if self.indices.iter().any(|i| i.name == name) {
1594 return Err(StorageError::DuplicateIndex { name });
1595 }
1596 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1597 StorageError::ColumnNotFound {
1598 column: column_name.into(),
1599 }
1600 })?;
1601 if self.schema.columns[column_position].ty != DataType::TsVector {
1602 return Err(StorageError::Corrupt(format!(
1603 "GIN index {name:?} requires a tsvector column; \
1604 {column_name:?} is {:?}",
1605 self.schema.columns[column_position].ty
1606 )));
1607 }
1608 let mut idx = Index::new_gin(name, column_position);
1609 if let IndexKind::Gin(map) = &mut idx.kind {
1610 for (i, row) in self.rows.iter().enumerate() {
1611 if let Value::TsVector(lexemes) = &row.values[column_position] {
1612 for lex in lexemes {
1613 let mut entries = map.get(&lex.word).cloned().unwrap_or_default();
1614 entries.push(RowLocator::Hot(i));
1615 map.insert_mut(lex.word.clone(), entries);
1616 }
1617 }
1618 }
1619 }
1620 self.indices.push(idx);
1621 Ok(())
1622 }
1623
1624 pub fn restore_gin_index(
1629 &mut self,
1630 name: String,
1631 column_name: &str,
1632 map: PersistentBTreeMap<String, Vec<RowLocator>>,
1633 ) -> Result<(), StorageError> {
1634 if self.indices.iter().any(|i| i.name == name) {
1635 return Err(StorageError::DuplicateIndex { name });
1636 }
1637 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1638 StorageError::ColumnNotFound {
1639 column: column_name.into(),
1640 }
1641 })?;
1642 let mut idx = Index::new_gin(name, column_position);
1643 idx.kind = IndexKind::Gin(map);
1644 self.indices.push(idx);
1645 Ok(())
1646 }
1647
1648 pub fn add_gin_trgm_index(
1653 &mut self,
1654 name: String,
1655 column_name: &str,
1656 ) -> Result<(), StorageError> {
1657 if self.indices.iter().any(|i| i.name == name) {
1658 return Err(StorageError::DuplicateIndex { name });
1659 }
1660 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1661 StorageError::ColumnNotFound {
1662 column: column_name.into(),
1663 }
1664 })?;
1665 if !matches!(
1666 self.schema.columns[column_position].ty,
1667 DataType::Text | DataType::Varchar(_)
1668 ) {
1669 return Err(StorageError::Corrupt(format!(
1670 "trigram-GIN index {name:?} requires a TEXT/VARCHAR column; \
1671 {column_name:?} is {:?}",
1672 self.schema.columns[column_position].ty
1673 )));
1674 }
1675 let mut idx = Index::new_gin_trgm(name, column_position);
1676 if let IndexKind::GinTrgm(map) = &mut idx.kind {
1677 for (i, row) in self.rows.iter().enumerate() {
1678 if let Value::Text(s) = &row.values[column_position] {
1679 for tri in trgm::extract_trigrams(s) {
1680 let mut entries = map.get(&tri).cloned().unwrap_or_default();
1681 entries.push(RowLocator::Hot(i));
1682 map.insert_mut(tri, entries);
1683 }
1684 }
1685 }
1686 }
1687 self.indices.push(idx);
1688 Ok(())
1689 }
1690
1691 pub fn restore_gin_trgm_index(
1694 &mut self,
1695 name: String,
1696 column_name: &str,
1697 map: PersistentBTreeMap<String, Vec<RowLocator>>,
1698 ) -> Result<(), StorageError> {
1699 if self.indices.iter().any(|i| i.name == name) {
1700 return Err(StorageError::DuplicateIndex { name });
1701 }
1702 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1703 StorageError::ColumnNotFound {
1704 column: column_name.into(),
1705 }
1706 })?;
1707 let mut idx = Index::new_gin_trgm(name, column_position);
1708 idx.kind = IndexKind::GinTrgm(map);
1709 self.indices.push(idx);
1710 Ok(())
1711 }
1712
1713 pub fn register_cold_locators<I>(
1730 &mut self,
1731 index_name: &str,
1732 locators: I,
1733 ) -> Result<usize, StorageError>
1734 where
1735 I: IntoIterator<Item = (IndexKey, RowLocator)>,
1736 {
1737 let idx = self
1738 .indices
1739 .iter_mut()
1740 .find(|i| i.name == index_name)
1741 .ok_or_else(|| StorageError::Corrupt(format!("index {index_name:?} not found")))?;
1742 let map = match &mut idx.kind {
1743 IndexKind::BTree(map) => map,
1744 IndexKind::Nsw(_)
1745 | IndexKind::Brin { .. }
1746 | IndexKind::Gin(_)
1747 | IndexKind::GinTrgm(_) => {
1748 return Err(StorageError::Corrupt(format!(
1749 "index {index_name:?} is not BTree; cold locators apply only to BTree indices"
1750 )));
1751 }
1752 };
1753 let mut count = 0usize;
1754 for (key, locator) in locators {
1755 let mut entries = map.get(&key).cloned().unwrap_or_default();
1756 entries.push(locator);
1757 map.insert_mut(key, entries);
1758 count += 1;
1759 }
1760 Ok(count)
1761 }
1762
1763 pub fn register_gin_cold_locators<I>(
1770 &mut self,
1771 index_name: &str,
1772 locators: I,
1773 ) -> Result<usize, StorageError>
1774 where
1775 I: IntoIterator<Item = (String, RowLocator)>,
1776 {
1777 let idx = self
1778 .indices
1779 .iter_mut()
1780 .find(|i| i.name == index_name)
1781 .ok_or_else(|| StorageError::Corrupt(format!("index {index_name:?} not found")))?;
1782 let map = match &mut idx.kind {
1783 IndexKind::Gin(map) | IndexKind::GinTrgm(map) => map,
1784 IndexKind::BTree(_) | IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
1785 return Err(StorageError::Corrupt(format!(
1786 "register_gin_cold_locators: index {index_name:?} is not GIN"
1787 )));
1788 }
1789 };
1790 let mut count = 0usize;
1791 for (word, locator) in locators {
1792 let mut entries = map.get(&word).cloned().unwrap_or_default();
1793 entries.push(locator);
1794 map.insert_mut(word, entries);
1795 count += 1;
1796 }
1797 Ok(count)
1798 }
1799
1800 pub fn remove_cold_locators_for_key(
1810 &mut self,
1811 index_name: &str,
1812 key: &IndexKey,
1813 ) -> Result<usize, StorageError> {
1814 let idx = self
1815 .indices
1816 .iter_mut()
1817 .find(|i| i.name == index_name)
1818 .ok_or_else(|| {
1819 StorageError::Corrupt(format!(
1820 "remove_cold_locators_for_key: index {index_name:?} not found"
1821 ))
1822 })?;
1823 let map = match &mut idx.kind {
1824 IndexKind::BTree(map) => map,
1825 IndexKind::Nsw(_)
1826 | IndexKind::Brin { .. }
1827 | IndexKind::Gin(_)
1828 | IndexKind::GinTrgm(_) => {
1829 return Err(StorageError::Corrupt(format!(
1830 "remove_cold_locators_for_key: index {index_name:?} is not BTree; \
1831 cold locators apply only to BTree indices"
1832 )));
1833 }
1834 };
1835 let Some(entries) = map.get(key) else {
1836 return Ok(0);
1837 };
1838 let mut kept: Vec<RowLocator> =
1839 entries.iter().copied().filter(RowLocator::is_hot).collect();
1840 let removed = entries.len() - kept.len();
1841 if removed == 0 {
1842 return Ok(0);
1843 }
1844 kept.shrink_to_fit();
1845 map.insert_mut(key.clone(), kept);
1853 Ok(removed)
1854 }
1855
1856 pub fn add_column(&mut self, col: ColumnSchema, fill_value: Value) {
1863 self.schema.columns.push(col);
1864 let mut new_rows: PersistentVec<Row> = PersistentVec::new();
1865 for row in self.rows.iter() {
1866 let mut values = row.values.clone();
1867 values.push(fill_value.clone());
1868 new_rows.push_mut(Row::new(values));
1869 }
1870 self.rows = new_rows;
1871 }
1872
1873 pub fn set_partial_predicate(&mut self, idx: usize, pred: Option<String>) {
1880 debug_assert!(idx < self.indices.len());
1881 self.indices[idx].partial_predicate = pred;
1882 }
1883
1884 pub fn rename_column(&mut self, col_pos: usize, new_name: &str) {
1895 debug_assert!(col_pos < self.schema.columns.len());
1896 self.schema.columns[col_pos].name = new_name.to_string();
1897 }
1898
1899 pub fn drop_column(&mut self, col_pos: usize) {
1909 debug_assert!(col_pos < self.schema.columns.len());
1910 self.schema.columns.remove(col_pos);
1912 let mut new_rows: PersistentVec<Row> = PersistentVec::new();
1914 for row in self.rows.iter() {
1915 let mut values = row.values.clone();
1916 if col_pos < values.len() {
1917 values.remove(col_pos);
1918 }
1919 new_rows.push_mut(Row::new(values));
1920 }
1921 self.rows = new_rows;
1922 self.indices.retain(|idx| idx.column_position != col_pos);
1924 for idx in &mut self.indices {
1925 if idx.column_position > col_pos {
1926 idx.column_position -= 1;
1927 }
1928 for inc in &mut idx.included_columns {
1930 if *inc > col_pos {
1931 *inc -= 1;
1932 }
1933 }
1934 }
1935 let mut surviving_ucs: Vec<UniquenessConstraint> = Vec::new();
1940 for mut uc in core::mem::take(&mut self.schema.uniqueness_constraints) {
1941 uc.columns.retain(|&c| c != col_pos);
1942 if uc.columns.is_empty() {
1943 continue;
1944 }
1945 for c in &mut uc.columns {
1946 if *c > col_pos {
1947 *c -= 1;
1948 }
1949 }
1950 surviving_ucs.push(uc);
1951 }
1952 self.schema.uniqueness_constraints = surviving_ucs;
1953 for fk in &mut self.schema.foreign_keys {
1956 for c in &mut fk.local_columns {
1957 if *c > col_pos {
1958 *c -= 1;
1959 }
1960 }
1961 }
1962 self.rebuild_indices();
1969 }
1970
1971 pub fn delete_rows(&mut self, positions: &[usize]) -> usize {
1977 if positions.is_empty() {
1978 return 0;
1979 }
1980 let mut to_remove = alloc::vec![false; self.rows.len()];
1984 let mut removed = 0;
1985 for &p in positions {
1986 if p < to_remove.len() && !to_remove[p] {
1987 to_remove[p] = true;
1988 removed += 1;
1989 }
1990 }
1991 let mut new_rows: PersistentVec<Row> = PersistentVec::new();
1992 let mut removed_bytes: u64 = 0;
1993 for (i, row) in self.rows.iter().enumerate() {
1994 if to_remove[i] {
1995 removed_bytes =
1996 removed_bytes.saturating_add(row_body_encoded_len(row, &self.schema) as u64);
1997 } else {
1998 new_rows.push_mut(row.clone());
1999 }
2000 }
2001 self.rows = new_rows;
2002 self.hot_bytes = self.hot_bytes.saturating_sub(removed_bytes);
2003 self.rebuild_indices();
2004 removed
2005 }
2006
2007 pub fn update_row(
2013 &mut self,
2014 position: usize,
2015 new_values: Vec<Value>,
2016 ) -> Result<(), StorageError> {
2017 if position >= self.rows.len() {
2018 return Err(StorageError::Corrupt(alloc::format!(
2019 "update_row: position {position} out of bounds (rows={})",
2020 self.rows.len()
2021 )));
2022 }
2023 if new_values.len() != self.schema.columns.len() {
2024 return Err(StorageError::ArityMismatch {
2025 expected: self.schema.columns.len(),
2026 actual: new_values.len(),
2027 });
2028 }
2029 for (i, (val, col)) in new_values.iter().zip(&self.schema.columns).enumerate() {
2033 if val.is_null() {
2034 if !col.nullable {
2035 return Err(StorageError::NullInNotNull {
2036 column: col.name.clone(),
2037 });
2038 }
2039 continue;
2040 }
2041 let actual = val.data_type().expect("non-null");
2042 let compatible = actual == col.ty
2043 || matches!(
2044 (actual, col.ty),
2045 (
2046 DataType::Text,
2047 DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
2048 ) | (DataType::Json | DataType::Jsonb, DataType::Text)
2049 | (DataType::Json, DataType::Jsonb)
2050 | (DataType::Jsonb, DataType::Json)
2051 | (DataType::Timestamp, DataType::Timestamptz)
2052 | (DataType::Timestamptz, DataType::Timestamp)
2053 )
2054 || matches!(
2055 (actual, col.ty),
2056 (
2057 DataType::Numeric { scale: a, .. },
2058 DataType::Numeric { scale: b, .. },
2059 ) if a == b
2060 );
2061 if !compatible {
2062 return Err(StorageError::TypeMismatch {
2063 column: col.name.clone(),
2064 expected: col.ty,
2065 actual,
2066 position: i,
2067 });
2068 }
2069 }
2070 let old_row = self
2071 .rows
2072 .get(position)
2073 .expect("position bounds-checked above");
2074 let old_bytes = row_body_encoded_len(old_row, &self.schema) as u64;
2075 let new_row = Row::new(new_values);
2076 let new_bytes = row_body_encoded_len(&new_row, &self.schema) as u64;
2077 self.rows = self
2078 .rows
2079 .set(position, new_row)
2080 .expect("position bounds-checked above");
2081 self.hot_bytes = self
2082 .hot_bytes
2083 .saturating_sub(old_bytes)
2084 .saturating_add(new_bytes);
2085 self.rebuild_indices();
2086 Ok(())
2087 }
2088
/// Rebuilds every index from the current contents of `self.rows`.
///
/// The rebuild works in three phases:
/// 1. cold locators currently stored in BTree and GIN / GIN-trigram
///    indices are copied out, since they cannot be regenerated from the
///    hot rows;
/// 2. all indices are cleared and re-created by kind, re-indexing every
///    hot row as `RowLocator::Hot(row position)`;
/// 3. the saved cold locators are registered back on the index with the
///    same name.
///
/// NOTE(review): only the index name, column position and kind (plus `m`
/// for NSW and the column type for BRIN) are carried across the rebuild.
/// `included_columns`, `partial_predicate`, `expression`, `is_unique` and
/// `extra_column_positions` end up as whatever the `Index::new_*`
/// constructors produce — confirm this is intended.
fn rebuild_indices(&mut self) {
    // Phase 1a: snapshot (key, cold locator) pairs per BTree index.
    // Indices with no cold locators are skipped entirely.
    let preserved_cold: Vec<(String, Vec<(IndexKey, RowLocator)>)> = self
        .indices
        .iter()
        .filter_map(|idx| match &idx.kind {
            IndexKind::BTree(map) => {
                let cold: Vec<(IndexKey, RowLocator)> = map
                    .iter()
                    .flat_map(|(k, locs)| {
                        locs.iter()
                            .filter(|l| l.is_cold())
                            .copied()
                            .map(move |l| (k.clone(), l))
                    })
                    .collect();
                if cold.is_empty() {
                    None
                } else {
                    Some((idx.name.clone(), cold))
                }
            }
            IndexKind::Nsw(_)
            | IndexKind::Brin { .. }
            | IndexKind::Gin(_)
            | IndexKind::GinTrgm(_) => None,
        })
        .collect();

    // Phase 1b: same snapshot for GIN and GIN-trigram indices, keyed by
    // the posting-list word.
    let preserved_gin_cold: Vec<(String, Vec<(String, RowLocator)>)> = self
        .indices
        .iter()
        .filter_map(|idx| match &idx.kind {
            IndexKind::Gin(map) | IndexKind::GinTrgm(map) => {
                let cold: Vec<(String, RowLocator)> = map
                    .iter()
                    .flat_map(|(w, locs)| {
                        locs.iter()
                            .filter(|l| l.is_cold())
                            .copied()
                            .map(move |l| (w.clone(), l))
                    })
                    .collect();
                if cold.is_empty() {
                    None
                } else {
                    Some((idx.name.clone(), cold))
                }
            }
            IndexKind::BTree(_) | IndexKind::Nsw(_) | IndexKind::Brin { .. } => None,
        })
        .collect();

    // Minimal per-index recipe: just enough to call the matching
    // `Index::new_*` constructor once the old indices are gone.
    #[derive(Clone)]
    enum RebuildKind {
        BTree,
        Nsw(usize),
        Brin(DataType),
        Gin,
        GinTrgm,
    }
    let descriptors: Vec<(String, usize, RebuildKind)> = self
        .indices
        .iter()
        .map(|idx| {
            let kind = match &idx.kind {
                IndexKind::Nsw(g) => RebuildKind::Nsw(g.m),
                IndexKind::Brin { column_type } => RebuildKind::Brin(*column_type),
                IndexKind::BTree(_) => RebuildKind::BTree,
                IndexKind::Gin(_) => RebuildKind::Gin,
                IndexKind::GinTrgm(_) => RebuildKind::GinTrgm,
            };
            (idx.name.clone(), idx.column_position, kind)
        })
        .collect();
    // Phase 2: drop everything and re-create in the original order.
    self.indices.clear();
    for (name, column_position, rebuild_kind) in descriptors {
        match rebuild_kind {
            RebuildKind::Nsw(m) => {
                // The graph must already be in `self.indices` because
                // `nsw_insert_at` addresses it by position.
                let idx = Index::new_nsw(name, column_position, m);
                self.indices.push(idx);
                let idx_pos = self.indices.len() - 1;
                let row_indices: Vec<usize> = (0..self.rows.len()).collect();
                for row_idx in row_indices {
                    nsw_insert_at(self, idx_pos, row_idx);
                }
            }
            RebuildKind::Brin(column_type) => {
                // No per-row work is done for BRIN in this function.
                self.indices
                    .push(Index::new_brin(name, column_position, column_type));
            }
            RebuildKind::BTree => {
                let mut idx = Index::new_btree(name, column_position);
                if let IndexKind::BTree(map) = &mut idx.kind {
                    for (i, row) in self.rows.iter().enumerate() {
                        // Values for which `IndexKey::from_value` yields
                        // `None` are not indexed.
                        if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
                            let mut entries = map.get(&key).cloned().unwrap_or_default();
                            entries.push(RowLocator::Hot(i));
                            map.insert_mut(key, entries);
                        }
                    }
                }
                self.indices.push(idx);
            }
            RebuildKind::Gin => {
                let mut idx = Index::new_gin(name, column_position);
                if let IndexKind::Gin(map) = &mut idx.kind {
                    for (i, row) in self.rows.iter().enumerate() {
                        // Only `TsVector` cells contribute; one posting
                        // per lexeme word.
                        if let Value::TsVector(lexemes) = &row.values[column_position] {
                            for lex in lexemes {
                                let mut entries =
                                    map.get(&lex.word).cloned().unwrap_or_default();
                                entries.push(RowLocator::Hot(i));
                                map.insert_mut(lex.word.clone(), entries);
                            }
                        }
                    }
                }
                self.indices.push(idx);
            }
            RebuildKind::GinTrgm => {
                let mut idx = Index::new_gin_trgm(name, column_position);
                if let IndexKind::GinTrgm(map) = &mut idx.kind {
                    for (i, row) in self.rows.iter().enumerate() {
                        // Only `Text` cells contribute; one posting per
                        // extracted trigram.
                        if let Value::Text(s) = &row.values[column_position] {
                            for tri in trgm::extract_trigrams(s) {
                                let mut entries = map.get(&tri).cloned().unwrap_or_default();
                                entries.push(RowLocator::Hot(i));
                                map.insert_mut(tri, entries);
                            }
                        }
                    }
                }
                self.indices.push(idx);
            }
        }
    }

    // Phase 3: put the cold locators back. Registration errors are
    // deliberately discarded here.
    for (idx_name, locators) in preserved_cold {
        let _ = self.register_cold_locators(&idx_name, locators);
    }
    for (idx_name, locators) in preserved_gin_cold {
        let _ = self.register_gin_cold_locators(&idx_name, locators);
    }
}
2270
2271 fn add_nsw_index_inner(
2272 &mut self,
2273 name: String,
2274 column_name: &str,
2275 m: usize,
2276 restore: Option<NswGraph>,
2277 ) -> Result<(), StorageError> {
2278 if self.indices.iter().any(|i| i.name == name) {
2279 return Err(StorageError::DuplicateIndex { name });
2280 }
2281 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
2282 StorageError::ColumnNotFound {
2283 column: column_name.into(),
2284 }
2285 })?;
2286 if !matches!(
2287 self.schema.columns[column_position].ty,
2288 DataType::Vector { .. }
2289 ) {
2290 return Err(StorageError::TypeMismatch {
2291 column: column_name.into(),
2292 expected: DataType::Vector {
2293 dim: 0,
2294 encoding: VecEncoding::F32,
2295 },
2296 actual: self.schema.columns[column_position].ty,
2297 position: column_position,
2298 });
2299 }
2300 if let Some(graph) = restore {
2301 self.indices.push(Index {
2302 name,
2303 column_position,
2304 kind: IndexKind::Nsw(graph),
2305 included_columns: Vec::new(),
2306 partial_predicate: None,
2307 expression: None,
2308 is_unique: false,
2309 extra_column_positions: Vec::new(),
2310 });
2311 return Ok(());
2312 }
2313 let idx = Index::new_nsw(name, column_position, m);
2314 self.indices.push(idx);
2315 let idx_pos = self.indices.len() - 1;
2316 let row_indices: Vec<usize> = (0..self.rows.len()).collect();
2319 for row_idx in row_indices {
2320 nsw_insert_at(self, idx_pos, row_idx);
2321 }
2322 Ok(())
2323 }
2324}
2325
2326fn recode_vector_cell(cell: Value, target: VecEncoding) -> Result<Value, StorageError> {
2333 if matches!(cell, Value::Null) {
2334 return Ok(cell);
2335 }
2336 let as_f32: Vec<f32> = match &cell {
2338 Value::Vector(v) => v.clone(),
2339 Value::Sq8Vector(q) => quantize::dequantize(q),
2340 Value::HalfVector(h) => h.to_f32_vec(),
2341 other => {
2342 return Err(StorageError::Unsupported(format!(
2343 "ALTER INDEX REBUILD: cannot recode non-vector cell {:?}",
2344 other.data_type()
2345 )));
2346 }
2347 };
2348 Ok(match target {
2353 VecEncoding::F32 => Value::Vector(as_f32),
2354 VecEncoding::Sq8 => Value::Sq8Vector(quantize::quantize(&as_f32)),
2355 VecEncoding::F16 => Value::HalfVector(halfvec::HalfVector::from_f32_slice(&as_f32)),
2356 })
2357}
2358
/// Inserts row `new_row_idx` into the NSW graph stored at
/// `table.indices[idx_pos]`.
///
/// Rows whose indexed cell is not a vector variant, or whose vector has
/// dimension zero, only get their per-layer slots padded (at level 0) and
/// are otherwise left unconnected.
///
/// For a real vector the node is assigned a level by `nsw_assign_level`,
/// then:
/// * if the graph has no entry point yet, the node becomes the entry;
/// * otherwise the search descends greedily through the layers above the
///   node's level, and on every layer from `min(level, entry_level)` down
///   to 0 runs a beam search, picks neighbours with
///   `select_neighbours_heuristic` and links them via `connect_at_layer`;
/// * finally, a node whose level exceeds the current entry level becomes
///   the new entry point.
///
/// All distances used during insertion are L2.
///
/// # Panics
///
/// Panics if `idx_pos` / `new_row_idx` are out of range, or if the index
/// at `idx_pos` is not an NSW index.
fn nsw_insert_at(table: &mut Table, idx_pos: usize, new_row_idx: usize) {
    let col_pos = table.indices[idx_pos].column_position;
    // Dimension of the cell, or `None` when the cell holds no vector.
    let cell_dim: Option<usize> = match &table.rows[new_row_idx].values[col_pos] {
        Value::Vector(v) => Some(v.len()),
        Value::Sq8Vector(q) => Some(q.bytes.len()),
        Value::HalfVector(h) => Some(h.dim()),
        _ => None,
    };
    let Some(dim) = cell_dim else {
        // Keep the per-row slots aligned with row positions even though
        // this row takes no part in the graph.
        ensure_node_slot(table, idx_pos, new_row_idx, 0);
        return;
    };
    if dim == 0 {
        ensure_node_slot(table, idx_pos, new_row_idx, 0);
        return;
    }
    let level = nsw_assign_level(new_row_idx);
    ensure_node_slot(table, idx_pos, new_row_idx, level);
    // Copy the scalars out so the shared borrow of the index ends here.
    let (entry, entry_level, m) = match &table.indices[idx_pos].kind {
        IndexKind::Nsw(g) => (g.entry, g.entry_level, g.m),
        IndexKind::BTree(_)
        | IndexKind::Brin { .. }
        | IndexKind::Gin(_)
        | IndexKind::GinTrgm(_) => {
            unreachable!("nsw_insert_at on a non-NSW index")
        }
    };
    if entry.is_none() {
        // First vector in the graph: it becomes the entry point and has
        // nothing to connect to.
        if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
            g.entry = Some(new_row_idx);
            g.entry_level = level;
            *g.levels
                .get_mut(new_row_idx)
                .expect("levels slot padded by ensure_node_slot") = level;
        }
        return;
    }
    if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
        // Record the node's level before any search touches the graph.
        *g.levels
            .get_mut(new_row_idx)
            .expect("levels slot padded by ensure_node_slot") = level;
    }
    // Owned f32 copy of the new vector, used as the search query.
    let query = match &table.rows[new_row_idx].values[col_pos] {
        Value::Vector(v) => v.clone(),
        Value::Sq8Vector(q) => quantize::dequantize(q),
        Value::HalfVector(h) => h.to_f32_vec(),
        _ => return,
    };
    let mut current = entry.expect("entry was Some above");
    let mut current_d = vec_l2_sq(table, col_pos, current, &query);
    if entry_level > level {
        // Layers the new node does not live on: just walk towards it.
        for layer in (level + 1..=entry_level).rev() {
            (current, current_d) =
                greedy_layer_walk(table, idx_pos, layer, current, current_d, &query);
        }
    }
    let top = level.min(entry_level);
    // Beam width used while building: at least 8, otherwise twice `m`.
    let ef = (m * 2).max(8);
    for layer in (0..=top).rev() {
        // Layer 0 allows twice as many neighbours as the upper layers.
        let cap = if layer == 0 { m * 2 } else { m };
        let mut candidates = layer_beam_search(
            table,
            idx_pos,
            layer,
            current,
            current_d,
            &query,
            ef,
            NswMetric::L2,
        );
        // The node must never be its own neighbour.
        candidates.retain(|&(_, n)| n != new_row_idx);
        if let Some(&(d, n)) = candidates.first() {
            // Closest candidate seeds the search on the next layer down.
            current = n;
            current_d = d;
        }
        let peers = select_neighbours_heuristic(&candidates, cap, table, col_pos);
        connect_at_layer(table, idx_pos, layer, new_row_idx, &peers);
    }
    if level > entry_level
        && let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
    {
        // The new node reaches higher than the old entry: promote it.
        g.entry = Some(new_row_idx);
        g.entry_level = level;
    }
}
2470
2471fn ensure_node_slot(table: &mut Table, idx_pos: usize, new_row_idx: usize, level: u8) {
2475 let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind else {
2476 unreachable!("ensure_node_slot on a BTree index");
2477 };
2478 while g.layers.len() <= level as usize {
2479 g.layers.push(PersistentVec::new());
2480 }
2481 while g.levels.len() <= new_row_idx {
2482 g.levels.push_mut(0);
2483 }
2484 for layer_vec in &mut g.layers {
2485 while layer_vec.len() <= new_row_idx {
2486 layer_vec.push_mut(Vec::new());
2487 }
2488 }
2489}
2490
2491fn greedy_layer_walk(
2497 table: &Table,
2498 idx_pos: usize,
2499 layer: u8,
2500 mut current: usize,
2501 mut current_d: f32,
2502 query: &[f32],
2503) -> (usize, f32) {
2504 let g = match &table.indices[idx_pos].kind {
2505 IndexKind::Nsw(g) => g,
2506 IndexKind::BTree(_)
2507 | IndexKind::Brin { .. }
2508 | IndexKind::Gin(_)
2509 | IndexKind::GinTrgm(_) => {
2510 return (current, current_d);
2511 }
2512 };
2513 let col_pos = table.indices[idx_pos].column_position;
2514 loop {
2515 let neighbours: &[u32] = g
2516 .layers
2517 .get(layer as usize)
2518 .and_then(|layer_v| layer_v.get(current))
2519 .map_or(&[][..], Vec::as_slice);
2520 let mut best = current;
2521 let mut best_d = current_d;
2522 for &n in neighbours {
2523 let n = n as usize;
2524 let d = vec_l2_sq(table, col_pos, n, query);
2525 if d < best_d {
2526 best = n;
2527 best_d = d;
2528 }
2529 }
2530 if best == current {
2531 return (current, current_d);
2532 }
2533 current = best;
2534 current_d = best_d;
2535 }
2536}
2537
2538#[allow(clippy::too_many_arguments)] fn layer_beam_search(
2551 table: &Table,
2552 idx_pos: usize,
2553 layer: u8,
2554 entry_node: usize,
2555 entry_d: f32,
2556 query: &[f32],
2557 ef: usize,
2558 metric: NswMetric,
2559) -> Vec<(f32, usize)> {
2560 let g = match &table.indices[idx_pos].kind {
2561 IndexKind::Nsw(g) => g,
2562 IndexKind::BTree(_)
2563 | IndexKind::Brin { .. }
2564 | IndexKind::Gin(_)
2565 | IndexKind::GinTrgm(_) => return Vec::new(),
2566 };
2567 let col_pos = table.indices[idx_pos].column_position;
2568 let d0 = if matches!(metric, NswMetric::L2) {
2569 entry_d
2570 } else {
2571 cell_to_query_metric_distance(table, col_pos, entry_node, query, metric)
2572 };
2573 let row_count = table.rows.len();
2574 let mut visited: Vec<bool> = alloc::vec![false; row_count];
2575 if entry_node < row_count {
2576 visited[entry_node] = true;
2577 }
2578 let mut candidates: alloc::collections::BinaryHeap<NodeClosest> =
2581 alloc::collections::BinaryHeap::with_capacity(ef);
2582 let mut results: alloc::collections::BinaryHeap<NodeFurthest> =
2583 alloc::collections::BinaryHeap::with_capacity(ef);
2584 candidates.push(NodeClosest {
2585 dist: d0,
2586 node: entry_node,
2587 });
2588 results.push(NodeFurthest {
2589 dist: d0,
2590 node: entry_node,
2591 });
2592 while let Some(cur) = candidates.pop() {
2593 let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
2594 if cur.dist > worst && results.len() >= ef {
2595 break;
2596 }
2597 let neighbours: &[u32] = g
2598 .layers
2599 .get(layer as usize)
2600 .and_then(|layer_v| layer_v.get(cur.node))
2601 .map_or(&[][..], Vec::as_slice);
2602 for &n in neighbours {
2603 let n = n as usize;
2604 if n >= row_count || visited[n] {
2605 continue;
2606 }
2607 visited[n] = true;
2608 let dn = cell_to_query_metric_distance(table, col_pos, n, query, metric);
2612 if !dn.is_finite() {
2613 continue;
2614 }
2615 let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
2616 if results.len() < ef || dn < worst {
2617 results.push(NodeFurthest { dist: dn, node: n });
2618 if results.len() > ef {
2619 results.pop();
2620 }
2621 candidates.push(NodeClosest { dist: dn, node: n });
2622 }
2623 }
2624 }
2625 let mut out: Vec<(f32, usize)> = results.into_iter().map(|c| (c.dist, c.node)).collect();
2628 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2629 out
2630}
2631
/// Heap entry ordered so that a max-heap (`BinaryHeap`) pops the node
/// with the *smallest* distance first.
///
/// The ordering is a genuine total order: distances are compared with
/// `f32::total_cmp` (so NaN has a defined position and sorts as the
/// furthest), ties are broken by preferring the lower `node`, and equality
/// is derived from the same comparison so `Eq` and `Ord` always agree.
#[derive(Debug, Clone, Copy)]
struct NodeClosest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeClosest {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeClosest {}
impl PartialOrd for NodeClosest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeClosest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        // Operands are swapped on purpose: smaller distance (then smaller
        // node) must compare as "greater" to surface first in a max-heap.
        other
            .dist
            .total_cmp(&self.dist)
            .then_with(|| other.node.cmp(&self.node))
    }
}
2660
/// Heap entry ordered so that a max-heap (`BinaryHeap`) exposes the node
/// with the *largest* distance at the top, ready to be evicted.
///
/// The ordering is a genuine total order: distances are compared with
/// `f32::total_cmp` (so NaN has a defined position and sorts as the
/// furthest), ties are broken by `node` (the higher index is evicted
/// first), and equality is derived from the same comparison so `Eq` and
/// `Ord` always agree.
#[derive(Debug, Clone, Copy)]
struct NodeFurthest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeFurthest {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeFurthest {}
impl PartialOrd for NodeFurthest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeFurthest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        self.dist
            .total_cmp(&other.dist)
            .then_with(|| self.node.cmp(&other.node))
    }
}
2686
2687fn select_neighbours_heuristic(
2696 candidates: &[(f32, usize)],
2697 m: usize,
2698 table: &Table,
2699 col_pos: usize,
2700) -> Vec<usize> {
2701 let mut chosen: Vec<usize> = Vec::with_capacity(m);
2702 for &(d_q, e) in candidates {
2703 if chosen.len() >= m {
2704 break;
2705 }
2706 if !matches!(
2711 table.rows.get(e).and_then(|r| r.values.get(col_pos)),
2712 Some(Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_))
2713 ) {
2714 continue;
2715 }
2716 let mut covered = false;
2717 for &r in &chosen {
2718 if cell_l2_sq(table, col_pos, e, r) < d_q {
2722 covered = true;
2723 break;
2724 }
2725 }
2726 if !covered {
2727 chosen.push(e);
2728 }
2729 }
2730 chosen
2731}
2732
/// Wires `new_row_idx` to `peers` on the given graph `layer`.
///
/// The new node's adjacency list on that layer is overwritten with
/// `peers`. Each peer that holds a vector then gets a back-edge to the
/// new node (unless it already has one); if that pushes the peer past the
/// layer's capacity (`cap_for_layer`), the peer's neighbours are re-ranked
/// by L2 distance from the peer and pruned with
/// `select_neighbours_heuristic`.
///
/// Does nothing when the index at `idx_pos` is not an NSW index.
///
/// # Panics
///
/// Panics if a row index does not fit in `u32`, or if `idx_pos`, `layer`
/// or a peer position is out of range.
fn connect_at_layer(
    table: &mut Table,
    idx_pos: usize,
    layer: u8,
    new_row_idx: usize,
    peers: &[usize],
) {
    let col_pos = table.indices[idx_pos].column_position;
    let cap = match &table.indices[idx_pos].kind {
        IndexKind::Nsw(g) => g.cap_for_layer(layer),
        IndexKind::BTree(_)
        | IndexKind::Brin { .. }
        | IndexKind::Gin(_)
        | IndexKind::GinTrgm(_) => return,
    };
    // Adjacency lists store row positions as `u32`.
    let new_row_u32 = u32::try_from(new_row_idx).expect("row index fits in u32");
    if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
        // Forward edges: new node -> peers (replaces any previous list).
        let layer_v = &mut g.layers[layer as usize];
        if let Some(slot) = layer_v.get_mut(new_row_idx) {
            *slot = peers
                .iter()
                .map(|&p| u32::try_from(p).expect("row index fits in u32"))
                .collect();
        }
    }
    for &peer in peers {
        // Peers without a vector cell get no back-edge.
        if !matches!(
            &table.rows[peer].values[col_pos],
            Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_)
        ) {
            continue;
        }
        if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
            // Back edge: peer -> new node, added at most once.
            let layer_v = &mut g.layers[layer as usize];
            if let Some(slot) = layer_v.get_mut(peer)
                && !slot.contains(&new_row_u32)
            {
                slot.push(new_row_u32);
            }
        }
        // Re-borrow immutably: the distance helpers below need `&Table`.
        let needs_trim = match &table.indices[idx_pos].kind {
            IndexKind::Nsw(g) => g.layers[layer as usize][peer].len() > cap,
            IndexKind::BTree(_)
            | IndexKind::Brin { .. }
            | IndexKind::Gin(_)
            | IndexKind::GinTrgm(_) => false,
        };
        if needs_trim {
            // Copy the peer's neighbours out so the graph is not borrowed
            // while distances are computed.
            let current_peers: Vec<usize> = match &table.indices[idx_pos].kind {
                IndexKind::Nsw(g) => g.layers[layer as usize][peer]
                    .iter()
                    .map(|&n| n as usize)
                    .collect(),
                IndexKind::BTree(_)
                | IndexKind::Brin { .. }
                | IndexKind::Gin(_)
                | IndexKind::GinTrgm(_) => continue,
            };
            // Rank the peer's neighbours by distance from the peer itself.
            let mut tagged: Vec<(f32, usize)> = current_peers
                .iter()
                .map(|&p| (cell_l2_sq(table, col_pos, peer, p), p))
                .collect();
            tagged.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
            let kept = select_neighbours_heuristic(&tagged, cap, table, col_pos);
            if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
                && let Some(slot) = g.layers[layer as usize].get_mut(peer)
            {
                *slot = kept
                    .into_iter()
                    .map(|p| u32::try_from(p).expect("row index fits in u32"))
                    .collect();
            }
        }
    }
}
2826
2827fn vec_l2_sq(table: &Table, col_pos: usize, row: usize, query: &[f32]) -> f32 {
2834 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2835 Some(Value::Vector(v)) if v.len() == query.len() => l2_distance_sq(v, query),
2836 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => {
2837 quantize::sq8_l2_distance_sq_asymmetric(q, query)
2838 }
2839 Some(Value::HalfVector(h)) if h.dim() == query.len() => {
2843 halfvec::half_l2_distance_sq_asymmetric(h, query)
2844 }
2845 _ => f32::INFINITY,
2846 }
2847}
2848
2849fn cell_l2_sq(table: &Table, col_pos: usize, row_a: usize, row_b: usize) -> f32 {
2856 let Some(cell_a) = table.rows.get(row_a).and_then(|r| r.values.get(col_pos)) else {
2857 return f32::INFINITY;
2858 };
2859 let Some(cell_b) = table.rows.get(row_b).and_then(|r| r.values.get(col_pos)) else {
2860 return f32::INFINITY;
2861 };
2862 match (cell_a, cell_b) {
2863 (Value::Vector(a), Value::Vector(b)) if a.len() == b.len() => l2_distance_sq(a, b),
2864 (Value::Sq8Vector(a), Value::Sq8Vector(b)) if a.bytes.len() == b.bytes.len() => {
2865 quantize::sq8_l2_distance_sq(a, b)
2866 }
2867 (Value::HalfVector(a), Value::HalfVector(b)) if a.dim() == b.dim() => {
2872 halfvec::half_l2_distance_sq(a, b)
2873 }
2874 _ => f32::INFINITY,
2875 }
2876}
2877
2878fn cell_to_query_metric_distance(
2883 table: &Table,
2884 col_pos: usize,
2885 row: usize,
2886 query: &[f32],
2887 metric: NswMetric,
2888) -> f32 {
2889 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2890 Some(Value::Vector(v)) if v.len() == query.len() => metric_distance(metric, v, query),
2891 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => match metric {
2892 NswMetric::L2 => quantize::sq8_l2_distance_sq_asymmetric(q, query),
2893 NswMetric::InnerProduct => quantize::sq8_inner_product_asymmetric(q, query),
2894 NswMetric::Cosine => quantize::sq8_cosine_distance_asymmetric(q, query),
2895 },
2896 Some(Value::HalfVector(h)) if h.dim() == query.len() => match metric {
2899 NswMetric::L2 => halfvec::half_l2_distance_sq_asymmetric(h, query),
2900 NswMetric::InnerProduct => halfvec::half_inner_product_asymmetric(h, query),
2901 NswMetric::Cosine => halfvec::half_cosine_distance_asymmetric(h, query),
2902 },
2903 _ => f32::INFINITY,
2904 }
2905}
2906
/// Distance function used when ranking NSW search candidates.
///
/// Smaller values always mean "closer"; see `metric_distance` for the
/// exact formulas applied to `f32` vectors.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NswMetric {
    /// Squared Euclidean distance.
    L2,
    /// Negated inner product, so that a larger dot product ranks closer.
    InnerProduct,
    /// `1 - cos(a, b)`; infinite when either vector has zero norm.
    Cosine,
}
2924
2925fn nsw_search(
2931 table: &Table,
2932 idx_pos: usize,
2933 query: &[f32],
2934 k: usize,
2935 ef: usize,
2936 metric: NswMetric,
2937) -> Vec<(f32, usize)> {
2938 let (entry, entry_level) = match &table.indices[idx_pos].kind {
2939 IndexKind::Nsw(g) => (g.entry, g.entry_level),
2940 IndexKind::BTree(_)
2941 | IndexKind::Brin { .. }
2942 | IndexKind::Gin(_)
2943 | IndexKind::GinTrgm(_) => return Vec::new(),
2944 };
2945 let Some(entry) = entry else {
2946 return Vec::new();
2947 };
2948 let col_pos = table.indices[idx_pos].column_position;
2949 let sq8 = matches!(
2956 table.schema.columns.get(col_pos).map(|c| c.ty),
2957 Some(DataType::Vector {
2958 encoding: VecEncoding::Sq8,
2959 ..
2960 })
2961 );
2962 let ef = if sq8 {
2963 ef.max(k).max(k * SQ8_RERANK_OVER_FETCH)
2964 } else {
2965 ef.max(k)
2966 };
2967 let entry_d = vec_l2_sq(table, col_pos, entry, query);
2969 let mut current = entry;
2970 let mut current_d = entry_d;
2971 for layer in (1..=entry_level).rev() {
2972 (current, current_d) = greedy_layer_walk(table, idx_pos, layer, current, current_d, query);
2973 }
2974 let mut results = layer_beam_search(table, idx_pos, 0, current, current_d, query, ef, metric);
2976 if sq8 {
2977 results = sq8_rerank(table, col_pos, &results, query, metric);
2978 }
2979 results.truncate(k);
2980 results
2981}
2982
2983fn sq8_rerank(
2990 table: &Table,
2991 col_pos: usize,
2992 candidates: &[(f32, usize)],
2993 query: &[f32],
2994 metric: NswMetric,
2995) -> Vec<(f32, usize)> {
2996 let mut out: Vec<(f32, usize)> = candidates
2997 .iter()
2998 .filter_map(|&(adc_d, row)| {
2999 let cell = table.rows.get(row).and_then(|r| r.values.get(col_pos))?;
3000 let Value::Sq8Vector(q) = cell else {
3001 return Some((adc_d, row));
3005 };
3006 let deq = quantize::dequantize(q);
3007 if deq.len() != query.len() {
3008 return None;
3009 }
3010 Some((metric_distance(metric, &deq, query), row))
3011 })
3012 .collect();
3013 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
3014 out
3015}
3016
/// Over-fetch factor for SQ8-encoded columns: `nsw_search` widens its beam
/// to at least `k` times this value before re-ranking and truncating to `k`.
const SQ8_RERANK_OVER_FETCH: usize = 3;
3021
3022fn metric_distance(metric: NswMetric, a: &[f32], b: &[f32]) -> f32 {
3023 match metric {
3024 NswMetric::L2 => l2_distance_sq(a, b),
3025 NswMetric::InnerProduct => -inner_product_f32(a, b),
3026 NswMetric::Cosine => {
3027 let (dot, na, nb) = cosine_dot_norms_f32(a, b);
3028 if na == 0.0 || nb == 0.0 {
3029 return f32::INFINITY;
3030 }
3031 let denom = sqrt_newton_f32(na) * sqrt_newton_f32(nb);
3034 1.0 - dot / denom
3035 }
3036 }
3037}
3038
3039#[doc(hidden)]
3048#[inline]
3049pub fn inner_product_f32(a: &[f32], b: &[f32]) -> f32 {
3050 #[cfg(target_arch = "aarch64")]
3051 {
3052 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
3053 return unsafe { inner_product_neon(a, b) };
3056 }
3057 }
3058 inner_product_scalar(a, b)
3059}
3060
/// Portable dot product: sums `a[i] * b[i]` left to right over the common
/// prefix of the two slices.
fn inner_product_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter()
        .zip(b.iter())
        .fold(0.0_f32, |running, (x, y)| running + x * y)
}
3068
/// NEON dot product over four-lane chunks, using two independent
/// accumulators for the eight-wide main loop.
///
/// Elements past the last full chunk of four are not read.
///
/// # Safety
///
/// `b` must be at least as long as `a`: loads from `b` are bounded by
/// `a.len()` only. The caller must also ensure NEON is available.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names)]
unsafe fn inner_product_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32,
    };
    let len = a.len();
    let lhs = a.as_ptr();
    let rhs = b.as_ptr();
    // SAFETY: every load starts at `offset` or `offset + 4` with at least
    // four elements remaining below `len`; the caller guarantees `b` is
    // at least that long.
    unsafe {
        let zeroes: float32x4_t = vdupq_n_f32(0.0);
        let mut sum_lo = zeroes;
        let mut sum_hi = zeroes;
        let mut offset = 0usize;
        while offset + 8 <= len {
            let lhs_lo = vld1q_f32(lhs.add(offset));
            let rhs_lo = vld1q_f32(rhs.add(offset));
            sum_lo = vfmaq_f32(sum_lo, lhs_lo, rhs_lo);
            let lhs_hi = vld1q_f32(lhs.add(offset + 4));
            let rhs_hi = vld1q_f32(rhs.add(offset + 4));
            sum_hi = vfmaq_f32(sum_hi, lhs_hi, rhs_hi);
            offset += 8;
        }
        // Remaining full chunks of four fold into the first accumulator.
        while offset + 4 <= len {
            let lhs_tail = vld1q_f32(lhs.add(offset));
            let rhs_tail = vld1q_f32(rhs.add(offset));
            sum_lo = vfmaq_f32(sum_lo, lhs_tail, rhs_tail);
            offset += 4;
        }
        vaddvq_f32(vaddq_f32(sum_lo, sum_hi))
    }
}
3102
3103#[doc(hidden)]
3110#[inline]
3111pub fn cosine_dot_norms_f32(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
3112 #[cfg(target_arch = "aarch64")]
3113 {
3114 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
3115 return unsafe { cosine_dot_norms_neon(a, b) };
3117 }
3118 }
3119 cosine_dot_norms_scalar(a, b)
3120}
3121
/// Portable single-pass computation of `(a·b, |a|², |b|²)` over the
/// common prefix of the two slices, accumulated left to right.
fn cosine_dot_norms_scalar(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    a.iter().zip(b.iter()).fold(
        (0.0_f32, 0.0_f32, 0.0_f32),
        |(dot, norm_a_sq, norm_b_sq), (x, y)| (dot + x * y, norm_a_sq + x * x, norm_b_sq + y * y),
    )
}
3133
/// NEON single-pass computation of `(a·b, |a|², |b|²)` over four-lane
/// chunks.
///
/// Elements past the last full chunk of four are not read.
///
/// # Safety
///
/// `b` must be at least as long as `a`: loads from `b` are bounded by
/// `a.len()` only. The caller must also ensure NEON is available.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names, clippy::similar_names)]
unsafe fn cosine_dot_norms_neon(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    use core::arch::aarch64::{float32x4_t, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32};
    let len = a.len();
    let lhs = a.as_ptr();
    let rhs = b.as_ptr();
    // SAFETY: each load starts at `offset` with at least four elements
    // remaining below `len`; the caller guarantees `b` is at least that
    // long.
    unsafe {
        let zeroes: float32x4_t = vdupq_n_f32(0.0);
        let mut dot_lanes = zeroes;
        let mut norm_a_lanes = zeroes;
        let mut norm_b_lanes = zeroes;
        let mut offset = 0usize;
        while offset + 4 <= len {
            let lhs_chunk = vld1q_f32(lhs.add(offset));
            let rhs_chunk = vld1q_f32(rhs.add(offset));
            dot_lanes = vfmaq_f32(dot_lanes, lhs_chunk, rhs_chunk);
            norm_a_lanes = vfmaq_f32(norm_a_lanes, lhs_chunk, lhs_chunk);
            norm_b_lanes = vfmaq_f32(norm_b_lanes, rhs_chunk, rhs_chunk);
            offset += 4;
        }
        (
            vaddvq_f32(dot_lanes),
            vaddvq_f32(norm_a_lanes),
            vaddvq_f32(norm_b_lanes),
        )
    }
}
3157
/// Square root for `no_std` builds, computed with Newton's method.
///
/// Returns `0.0` for zero and negative inputs, NaN for NaN, and
/// `f32::INFINITY` for positive infinity.
///
/// The iteration is seeded by halving the float's exponent through its bit
/// pattern and then run until it stops decreasing. A fixed number of
/// steps from a seed of `x` itself is not enough: each step only roughly
/// halves a far-off guess, so inputs well away from 1 (for example squared
/// norms of unnormalised vectors) would come back noticeably wrong.
fn sqrt_newton_f32(x: f32) -> f32 {
    if x.is_nan() {
        return x;
    }
    if x <= 0.0 {
        return 0.0;
    }
    if x.is_infinite() {
        return f32::INFINITY;
    }
    // Halving the biased exponent gives a rough first guess for every
    // finite positive input: `(bits >> 1) + (127 << 22)`.
    let seed = f32::from_bits((x.to_bits() >> 1) + 0x1fc0_0000);
    // After one Newton step the estimate is at or above the true root
    // (AM-GM), and from there the sequence only decreases.
    let mut g = 0.5 * (seed + x / seed);
    // The bound is a safety net; normal inputs stop after a few steps.
    for _ in 0..32 {
        let next = 0.5 * (g + x / g);
        if next >= g {
            break;
        }
        g = next;
    }
    g
}
3168
3169#[inline]
3177fn l2_distance_sq(a: &[f32], b: &[f32]) -> f32 {
3178 #[cfg(target_arch = "aarch64")]
3179 {
3180 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
3181 return unsafe { l2_distance_sq_neon(a, b) };
3185 }
3186 }
3187 l2_distance_sq_scalar(a, b)
3188}
3189
/// Portable squared Euclidean distance: sums `(a[i] - b[i])²` left to
/// right over the common prefix of the two slices.
fn l2_distance_sq_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b.iter()).fold(0.0_f32, |total, (x, y)| {
        let delta = *x - *y;
        total + delta * delta
    })
}
3198
/// NEON squared Euclidean distance over four-lane chunks, using two
/// independent accumulators for the eight-wide main loop.
///
/// Elements past the last full chunk of four are not read.
///
/// # Safety
///
/// `b` must be at least as long as `a`: loads from `b` are bounded by
/// `a.len()` only. The caller must also ensure NEON is available.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names)]
unsafe fn l2_distance_sq_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32, vsubq_f32,
    };
    let len = a.len();
    let lhs = a.as_ptr();
    let rhs = b.as_ptr();
    // SAFETY: every load starts at `offset` or `offset + 4` with at least
    // four elements remaining below `len`; the caller guarantees `b` is
    // at least that long.
    unsafe {
        let zeroes: float32x4_t = vdupq_n_f32(0.0);
        let mut sum_lo = zeroes;
        let mut sum_hi = zeroes;
        let mut offset = 0usize;
        while offset + 8 <= len {
            let diff_lo = vsubq_f32(vld1q_f32(lhs.add(offset)), vld1q_f32(rhs.add(offset)));
            sum_lo = vfmaq_f32(sum_lo, diff_lo, diff_lo);
            let diff_hi = vsubq_f32(
                vld1q_f32(lhs.add(offset + 4)),
                vld1q_f32(rhs.add(offset + 4)),
            );
            sum_hi = vfmaq_f32(sum_hi, diff_hi, diff_hi);
            offset += 8;
        }
        // Remaining full chunks of four fold into the first accumulator.
        while offset + 4 <= len {
            let diff = vsubq_f32(vld1q_f32(lhs.add(offset)), vld1q_f32(rhs.add(offset)));
            sum_lo = vfmaq_f32(sum_lo, diff, diff);
            offset += 4;
        }
        vaddvq_f32(vaddq_f32(sum_lo, sum_hi))
    }
}
3236
3237pub fn nsw_query(
3240 table: &Table,
3241 idx_name: &str,
3242 query: &[f32],
3243 k: usize,
3244 metric: NswMetric,
3245) -> Vec<usize> {
3246 let Some(idx_pos) = table.indices.iter().position(|i| i.name == idx_name) else {
3247 return Vec::new();
3248 };
3249 let ef = (k * 2).max(NSW_DEFAULT_M);
3250 let mut hits = nsw_search(table, idx_pos, query, k, ef, metric);
3251 hits.truncate(k);
3252 hits.into_iter().map(|(_, idx)| idx).collect()
3253}
3254
3255pub fn nsw_index_on(table: &Table, column_position: usize) -> Option<&Index> {
3259 table
3260 .indices
3261 .iter()
3262 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_)))
3263}
3264
/// In-memory database catalog: tables, cold-tier segments, functions and
/// triggers.
#[derive(Debug, Clone, Default)]
pub struct Catalog {
    /// Tables in slot order; `by_name` stores positions into this vector.
    tables: Vec<Table>,
    /// Table name → position in `tables`.
    by_name: BTreeMap<String, usize>,
    /// Cold segments addressed by segment id (the vector index). A `None` slot
    /// is either a tombstoned segment or padding created by
    /// `load_segment_bytes_at`.
    cold_segments: Vec<Option<Arc<OwnedSegment>>>,
    /// Registered functions, keyed by `FunctionDef::name`.
    functions: BTreeMap<String, FunctionDef>,
    /// Registered triggers, in creation order.
    triggers: Vec<TriggerDef>,
}
3317
/// Catalog entry describing a user-defined function.
///
/// Every field is stored as text; nothing in this type interprets them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FunctionDef {
    /// Function name; also the key under which the catalog stores the entry.
    pub name: String,
    /// Textual form of the argument list. NOTE(review): presumably the raw SQL
    /// text — confirm with the parser.
    pub args_repr: String,
    /// Textual form of the return type.
    pub returns: String,
    /// Implementation language name.
    pub language: String,
    /// Function body source text.
    pub body: String,
}
3343
/// Catalog entry describing a trigger attached to a table.
///
/// A trigger is identified by the pair (`name`, `table`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TriggerDef {
    /// Trigger name; unique per table.
    pub name: String,
    /// Name of the table the trigger is attached to.
    pub table: String,
    /// Firing time as text. NOTE(review): presumably BEFORE / AFTER /
    /// INSTEAD OF — confirm the accepted spellings.
    pub timing: String,
    /// Events that fire the trigger, as text. NOTE(review): presumably
    /// INSERT / UPDATE / DELETE — confirm.
    pub events: Vec<String>,
    /// Granularity as text. NOTE(review): presumably ROW or STATEMENT — confirm.
    pub for_each: String,
    /// Name of the function to invoke; must exist in the catalog when the
    /// trigger is created.
    pub function: String,
    /// Column names attached to the trigger. NOTE(review): presumably the
    /// `UPDATE OF <columns>` list — confirm.
    pub update_columns: Vec<String>,
    /// Enabled flag. NOTE(review): presumably disabled triggers are skipped
    /// at firing time — confirm with the executor.
    pub enabled: bool,
}
3382
3383impl Catalog {
3384 pub const fn new() -> Self {
3385 Self {
3386 tables: Vec::new(),
3387 by_name: BTreeMap::new(),
3388 cold_segments: Vec::new(),
3389 functions: BTreeMap::new(),
3390 triggers: Vec::new(),
3391 }
3392 }
3393
3394 pub const fn functions(&self) -> &BTreeMap<String, FunctionDef> {
3398 &self.functions
3399 }
3400
3401 pub fn create_function(
3405 &mut self,
3406 def: FunctionDef,
3407 or_replace: bool,
3408 ) -> Result<(), StorageError> {
3409 if !or_replace && self.functions.contains_key(&def.name) {
3410 return Err(StorageError::Corrupt(format!(
3411 "function {:?} already exists (drop or use CREATE OR REPLACE)",
3412 def.name
3413 )));
3414 }
3415 self.functions.insert(def.name.clone(), def);
3416 Ok(())
3417 }
3418
3419 pub fn drop_function(&mut self, name: &str) -> bool {
3423 self.functions.remove(name).is_some()
3424 }
3425
3426 pub fn triggers(&self) -> &[TriggerDef] {
3430 &self.triggers
3431 }
3432
3433 pub fn triggers_mut(&mut self) -> &mut Vec<TriggerDef> {
3438 &mut self.triggers
3439 }
3440
3441 pub fn create_trigger(
3447 &mut self,
3448 def: TriggerDef,
3449 or_replace: bool,
3450 ) -> Result<(), StorageError> {
3451 if !self.by_name.contains_key(&def.table) {
3452 return Err(StorageError::TableNotFound {
3453 name: def.table.clone(),
3454 });
3455 }
3456 if !self.functions.contains_key(&def.function) {
3457 return Err(StorageError::Corrupt(format!(
3458 "trigger {:?} references unknown function {:?}",
3459 def.name, def.function
3460 )));
3461 }
3462 let dup = self
3463 .triggers
3464 .iter()
3465 .position(|t| t.name == def.name && t.table == def.table);
3466 match (dup, or_replace) {
3467 (Some(_), false) => Err(StorageError::Corrupt(format!(
3468 "trigger {:?} already exists on table {:?}",
3469 def.name, def.table
3470 ))),
3471 (Some(i), true) => {
3472 self.triggers[i] = def;
3473 Ok(())
3474 }
3475 (None, _) => {
3476 self.triggers.push(def);
3477 Ok(())
3478 }
3479 }
3480 }
3481
3482 pub fn drop_trigger(&mut self, name: &str, table: &str) -> bool {
3485 let before = self.triggers.len();
3486 self.triggers
3487 .retain(|t| !(t.name == name && t.table == table));
3488 before != self.triggers.len()
3489 }
3490
3491 pub fn create_table(&mut self, schema: TableSchema) -> Result<(), StorageError> {
3492 if self.by_name.contains_key(&schema.name) {
3493 return Err(StorageError::DuplicateTable {
3494 name: schema.name.clone(),
3495 });
3496 }
3497 let idx = self.tables.len();
3498 let name = schema.name.clone();
3499 self.tables.push(Table::new(schema));
3500 self.by_name.insert(name, idx);
3501 Ok(())
3502 }
3503
3504 pub fn get(&self, name: &str) -> Option<&Table> {
3505 let idx = *self.by_name.get(name)?;
3506 self.tables.get(idx)
3507 }
3508
3509 pub fn get_mut(&mut self, name: &str) -> Option<&mut Table> {
3510 let idx = *self.by_name.get(name)?;
3511 self.tables.get_mut(idx)
3512 }
3513
3514 pub fn table_count(&self) -> usize {
3515 self.tables.len()
3516 }
3517
3518 pub fn drop_table(&mut self, name: &str) -> bool {
3524 let Some(idx) = self.by_name.remove(name) else {
3525 return false;
3526 };
3527 self.tables.swap_remove(idx);
3530 if idx < self.tables.len() {
3532 let moved_name = self.tables[idx].schema.name.clone();
3533 self.by_name.insert(moved_name, idx);
3534 }
3535 true
3536 }
3537
3538 pub fn rename_table(&mut self, old: &str, new: &str) -> Result<(), StorageError> {
3551 if old == new {
3552 return Ok(());
3553 }
3554 if self.by_name.contains_key(new) {
3555 return Err(StorageError::Corrupt(format!(
3556 "rename_table: target name {new:?} already exists"
3557 )));
3558 }
3559 let idx = self
3560 .by_name
3561 .remove(old)
3562 .ok_or_else(|| StorageError::TableNotFound { name: old.into() })?;
3563 self.tables[idx].schema.name = new.to_string();
3564 self.by_name.insert(new.to_string(), idx);
3565 for t in &mut self.tables {
3566 for fk in &mut t.schema.foreign_keys {
3567 if fk.parent_table == old {
3568 fk.parent_table = new.to_string();
3569 }
3570 }
3571 }
3572 for trig in &mut self.triggers {
3573 if trig.table == old {
3574 trig.table = new.to_string();
3575 }
3576 }
3577 Ok(())
3578 }
3579
3580 pub fn rename_index(&mut self, old: &str, new: &str) -> Result<(), StorageError> {
3585 if old == new {
3586 return Ok(());
3587 }
3588 for t in &self.tables {
3590 if t.indices.iter().any(|i| i.name == new) {
3591 return Err(StorageError::Corrupt(format!(
3592 "rename_index: target name {new:?} already exists"
3593 )));
3594 }
3595 }
3596 for t in &mut self.tables {
3597 for i in &mut t.indices {
3598 if i.name == old {
3599 i.name = new.to_string();
3600 return Ok(());
3601 }
3602 }
3603 }
3604 Err(StorageError::IndexNotFound { name: old.into() })
3605 }
3606
3607 pub fn drop_named_index(&mut self, name: &str) -> bool {
3610 for t in &mut self.tables {
3611 let before = t.indices.len();
3612 t.indices.retain(|i| i.name != name);
3613 if t.indices.len() != before {
3614 return true;
3615 }
3616 }
3617 false
3618 }
3619
3620 pub fn table_names(&self) -> Vec<String> {
3623 self.tables.iter().map(|t| t.schema.name.clone()).collect()
3624 }
3625
3626 pub fn load_segment_bytes(&mut self, bytes: Vec<u8>) -> Result<u32, StorageError> {
3637 let id = u32::try_from(self.cold_segments.len()).map_err(|_| {
3638 StorageError::Corrupt("cold segment count would exceed u32::MAX".into())
3639 })?;
3640 let seg = OwnedSegment::from_bytes(bytes)
3641 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
3642 self.cold_segments.push(Some(Arc::new(seg)));
3643 Ok(id)
3644 }
3645
3646 pub fn load_segment_bytes_at(
3659 &mut self,
3660 target_id: u32,
3661 bytes: Vec<u8>,
3662 ) -> Result<(), StorageError> {
3663 let seg = OwnedSegment::from_bytes(bytes)
3664 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
3665 let idx = target_id as usize;
3666 while self.cold_segments.len() <= idx {
3667 self.cold_segments.push(None);
3668 }
3669 if self.cold_segments[idx].is_some() {
3670 return Err(StorageError::Corrupt(format!(
3671 "load_segment_bytes_at: segment_id {target_id} already occupied"
3672 )));
3673 }
3674 self.cold_segments[idx] = Some(Arc::new(seg));
3675 Ok(())
3676 }
3677
3678 pub fn tombstone_segment(&mut self, segment_id: u32) -> Result<(), StorageError> {
3688 let idx = segment_id as usize;
3689 if idx >= self.cold_segments.len() {
3690 return Err(StorageError::Corrupt(format!(
3691 "tombstone_segment: segment_id {segment_id} out of bounds (len={})",
3692 self.cold_segments.len()
3693 )));
3694 }
3695 self.cold_segments[idx] = None;
3696 Ok(())
3697 }
3698
3699 #[must_use]
3701 pub fn cold_segment_count(&self) -> usize {
3702 self.cold_segments.iter().filter(|s| s.is_some()).count()
3703 }
3704
3705 #[must_use]
3708 pub fn cold_segment_slot_count(&self) -> usize {
3709 self.cold_segments.len()
3710 }
3711
3712 #[must_use]
3717 pub fn cold_segment_ids_global(&self) -> Vec<u32> {
3718 self.cold_segments
3719 .iter()
3720 .enumerate()
3721 .filter_map(|(i, s)| s.as_ref().map(|_| i as u32))
3722 .collect()
3723 }
3724
3725 #[must_use]
3732 pub fn hot_tier_bytes(&self) -> u64 {
3733 self.tables
3734 .iter()
3735 .map(Table::hot_bytes)
3736 .fold(0u64, u64::saturating_add)
3737 }
3738
3739 pub fn freeze_oldest_to_cold(
3784 &mut self,
3785 table_name: &str,
3786 index_name: &str,
3787 max_rows: usize,
3788 ) -> Result<FreezeReport, StorageError> {
3789 if max_rows == 0 {
3791 return Err(StorageError::Corrupt(
3792 "freeze_oldest_to_cold: max_rows must be > 0".into(),
3793 ));
3794 }
3795 let table = self.get(table_name).ok_or_else(|| {
3796 StorageError::Corrupt(format!(
3797 "freeze_oldest_to_cold: table {table_name:?} not found"
3798 ))
3799 })?;
3800 if max_rows > table.rows.len() {
3801 return Err(StorageError::Corrupt(format!(
3802 "freeze_oldest_to_cold: max_rows {max_rows} > row_count {}",
3803 table.rows.len()
3804 )));
3805 }
3806 let idx = table
3807 .indices
3808 .iter()
3809 .find(|i| i.name == index_name)
3810 .ok_or_else(|| {
3811 StorageError::Corrupt(format!(
3812 "freeze_oldest_to_cold: index {index_name:?} not found on {table_name:?}"
3813 ))
3814 })?;
3815 if !matches!(idx.kind, IndexKind::BTree(_)) {
3816 return Err(StorageError::Corrupt(format!(
3817 "freeze_oldest_to_cold: index {index_name:?} is NSW; only BTree indices may freeze"
3818 )));
3819 }
3820 let column_position = idx.column_position;
3821
3822 let schema = table.schema.clone();
3824 let mut to_freeze: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(max_rows);
3825 for row_idx in 0..max_rows {
3826 let row = table.rows.get(row_idx).expect("bounds-checked above");
3827 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
3828 StorageError::Corrupt(format!(
3829 "freeze_oldest_to_cold: row {row_idx} has NULL / non-key value in index column"
3830 ))
3831 })?;
3832 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
3833 StorageError::Corrupt(format!(
3834 "freeze_oldest_to_cold: index {index_name:?} column type is non-integer; \
3835 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
3836 ))
3837 })?;
3838 to_freeze.push((pk_u64, encode_row_body_dense(row, &schema), key));
3839 }
3840 to_freeze.sort_by_key(|(k, _, _)| *k);
3845 for w in to_freeze.windows(2) {
3849 if w[0].0 == w[1].0 {
3850 return Err(StorageError::Corrupt(format!(
3851 "freeze_oldest_to_cold: duplicate PK {} in freeze batch",
3852 w[0].0
3853 )));
3854 }
3855 }
3856 let post_swap_keys: Vec<IndexKey> = to_freeze.iter().map(|(_, _, k)| k.clone()).collect();
3860 let seg_rows: Vec<(u64, Vec<u8>)> = to_freeze
3864 .into_iter()
3865 .map(|(k, body, _)| (k, body))
3866 .collect();
3867 let frozen_rows = seg_rows.len();
3868 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
3869 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: encode: {e}")))?;
3870
3871 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
3880 let positions: Vec<usize> = (0..max_rows).collect();
3881 let t_mut = self
3882 .get_mut(table_name)
3883 .expect("just validated; still present");
3884 let removed = t_mut.delete_rows(&positions);
3885 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
3886 let bytes_after = t_mut.hot_bytes();
3887 let bytes_freed = bytes_before.saturating_sub(bytes_after);
3888
3889 let segment_id = self
3890 .load_segment_bytes(seg_bytes.clone())
3891 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: load: {e}")))?;
3892 let new_cold = post_swap_keys.into_iter().map(|k| {
3893 (
3894 k,
3895 RowLocator::Cold {
3896 segment_id,
3897 page_offset: 0,
3898 },
3899 )
3900 });
3901 let t_mut = self.get_mut(table_name).expect("still present");
3902 t_mut.register_cold_locators(index_name, new_cold)?;
3903
3904 Ok(FreezeReport {
3905 segment_id,
3906 frozen_rows,
3907 bytes_freed,
3908 segment_bytes: seg_bytes,
3909 })
3910 }
3911
3912 #[must_use]
3918 pub fn cold_segment(&self, segment_id: u32) -> Option<&OwnedSegment> {
3919 self.cold_segments
3920 .get(segment_id as usize)
3921 .and_then(|s| s.as_deref())
3922 }
3923
3924 pub fn resolve_cold_locator(
3933 &self,
3934 table_name: &str,
3935 segment_id: u32,
3936 key: &IndexKey,
3937 ) -> Option<Row> {
3938 let t = self.get(table_name)?;
3939 let u64_key = index_key_as_u64(key)?;
3940 let seg = self.cold_segments.get(segment_id as usize)?.as_ref()?;
3941 let payload = seg.lookup(u64_key)?;
3942 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
3943 Some(row)
3944 }
3945
3946 pub fn lookup_by_pk(&self, table: &str, index_name: &str, key: &IndexKey) -> Option<Row> {
3964 let t = self.get(table)?;
3965 let idx = t.indices.iter().find(|i| i.name == index_name)?;
3966 let locators = idx.lookup_eq(key);
3967 let cold_u64_key = index_key_as_u64(key);
3968 for loc in locators {
3969 match *loc {
3970 RowLocator::Hot(i) => {
3971 if let Some(row) = t.rows.get(i) {
3972 return Some(row.clone());
3973 }
3974 }
3975 RowLocator::Cold {
3976 segment_id,
3977 page_offset: _,
3978 } => {
3979 let Some(u64_key) = cold_u64_key else {
3980 continue;
3983 };
3984 let Some(seg) = self
3985 .cold_segments
3986 .get(segment_id as usize)
3987 .and_then(|s| s.as_deref())
3988 else {
3989 continue;
4000 };
4001 let Some(payload) = seg.lookup(u64_key) else {
4002 continue;
4003 };
4004 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
4005 return Some(row);
4006 }
4007 }
4008 }
4009 None
4010 }
4011
4012 pub fn promote_cold_row(
4034 &mut self,
4035 table_name: &str,
4036 index_name: &str,
4037 key: &IndexKey,
4038 ) -> Result<Option<usize>, StorageError> {
4039 let cold_loc = self.find_cold_locator(table_name, index_name, key)?;
4040 let Some((segment_id, _page_offset)) = cold_loc else {
4041 return Ok(None);
4042 };
4043 let u64_key = index_key_as_u64(key).ok_or_else(|| {
4044 StorageError::Corrupt(
4045 "promote_cold_row: key type not coercible to u64 (cold tier requires integer PK)"
4046 .into(),
4047 )
4048 })?;
4049 let schema = self
4053 .get(table_name)
4054 .ok_or_else(|| {
4055 StorageError::Corrupt(format!("promote_cold_row: table {table_name:?} not found"))
4056 })?
4057 .schema
4058 .clone();
4059 let seg = self
4060 .cold_segments
4061 .get(segment_id as usize)
4062 .and_then(|s| s.as_ref())
4063 .ok_or_else(|| {
4064 StorageError::Corrupt(format!(
4065 "promote_cold_row: segment {segment_id} not registered on catalog"
4066 ))
4067 })?;
4068 let payload = seg.lookup(u64_key).ok_or_else(|| {
4069 StorageError::Corrupt(format!(
4070 "promote_cold_row: key {u64_key} resolves to segment {segment_id} \
4071 but the segment's bloom/page lookup didn't return a row"
4072 ))
4073 })?;
4074 let (row, _consumed) = decode_row_body_dense(&payload, &schema)?;
4075 let t = self
4080 .get_mut(table_name)
4081 .expect("table existed at lookup time");
4082 t.insert(row)?;
4083 let new_hot_idx =
4084 t.rows.len().checked_sub(1).ok_or_else(|| {
4085 StorageError::Corrupt("promote_cold_row: empty after insert".into())
4086 })?;
4087 t.remove_cold_locators_for_key(index_name, key)?;
4091 Ok(Some(new_hot_idx))
4092 }
4093
4094 pub fn shadow_cold_row(
4112 &mut self,
4113 table_name: &str,
4114 index_name: &str,
4115 key: &IndexKey,
4116 ) -> Result<usize, StorageError> {
4117 let t = self.get_mut(table_name).ok_or_else(|| {
4118 StorageError::Corrupt(format!("shadow_cold_row: table {table_name:?} not found"))
4119 })?;
4120 t.remove_cold_locators_for_key(index_name, key)
4121 }
4122
4123 pub fn prepare_freeze_slice(
4141 &self,
4142 table_name: &str,
4143 index_name: &str,
4144 row_range: core::ops::Range<usize>,
4145 ) -> Result<FreezeSlice, StorageError> {
4146 let table = self.get(table_name).ok_or_else(|| {
4147 StorageError::Corrupt(format!(
4148 "prepare_freeze_slice: table {table_name:?} not found"
4149 ))
4150 })?;
4151 let idx = table
4152 .indices
4153 .iter()
4154 .find(|i| i.name == index_name)
4155 .ok_or_else(|| {
4156 StorageError::Corrupt(format!(
4157 "prepare_freeze_slice: index {index_name:?} not found on {table_name:?}"
4158 ))
4159 })?;
4160 if !matches!(idx.kind, IndexKind::BTree(_)) {
4161 return Err(StorageError::Corrupt(format!(
4162 "prepare_freeze_slice: index {index_name:?} is NSW; only BTree indices may freeze"
4163 )));
4164 }
4165 if row_range.end > table.rows.len() {
4166 return Err(StorageError::Corrupt(format!(
4167 "prepare_freeze_slice: row_range end {} > row_count {}",
4168 row_range.end,
4169 table.rows.len()
4170 )));
4171 }
4172 let column_position = idx.column_position;
4173 let schema = table.schema.clone();
4174 let mut rows: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(row_range.len());
4175 for row_idx in row_range.clone() {
4176 let row = table.rows.get(row_idx).expect("bounds-checked above");
4177 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
4178 StorageError::Corrupt(format!(
4179 "prepare_freeze_slice: row {row_idx} has NULL / non-key value in index column"
4180 ))
4181 })?;
4182 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
4183 StorageError::Corrupt(format!(
4184 "prepare_freeze_slice: index {index_name:?} column type is non-integer; \
4185 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
4186 ))
4187 })?;
4188 rows.push((pk_u64, encode_row_body_dense(row, &schema), key));
4189 }
4190 rows.sort_by_key(|(k, _, _)| *k);
4191 Ok(FreezeSlice { row_range, rows })
4192 }
4193
4194 pub fn commit_freeze_slices(
4208 &mut self,
4209 table_name: &str,
4210 index_name: &str,
4211 slices: Vec<FreezeSlice>,
4212 ) -> Result<FreezeReport, StorageError> {
4213 let table = self.get(table_name).ok_or_else(|| {
4215 StorageError::Corrupt(format!(
4216 "commit_freeze_slices: table {table_name:?} not found"
4217 ))
4218 })?;
4219 let idx = table
4220 .indices
4221 .iter()
4222 .find(|i| i.name == index_name)
4223 .ok_or_else(|| {
4224 StorageError::Corrupt(format!(
4225 "commit_freeze_slices: index {index_name:?} not found on {table_name:?}"
4226 ))
4227 })?;
4228 if !matches!(idx.kind, IndexKind::BTree(_)) {
4229 return Err(StorageError::Corrupt(format!(
4230 "commit_freeze_slices: index {index_name:?} is NSW; only BTree indices may freeze"
4231 )));
4232 }
4233 let mut ordered = slices;
4237 ordered.sort_by_key(|s| s.row_range.start);
4238 let mut expected_start = 0usize;
4242 for s in &ordered {
4243 if s.row_range.start != expected_start {
4244 return Err(StorageError::Corrupt(format!(
4245 "commit_freeze_slices: gap/overlap at row {}; expected start {}",
4246 s.row_range.start, expected_start
4247 )));
4248 }
4249 expected_start = s.row_range.end;
4250 }
4251 let max_rows = expected_start;
4252 if max_rows > table.rows.len() {
4253 return Err(StorageError::Corrupt(format!(
4254 "commit_freeze_slices: total row range {} exceeds row_count {}",
4255 max_rows,
4256 table.rows.len()
4257 )));
4258 }
4259 if max_rows == 0 {
4260 return Ok(FreezeReport {
4261 segment_id: u32::MAX,
4262 frozen_rows: 0,
4263 bytes_freed: 0,
4264 segment_bytes: Vec::new(),
4265 });
4266 }
4267
4268 let total_rows: usize = ordered.iter().map(|s| s.rows.len()).sum();
4273 if total_rows != max_rows {
4274 return Err(StorageError::Corrupt(format!(
4275 "commit_freeze_slices: total slice rows {total_rows} ≠ row_range coverage {max_rows}"
4276 )));
4277 }
4278 let mut cursors: Vec<usize> = alloc::vec![0; ordered.len()];
4279 let mut merged: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(total_rows);
4280 loop {
4281 let mut pick: Option<usize> = None;
4284 for (i, c) in cursors.iter().enumerate() {
4285 let slice = &ordered[i];
4286 if *c >= slice.rows.len() {
4287 continue;
4288 }
4289 match pick {
4290 None => pick = Some(i),
4291 Some(j) => {
4292 if slice.rows[*c].0 < ordered[j].rows[cursors[j]].0 {
4293 pick = Some(i);
4294 }
4295 }
4296 }
4297 }
4298 let Some(i) = pick else { break };
4299 let row = ordered[i].rows[cursors[i]].clone();
4300 cursors[i] += 1;
4301 merged.push(row);
4302 }
4303 for w in merged.windows(2) {
4306 if w[0].0 == w[1].0 {
4307 return Err(StorageError::Corrupt(format!(
4308 "commit_freeze_slices: duplicate PK {} across slices",
4309 w[0].0
4310 )));
4311 }
4312 }
4313 let post_swap_keys: Vec<IndexKey> = merged.iter().map(|(_, _, k)| k.clone()).collect();
4314 let seg_rows: Vec<(u64, Vec<u8>)> =
4315 merged.into_iter().map(|(k, body, _)| (k, body)).collect();
4316 let frozen_rows = seg_rows.len();
4317 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
4318 .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: encode: {e}")))?;
4319
4320 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
4322 let positions: Vec<usize> = (0..max_rows).collect();
4323 let t_mut = self
4324 .get_mut(table_name)
4325 .expect("just validated; still present");
4326 let removed = t_mut.delete_rows(&positions);
4327 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
4328 let bytes_after = t_mut.hot_bytes();
4329 let bytes_freed = bytes_before.saturating_sub(bytes_after);
4330
4331 let segment_id = self
4332 .load_segment_bytes(seg_bytes.clone())
4333 .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: load: {e}")))?;
4334 let new_cold = post_swap_keys.into_iter().map(|k| {
4335 (
4336 k,
4337 RowLocator::Cold {
4338 segment_id,
4339 page_offset: 0,
4340 },
4341 )
4342 });
4343 let t_mut = self.get_mut(table_name).expect("still present");
4344 t_mut.register_cold_locators(index_name, new_cold)?;
4345
4346 Ok(FreezeReport {
4347 segment_id,
4348 frozen_rows,
4349 bytes_freed,
4350 segment_bytes: seg_bytes,
4351 })
4352 }
4353
4354 pub fn compact_cold_segments(
4397 &mut self,
4398 table_name: &str,
4399 index_name: &str,
4400 target_segment_bytes: u64,
4401 ) -> Result<CompactReport, StorageError> {
4402 let t = self.get(table_name).ok_or_else(|| {
4404 StorageError::Corrupt(format!(
4405 "compact_cold_segments: table {table_name:?} not found"
4406 ))
4407 })?;
4408 let idx = t
4409 .indices
4410 .iter()
4411 .find(|i| i.name == index_name)
4412 .ok_or_else(|| {
4413 StorageError::Corrupt(format!(
4414 "compact_cold_segments: index {index_name:?} not found on {table_name:?}"
4415 ))
4416 })?;
4417 let map = match &idx.kind {
4418 IndexKind::BTree(m) => m,
4419 IndexKind::Nsw(_)
4420 | IndexKind::Brin { .. }
4421 | IndexKind::Gin(_)
4422 | IndexKind::GinTrgm(_) => {
4423 return Err(StorageError::Corrupt(format!(
4424 "compact_cold_segments: index {index_name:?} is not BTree; \
4425 compaction applies only to BTree cold-tier indices"
4426 )));
4427 }
4428 };
4429
4430 let mut referenced_ids: BTreeSet<u32> = BTreeSet::new();
4433 for (_key, locators) in map.iter() {
4434 for loc in locators {
4435 if let RowLocator::Cold { segment_id, .. } = loc {
4436 referenced_ids.insert(*segment_id);
4437 }
4438 }
4439 }
4440 let candidate_set: BTreeSet<u32> = referenced_ids
4442 .into_iter()
4443 .filter(|id| {
4444 self.cold_segments
4445 .get(*id as usize)
4446 .and_then(|s| s.as_deref())
4447 .is_some_and(|s| (s.bytes().len() as u64) < target_segment_bytes)
4448 })
4449 .collect();
4450 if candidate_set.len() < 2 {
4451 return Ok(CompactReport {
4452 sources: Vec::new(),
4453 merged_segment_id: None,
4454 merged_segment_bytes: Vec::new(),
4455 merged_rows: 0,
4456 deleted_rows_pruned: 0,
4457 bytes_reclaimed_estimate: 0,
4458 });
4459 }
4460 let mut source_row_count: usize = 0;
4462 let mut source_byte_total: u64 = 0;
4463 for &id in &candidate_set {
4464 let seg = self.cold_segments[id as usize]
4465 .as_ref()
4466 .expect("candidate selected only when slot is Some");
4467 source_row_count = source_row_count.saturating_add(seg.meta().num_rows as usize);
4468 source_byte_total = source_byte_total.saturating_add(seg.bytes().len() as u64);
4469 }
4470 let mut collected: BTreeMap<u64, (Vec<u8>, IndexKey)> = BTreeMap::new();
4476 for (key, locators) in map.iter() {
4477 for loc in locators {
4478 let RowLocator::Cold { segment_id, .. } = loc else {
4479 continue;
4480 };
4481 if !candidate_set.contains(segment_id) {
4482 continue;
4483 }
4484 let u64_key = index_key_as_u64(key).ok_or_else(|| {
4485 StorageError::Corrupt(format!(
4486 "compact_cold_segments: index {index_name:?} has non-integer Cold key; \
4487 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
4488 ))
4489 })?;
4490 let seg = self.cold_segments[*segment_id as usize]
4491 .as_ref()
4492 .expect("candidate slot guaranteed Some above");
4493 let payload = seg.lookup(u64_key).ok_or_else(|| {
4494 StorageError::Corrupt(format!(
4495 "compact_cold_segments: BTree {index_name:?} points key={u64_key} \
4496 at segment {segment_id} but the segment lookup missed"
4497 ))
4498 })?;
4499 collected.insert(u64_key, (payload, key.clone()));
4500 break;
4501 }
4502 }
4503 let merged_rows = collected.len();
4504 let deleted_rows_pruned = source_row_count.saturating_sub(merged_rows);
4505
4506 let seg_rows: Vec<(u64, Vec<u8>)> = collected
4510 .iter()
4511 .map(|(k, (body, _))| (*k, body.clone()))
4512 .collect();
4513 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
4514 .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: encode: {e}")))?;
4515 let merged_bytes_len = seg_bytes.len() as u64;
4516
4517 let merged_segment_id = self
4519 .load_segment_bytes(seg_bytes.clone())
4520 .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: load: {e}")))?;
4521
4522 let entries: Vec<(IndexKey, Vec<RowLocator>)> = {
4528 let t = self
4529 .get(table_name)
4530 .expect("table existed at the start of this fn");
4531 let idx = t
4532 .indices
4533 .iter()
4534 .find(|i| i.name == index_name)
4535 .expect("index existed at the start of this fn");
4536 let IndexKind::BTree(map) = &idx.kind else {
4537 unreachable!("validated above");
4538 };
4539 map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
4540 };
4541 let t_mut = self
4542 .get_mut(table_name)
4543 .expect("table existed at the start of this fn");
4544 let idx_mut = t_mut
4545 .indices
4546 .iter_mut()
4547 .find(|i| i.name == index_name)
4548 .expect("index existed at the start of this fn");
4549 let IndexKind::BTree(map_mut) = &mut idx_mut.kind else {
4550 unreachable!("validated above");
4551 };
4552 for (key, locators) in entries {
4553 let mut new_locs: Vec<RowLocator> = Vec::with_capacity(locators.len());
4554 let mut changed = false;
4555 for loc in &locators {
4556 match *loc {
4557 RowLocator::Cold {
4558 segment_id,
4559 page_offset: _,
4560 } if candidate_set.contains(&segment_id) => {
4561 let replacement = RowLocator::Cold {
4562 segment_id: merged_segment_id,
4563 page_offset: 0,
4564 };
4565 if !new_locs.contains(&replacement) {
4566 new_locs.push(replacement);
4567 }
4568 changed = true;
4569 }
4570 other => new_locs.push(other),
4571 }
4572 }
4573 if changed {
4574 map_mut.insert_mut(key, new_locs);
4575 }
4576 }
4577
4578 for &id in &candidate_set {
4583 self.tombstone_segment(id)?;
4584 }
4585
4586 let bytes_reclaimed_estimate = source_byte_total.saturating_sub(merged_bytes_len);
4587 Ok(CompactReport {
4588 sources: candidate_set.into_iter().collect(),
4589 merged_segment_id: Some(merged_segment_id),
4590 merged_segment_bytes: seg_bytes,
4591 merged_rows,
4592 deleted_rows_pruned,
4593 bytes_reclaimed_estimate,
4594 })
4595 }
4596
4597 fn find_cold_locator(
4603 &self,
4604 table_name: &str,
4605 index_name: &str,
4606 key: &IndexKey,
4607 ) -> Result<Option<(u32, u32)>, StorageError> {
4608 let t = self.get(table_name).ok_or_else(|| {
4609 StorageError::Corrupt(format!("find_cold_locator: table {table_name:?} not found"))
4610 })?;
4611 let idx = t
4612 .indices
4613 .iter()
4614 .find(|i| i.name == index_name)
4615 .ok_or_else(|| {
4616 StorageError::Corrupt(format!(
4617 "find_cold_locator: index {index_name:?} not found on {table_name:?}"
4618 ))
4619 })?;
4620 if !matches!(idx.kind, IndexKind::BTree(_)) {
4621 return Err(StorageError::Corrupt(format!(
4622 "find_cold_locator: index {index_name:?} is NSW; promote-on-write only applies to BTree indices"
4623 )));
4624 }
4625 for loc in idx.lookup_eq(key) {
4626 if let RowLocator::Cold {
4627 segment_id,
4628 page_offset,
4629 } = *loc
4630 {
4631 return Ok(Some((segment_id, page_offset)));
4632 }
4633 }
4634 Ok(None)
4635 }
4636}
4637
4638fn index_key_as_u64(key: &IndexKey) -> Option<u64> {
4644 match key {
4645 IndexKey::Int(n) => Some(n.cast_unsigned()),
4651 IndexKey::Text(_) | IndexKey::Bool(_) => None,
4652 }
4653}
4654
/// Errors reported by the storage layer.
///
/// The enum is `#[non_exhaustive]`: new variants may be added, so matches
/// outside this crate need a wildcard arm.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum StorageError {
    /// A table with this name already exists.
    DuplicateTable {
        name: String,
    },
    /// No table with this name exists.
    TableNotFound {
        name: String,
    },
    /// A row has a different number of values than the table has columns.
    ArityMismatch {
        expected: usize,
        actual: usize,
    },
    /// The value at column `position` has type `actual`, not `expected`.
    TypeMismatch {
        column: String,
        expected: DataType,
        actual: DataType,
        position: usize,
    },
    /// A NULL was supplied for a NOT NULL column.
    NullInNotNull {
        column: String,
    },
    /// An index with this name already exists.
    DuplicateIndex {
        name: String,
    },
    /// No column with this name exists.
    ColumnNotFound {
        column: String,
    },
    /// Displayed as "corrupt on-disk format". The catalog also uses it for
    /// failed precondition checks (missing index, duplicate trigger, invalid
    /// freeze arguments).
    Corrupt(String),
    /// No index with this name exists.
    IndexNotFound {
        name: String,
    },
    /// The requested operation is not supported; the payload says why.
    Unsupported(String),
}
4698
4699impl fmt::Display for StorageError {
4700 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4701 match self {
4702 Self::DuplicateTable { name } => write!(f, "table already exists: {name}"),
4703 Self::TableNotFound { name } => write!(f, "table not found: {name}"),
4704 Self::ArityMismatch { expected, actual } => write!(
4705 f,
4706 "row arity mismatch: expected {expected} columns, got {actual}"
4707 ),
4708 Self::TypeMismatch {
4709 column,
4710 expected,
4711 actual,
4712 position,
4713 } => write!(
4714 f,
4715 "type mismatch in column {column:?} (position {position}): expected {expected}, got {actual}"
4716 ),
4717 Self::NullInNotNull { column } => {
4718 write!(f, "NULL value in NOT NULL column {column:?}")
4719 }
4720 Self::DuplicateIndex { name } => write!(f, "index already exists: {name}"),
4721 Self::ColumnNotFound { column } => write!(f, "column not found: {column}"),
4722 Self::Corrupt(detail) => write!(f, "corrupt on-disk format: {detail}"),
4723 Self::IndexNotFound { name } => write!(f, "index not found: {name}"),
4724 Self::Unsupported(detail) => write!(f, "unsupported: {detail}"),
4725 }
4726 }
4727}
4728
4729impl ColumnSchema {
4730 pub fn new(name: impl Into<String>, ty: DataType, nullable: bool) -> Self {
4731 Self {
4732 name: name.into(),
4733 ty,
4734 nullable,
4735 default: None,
4736 runtime_default: None,
4737 auto_increment: false,
4738 }
4739 }
4740
4741 #[must_use]
4745 pub fn with_default(mut self, default: Value) -> Self {
4746 self.default = Some(default);
4747 self
4748 }
4749
4750 #[must_use]
4755 pub fn with_runtime_default(mut self, expr: impl Into<String>) -> Self {
4756 self.runtime_default = Some(expr.into());
4757 self
4758 }
4759
4760 #[must_use]
4762 pub const fn with_auto_increment(mut self) -> Self {
4763 self.auto_increment = true;
4764 self
4765 }
4766}
4767
4768impl TableSchema {
4769 pub fn new(name: impl Into<String>, columns: Vec<ColumnSchema>) -> Self {
4770 Self {
4771 name: name.into(),
4772 columns,
4773 hot_tier_bytes: None,
4774 foreign_keys: Vec::new(),
4775 uniqueness_constraints: Vec::new(),
4776 checks: Vec::new(),
4777 }
4778 }
4779}
4780
/// Eight-byte signature written at the very start of a serialized catalog.
const FILE_MAGIC: &[u8; 8] = b"SPGDB001";
/// Format version byte written immediately after `FILE_MAGIC`.
const FILE_VERSION: u8 = 25;
/// NOTE(review): presumably the oldest format version the reader still
/// accepts — confirm against the deserializer.
const MIN_SUPPORTED_FILE_VERSION: u8 = 8;

/// Tag values for index keys. NOTE(review): presumably the leading byte
/// written by `write_index_key` to identify the `IndexKey` variant — confirm.
const INDEX_KEY_TAG_INT: u8 = 0;
const INDEX_KEY_TAG_TEXT: u8 = 1;
const INDEX_KEY_TAG_BOOL: u8 = 2;
4889
4890impl Catalog {
    /// Encodes the whole catalog into a freshly allocated byte buffer.
    ///
    /// Write order: `FILE_MAGIC`, the `FILE_VERSION` byte, a `u32` table
    /// count with one record per table, a `u32` function count with one
    /// record per function, and a `u32` trigger count with one record per
    /// trigger. Each table record holds, in order: name, columns, rows,
    /// indices, the optional hot-tier byte budget, foreign keys, uniqueness
    /// constraints, runtime defaults and CHECK constraints.
    ///
    /// The statement order below *is* the on-disk format; any reordering
    /// must be mirrored in `Catalog::deserialize` and its helpers.
    ///
    /// # Panics
    ///
    /// Panics when a count or position does not fit the integer width it is
    /// written with (see the individual `expect` messages), and when a
    /// column or BRIN index has type `DataType::Interval`, which
    /// `write_data_type` refuses to encode.
    pub fn serialize(&self) -> Vec<u8> {
        let mut out = Vec::with_capacity(64);
        // File header: magic, then the format version byte.
        out.extend_from_slice(FILE_MAGIC);
        out.push(FILE_VERSION);
        write_u32(
            &mut out,
            u32::try_from(self.tables.len()).expect("≤ 4G tables"),
        );
        for t in &self.tables {
            // Table header: name, column count, then one record per column.
            write_str(&mut out, &t.schema.name);
            write_u16(
                &mut out,
                u16::try_from(t.schema.columns.len()).expect("≤ 65k columns/table"),
            );
            for c in &t.schema.columns {
                write_str(&mut out, &c.name);
                write_data_type(&mut out, c.ty);
                out.push(u8::from(c.nullable));
                // Literal default: presence byte (0/1), then the tagged value.
                match &c.default {
                    None => out.push(0),
                    Some(v) => {
                        out.push(1);
                        write_value(&mut out, v);
                    }
                }
                out.push(u8::from(c.auto_increment));
            }
            // Rows: count, then each row's dense, schema-driven body.
            write_u32(
                &mut out,
                u32::try_from(t.rows.len()).expect("≤ 4G rows/table"),
            );
            for row in &t.rows {
                out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
            }
            // Indices: count, then per index its name, leading column
            // position, a kind tag with kind-specific payload, and the
            // trailing per-index fields.
            write_u16(
                &mut out,
                u16::try_from(t.indices.len()).expect("≤ 65k indices/table"),
            );
            for idx in &t.indices {
                write_str(&mut out, &idx.name);
                write_u16(
                    &mut out,
                    u16::try_from(idx.column_position).expect("≤ 65k columns/table"),
                );
                match &idx.kind {
                    // Kind 0: B-tree — entry count, then key + locator list per entry.
                    IndexKind::BTree(map) => {
                        out.push(0);
                        write_u32(
                            &mut out,
                            u32::try_from(map.len()).expect("≤ 4G index entries/index"),
                        );
                        for (key, locators) in map {
                            write_index_key(&mut out, key);
                            write_u32(
                                &mut out,
                                u32::try_from(locators.len()).expect("≤ 4G locators/key"),
                            );
                            for loc in locators {
                                loc.write_le(&mut out);
                            }
                        }
                    }
                    // Kind 1: NSW graph — `m` as u16, then the graph body.
                    IndexKind::Nsw(g) => {
                        out.push(1);
                        write_u16(&mut out, u16::try_from(g.m).expect("≤ 65k NSW neighbours"));
                        write_nsw_graph(&mut out, g);
                    }
                    // Kind 2: BRIN — only the indexed column's type is stored here.
                    IndexKind::Brin { column_type } => {
                        out.push(2);
                        write_data_type(&mut out, *column_type);
                    }
                    // Kind 3: GIN — posting-list count, then word + locator list each.
                    IndexKind::Gin(map) => {
                        out.push(3);
                        write_u32(
                            &mut out,
                            u32::try_from(map.len()).expect("≤ 4G GIN posting lists"),
                        );
                        for (word, locators) in map {
                            write_str(&mut out, word);
                            write_u32(
                                &mut out,
                                u32::try_from(locators.len()).expect("≤ 4G locators/posting list"),
                            );
                            for loc in locators {
                                loc.write_le(&mut out);
                            }
                        }
                    }
                    // Kind 4: trigram GIN — same layout as kind 3, keyed by trigram.
                    IndexKind::GinTrgm(map) => {
                        out.push(4);
                        write_u32(
                            &mut out,
                            u32::try_from(map.len()).expect("≤ 4G trigram-GIN posting lists"),
                        );
                        for (tri, locators) in map {
                            write_str(&mut out, tri);
                            write_u32(
                                &mut out,
                                u32::try_from(locators.len()).expect("≤ 4G locators/posting list"),
                            );
                            for loc in locators {
                                loc.write_le(&mut out);
                            }
                        }
                    }
                }
                // INCLUDE columns: count, then each column position.
                write_u16(
                    &mut out,
                    u16::try_from(idx.included_columns.len()).expect("≤ 65k INCLUDE columns/index"),
                );
                for col_pos in &idx.included_columns {
                    write_u16(
                        &mut out,
                        u16::try_from(*col_pos).expect("≤ 65k columns/table"),
                    );
                }
                // Optional partial-index predicate: presence byte, then text.
                match &idx.partial_predicate {
                    None => out.push(0),
                    Some(pred) => {
                        out.push(1);
                        write_str(&mut out, pred);
                    }
                }
                // Optional index expression: presence byte, then text.
                match &idx.expression {
                    None => out.push(0),
                    Some(expr) => {
                        out.push(1);
                        write_str(&mut out, expr);
                    }
                }
                out.push(u8::from(idx.is_unique));
                // Extra key column positions: count, then each position.
                write_u16(
                    &mut out,
                    u16::try_from(idx.extra_column_positions.len())
                        .expect("≤ 65k extra cols / index"),
                );
                for cp in &idx.extra_column_positions {
                    write_u16(&mut out, u16::try_from(*cp).expect("≤ 65k columns/table"));
                }
            }
            // Optional hot-tier byte budget: presence byte, then the
            // little-endian value.
            match t.schema.hot_tier_bytes {
                None => out.push(0),
                Some(n) => {
                    out.push(1);
                    out.extend_from_slice(&n.to_le_bytes());
                }
            }
            // Foreign keys: count, then per constraint its optional name,
            // local column positions, parent table, parent column positions
            // and the two referential-action tags.
            write_u16(
                &mut out,
                u16::try_from(t.schema.foreign_keys.len()).expect("≤ 65k FKs/table"),
            );
            for fk in &t.schema.foreign_keys {
                match &fk.name {
                    None => out.push(0),
                    Some(n) => {
                        out.push(1);
                        write_str(&mut out, n);
                    }
                }
                write_u16(
                    &mut out,
                    u16::try_from(fk.local_columns.len()).expect("≤ 65k FK columns"),
                );
                for &p in &fk.local_columns {
                    write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
                }
                write_str(&mut out, &fk.parent_table);
                write_u16(
                    &mut out,
                    u16::try_from(fk.parent_columns.len()).expect("≤ 65k FK parent columns"),
                );
                for &p in &fk.parent_columns {
                    write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
                }
                out.push(fk.on_delete.tag());
                out.push(fk.on_update.tag());
            }
            // Uniqueness constraints: count, then per constraint the
            // primary-key flag, column positions and NULLS NOT DISTINCT flag.
            write_u16(
                &mut out,
                u16::try_from(t.schema.uniqueness_constraints.len())
                    .expect("≤ 65k uniqueness constraints/table"),
            );
            for uc in &t.schema.uniqueness_constraints {
                out.push(u8::from(uc.is_primary_key));
                write_u16(
                    &mut out,
                    u16::try_from(uc.columns.len()).expect("≤ 65k cols in uniqueness constraint"),
                );
                for &p in &uc.columns {
                    write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
                }
                out.push(u8::from(uc.nulls_not_distinct));
            }
            // Runtime defaults are stored sparsely: only columns that have
            // one, as (column position, expression text) pairs.
            let mut rt_defaults: Vec<(usize, &str)> = Vec::new();
            for (i, c) in t.schema.columns.iter().enumerate() {
                if let Some(e) = &c.runtime_default {
                    rt_defaults.push((i, e.as_str()));
                }
            }
            write_u16(
                &mut out,
                u16::try_from(rt_defaults.len()).expect("≤ 65k runtime defaults/table"),
            );
            for (pos, expr) in rt_defaults {
                write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
                write_str(&mut out, expr);
            }
            // CHECK constraints: count, then each constraint's text.
            write_u16(
                &mut out,
                u16::try_from(t.schema.checks.len()).expect("≤ 65k CHECK constraints/table"),
            );
            for c in &t.schema.checks {
                write_str(&mut out, c.as_str());
            }
        }
        // Functions: count, then name / args / return type / language as
        // ordinary strings and the body via the long-string writer.
        write_u32(
            &mut out,
            u32::try_from(self.functions.len()).expect("≤ 4G functions"),
        );
        for fd in self.functions.values() {
            write_str(&mut out, &fd.name);
            write_str(&mut out, &fd.args_repr);
            write_str(&mut out, &fd.returns);
            write_str(&mut out, &fd.language);
            write_str_long(&mut out, &fd.body);
        }
        // Triggers: count, then per trigger its name, table, timing, event
        // list, FOR EACH clause, function name, update-column list and the
        // enabled flag.
        write_u32(
            &mut out,
            u32::try_from(self.triggers.len()).expect("≤ 4G triggers"),
        );
        for td in &self.triggers {
            write_str(&mut out, &td.name);
            write_str(&mut out, &td.table);
            write_str(&mut out, &td.timing);
            write_u16(
                &mut out,
                u16::try_from(td.events.len()).expect("≤ 65k events / trigger"),
            );
            for ev in &td.events {
                write_str(&mut out, ev);
            }
            write_str(&mut out, &td.for_each);
            write_str(&mut out, &td.function);
            write_u16(
                &mut out,
                u16::try_from(td.update_columns.len()).expect("≤ 65k cols / trigger"),
            );
            for c in &td.update_columns {
                write_str(&mut out, c);
            }
            out.push(u8::from(td.enabled));
        }
        out
    }
5256
5257 pub fn deserialize(buf: &[u8]) -> Result<Self, StorageError> {
5260 let mut cur = Cursor::new(buf);
5261 let magic = cur.take(8)?;
5262 if magic != FILE_MAGIC {
5263 return Err(StorageError::Corrupt(format!(
5264 "bad magic: expected SPGDB001, got {magic:?}"
5265 )));
5266 }
5267 let version = cur.read_u8()?;
5268 if !(MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION).contains(&version) {
5269 return Err(StorageError::Corrupt(format!(
5270 "unsupported file version: {version} (supported: {MIN_SUPPORTED_FILE_VERSION}..={FILE_VERSION})"
5271 )));
5272 }
5273 let table_count = cur.read_u32()? as usize;
5274 let mut cat = Self::new();
5275 for _ in 0..table_count {
5276 deserialize_table(&mut cur, &mut cat, version)?;
5277 }
5278 if version >= 22 {
5282 let fn_count = cur.read_u32()? as usize;
5283 for _ in 0..fn_count {
5284 let name = cur.read_str()?;
5285 let args_repr = cur.read_str()?;
5286 let returns = cur.read_str()?;
5287 let language = cur.read_str()?;
5288 let body = cur.read_str_long()?;
5289 cat.functions.insert(
5290 name.clone(),
5291 FunctionDef {
5292 name,
5293 args_repr,
5294 returns,
5295 language,
5296 body,
5297 },
5298 );
5299 }
5300 let trg_count = cur.read_u32()? as usize;
5301 for _ in 0..trg_count {
5302 let name = cur.read_str()?;
5303 let table = cur.read_str()?;
5304 let timing = cur.read_str()?;
5305 let ev_count = cur.read_u16()? as usize;
5306 let mut events = Vec::with_capacity(ev_count);
5307 for _ in 0..ev_count {
5308 events.push(cur.read_str()?);
5309 }
5310 let for_each = cur.read_str()?;
5311 let function = cur.read_str()?;
5312 let update_columns = if version >= 23 {
5316 let n = cur.read_u16()? as usize;
5317 let mut cols = Vec::with_capacity(n);
5318 for _ in 0..n {
5319 cols.push(cur.read_str()?);
5320 }
5321 cols
5322 } else {
5323 Vec::new()
5324 };
5325 let enabled = if version >= 25 {
5329 cur.read_u8()? != 0
5330 } else {
5331 true
5332 };
5333 cat.triggers.push(TriggerDef {
5334 name,
5335 table,
5336 timing,
5337 events,
5338 for_each,
5339 function,
5340 update_columns,
5341 enabled,
5342 });
5343 }
5344 }
5345 if cur.pos < buf.len() {
5346 return Err(StorageError::Corrupt(format!(
5347 "trailing bytes: {} unread",
5348 buf.len() - cur.pos
5349 )));
5350 }
5351 Ok(cat)
5352 }
5353}
5354
5355fn deserialize_table(
5360 cur: &mut Cursor<'_>,
5361 cat: &mut Catalog,
5362 version: u8,
5363) -> Result<(), StorageError> {
5364 let table_name = cur.read_str()?;
5365 let name = table_name.clone();
5366 let col_count = cur.read_u16()? as usize;
5367 let mut cols = Vec::with_capacity(col_count);
5368 for _ in 0..col_count {
5369 let c_name = cur.read_str()?;
5370 let ty = cur.read_data_type()?;
5371 let nullable = cur.read_u8()? != 0;
5372 let default = match cur.read_u8()? {
5373 0 => None,
5374 1 => Some(cur.read_value()?),
5375 other => {
5376 return Err(StorageError::Corrupt(format!(
5377 "unknown default tag: {other}"
5378 )));
5379 }
5380 };
5381 let auto_increment = cur.read_u8()? != 0;
5382 cols.push(ColumnSchema {
5386 name: c_name,
5387 ty,
5388 nullable,
5389 default,
5390 runtime_default: None,
5391 auto_increment,
5392 });
5393 }
5394 let n_cols = cols.len();
5395 cat.create_table(TableSchema::new(name, cols))?;
5396 let t = cat.tables.last_mut().expect("create_table just pushed");
5400 deserialize_rows(cur, t, n_cols)?;
5401 deserialize_indices(cur, t, version)?;
5402 if version >= 11 {
5408 let has = cur.read_u8()?;
5409 let hot_tier_bytes = match has {
5410 0 => None,
5411 1 => Some(cur.read_u64()?),
5412 other => {
5413 return Err(StorageError::Corrupt(format!(
5414 "hot_tier_bytes appendix: unknown has-value byte {other}"
5415 )));
5416 }
5417 };
5418 t.schema_mut().hot_tier_bytes = hot_tier_bytes;
5419 }
5420 if version >= 13 {
5423 let fk_count = cur.read_u16()? as usize;
5424 let mut fks = Vec::with_capacity(fk_count);
5425 for _ in 0..fk_count {
5426 let name = match cur.read_u8()? {
5427 0 => None,
5428 1 => Some(cur.read_str()?),
5429 other => {
5430 return Err(StorageError::Corrupt(format!(
5431 "FK appendix: unknown has-name byte {other}"
5432 )));
5433 }
5434 };
5435 let local_arity = cur.read_u16()? as usize;
5436 let mut local_columns = Vec::with_capacity(local_arity);
5437 for _ in 0..local_arity {
5438 local_columns.push(cur.read_u16()? as usize);
5439 }
5440 let parent_table = cur.read_str()?;
5441 let parent_arity = cur.read_u16()? as usize;
5442 if parent_arity != local_arity {
5443 return Err(StorageError::Corrupt(format!(
5444 "FK arity mismatch in catalog: local {local_arity} vs parent {parent_arity}"
5445 )));
5446 }
5447 let mut parent_columns = Vec::with_capacity(parent_arity);
5448 for _ in 0..parent_arity {
5449 parent_columns.push(cur.read_u16()? as usize);
5450 }
5451 let on_delete = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
5452 StorageError::Corrupt("FK appendix: unknown on_delete tag".into())
5453 })?;
5454 let on_update = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
5455 StorageError::Corrupt("FK appendix: unknown on_update tag".into())
5456 })?;
5457 fks.push(ForeignKeyConstraint {
5458 name,
5459 local_columns,
5460 parent_table,
5461 parent_columns,
5462 on_delete,
5463 on_update,
5464 });
5465 }
5466 t.schema_mut().foreign_keys = fks;
5467 }
5468 if version >= 15 {
5471 let uc_count = cur.read_u16()? as usize;
5472 let mut ucs = Vec::with_capacity(uc_count);
5473 for _ in 0..uc_count {
5474 let is_pk = cur.read_u8()? != 0;
5475 let arity = cur.read_u16()? as usize;
5476 let mut cols = Vec::with_capacity(arity);
5477 for _ in 0..arity {
5478 cols.push(cur.read_u16()? as usize);
5479 }
5480 let nulls_not_distinct = if version >= 23 {
5484 cur.read_u8()? != 0
5485 } else {
5486 false
5487 };
5488 ucs.push(UniquenessConstraint {
5489 is_primary_key: is_pk,
5490 columns: cols,
5491 nulls_not_distinct,
5492 });
5493 }
5494 t.schema_mut().uniqueness_constraints = ucs;
5495 let rt_count = cur.read_u16()? as usize;
5497 for _ in 0..rt_count {
5498 let pos = cur.read_u16()? as usize;
5499 let expr = cur.read_str()?;
5500 if let Some(col) = t.schema_mut().columns.get_mut(pos) {
5501 col.runtime_default = Some(expr);
5502 }
5503 }
5504 }
5505 if version >= 23 {
5508 let check_count = cur.read_u16()? as usize;
5509 let mut checks = Vec::with_capacity(check_count);
5510 for _ in 0..check_count {
5511 checks.push(cur.read_str()?);
5512 }
5513 t.schema_mut().checks = checks;
5514 }
5515 let _ = table_name;
5516 Ok(())
5517}
5518
5519fn deserialize_rows(
5520 cur: &mut Cursor<'_>,
5521 t: &mut Table,
5522 _n_cols: usize,
5523) -> Result<(), StorageError> {
5524 let row_count = cur.read_u32()? as usize;
5525 let mut hot_bytes: u64 = 0;
5530 for _ in 0..row_count {
5531 let tail = &cur.buf[cur.pos..];
5532 let (row, consumed) = decode_row_body_dense(tail, &t.schema)?;
5533 cur.pos += consumed;
5534 hot_bytes = hot_bytes.saturating_add(row_body_encoded_len(&row, &t.schema) as u64);
5540 t.rows.push_mut(row);
5541 }
5542 t.hot_bytes = hot_bytes;
5543 Ok(())
5544}
5545
5546fn deserialize_indices(
5547 cur: &mut Cursor<'_>,
5548 t: &mut Table,
5549 version: u8,
5550) -> Result<(), StorageError> {
5551 let index_count = cur.read_u16()? as usize;
5552 for _ in 0..index_count {
5553 let idx_name = cur.read_str()?;
5554 let col_pos = cur.read_u16()? as usize;
5555 let column_name = t
5556 .schema
5557 .columns
5558 .get(col_pos)
5559 .ok_or_else(|| {
5560 StorageError::Corrupt(format!(
5561 "index {idx_name:?} points at non-existent column position {col_pos}"
5562 ))
5563 })?
5564 .name
5565 .clone();
5566 let kind_tag = cur.read_u8()?;
5567 match kind_tag {
5568 0 => {
5569 if version >= 9 {
5570 let map = read_btree_map(cur)?;
5575 t.restore_btree_index(idx_name, &column_name, map)?;
5576 } else {
5577 t.add_index(idx_name, &column_name)?;
5582 }
5583 }
5584 1 => {
5585 let m = cur.read_u16()? as usize;
5586 let graph = cur.read_nsw_graph(m)?;
5587 t.restore_nsw_index(idx_name, &column_name, graph)?;
5588 }
5589 2 => {
5590 let column_type = cur.read_data_type()?;
5594 t.restore_brin_index(idx_name, &column_name, column_type)?;
5595 }
5596 3 => {
5597 let map = read_gin_map(cur)?;
5602 t.restore_gin_index(idx_name, &column_name, map)?;
5603 }
5604 4 => {
5605 if version < 24 {
5609 return Err(StorageError::Corrupt(format!(
5610 "trigram-GIN index tag 4 found in catalog FILE_VERSION {version}; \
5611 FILE_VERSION 24+ required (v7.15.0 introduced this tag)"
5612 )));
5613 }
5614 let map = read_gin_map(cur)?;
5615 t.restore_gin_trgm_index(idx_name, &column_name, map)?;
5616 }
5617 other => {
5618 return Err(StorageError::Corrupt(format!(
5619 "unknown index kind tag: {other}"
5620 )));
5621 }
5622 }
5623 if version >= 12 {
5626 let num_included = cur.read_u16()? as usize;
5627 if num_included > 0 {
5628 let mut included: Vec<usize> = Vec::with_capacity(num_included);
5629 for _ in 0..num_included {
5630 let cp = cur.read_u16()? as usize;
5631 if cp >= t.schema.columns.len() {
5632 return Err(StorageError::Corrupt(format!(
5633 "INCLUDE column position {cp} out of range \
5634 ({} schema columns)",
5635 t.schema.columns.len()
5636 )));
5637 }
5638 included.push(cp);
5639 }
5640 if let Some(last) = t.indices.last_mut() {
5641 last.included_columns = included;
5642 }
5643 }
5644 match cur.read_u8()? {
5646 0 => {}
5647 1 => {
5648 let pred = cur.read_str()?;
5649 if let Some(last) = t.indices.last_mut() {
5650 last.partial_predicate = Some(pred);
5651 }
5652 }
5653 other => {
5654 return Err(StorageError::Corrupt(format!(
5655 "partial_predicate tag: unknown byte {other}"
5656 )));
5657 }
5658 }
5659 match cur.read_u8()? {
5661 0 => {}
5662 1 => {
5663 let expr = cur.read_str()?;
5664 if let Some(last) = t.indices.last_mut() {
5665 last.expression = Some(expr);
5666 }
5667 }
5668 other => {
5669 return Err(StorageError::Corrupt(format!(
5670 "expression tag: unknown byte {other}"
5671 )));
5672 }
5673 }
5674 if version >= 16 {
5677 match cur.read_u8()? {
5678 0 => {}
5679 1 => {
5680 if let Some(last) = t.indices.last_mut() {
5681 last.is_unique = true;
5682 }
5683 }
5684 other => {
5685 return Err(StorageError::Corrupt(format!(
5686 "is_unique tag: unknown byte {other}"
5687 )));
5688 }
5689 }
5690 let n = cur.read_u16()? as usize;
5692 if n > 0 {
5693 let mut extras: Vec<usize> = Vec::with_capacity(n);
5694 for _ in 0..n {
5695 let cp = cur.read_u16()? as usize;
5696 if cp >= t.schema.columns.len() {
5697 return Err(StorageError::Corrupt(format!(
5698 "extra column position {cp} out of range \
5699 ({} schema columns)",
5700 t.schema.columns.len()
5701 )));
5702 }
5703 extras.push(cp);
5704 }
5705 if let Some(last) = t.indices.last_mut() {
5706 last.extra_column_positions = extras;
5707 }
5708 }
5709 }
5710 }
5711 }
5712 Ok(())
5713}
5714
5715fn read_btree_map(
5719 cur: &mut Cursor<'_>,
5720) -> Result<PersistentBTreeMap<IndexKey, Vec<RowLocator>>, StorageError> {
5721 let entry_count = cur.read_u32()? as usize;
5722 let mut map = PersistentBTreeMap::new();
5723 for _ in 0..entry_count {
5724 let key = cur.read_index_key()?;
5725 let locator_count = cur.read_u32()? as usize;
5726 let mut locators = Vec::with_capacity(locator_count);
5727 for _ in 0..locator_count {
5728 let tail = &cur.buf[cur.pos..];
5729 let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
5730 StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
5731 })?;
5732 cur.pos += consumed;
5733 locators.push(loc);
5734 }
5735 map.insert_mut(key, locators);
5736 }
5737 Ok(map)
5738}
5739
5740fn read_gin_map(
5744 cur: &mut Cursor<'_>,
5745) -> Result<PersistentBTreeMap<String, Vec<RowLocator>>, StorageError> {
5746 let entry_count = cur.read_u32()? as usize;
5747 let mut map = PersistentBTreeMap::new();
5748 for _ in 0..entry_count {
5749 let word = cur.read_str()?;
5750 let locator_count = cur.read_u32()? as usize;
5751 let mut locators = Vec::with_capacity(locator_count);
5752 for _ in 0..locator_count {
5753 let tail = &cur.buf[cur.pos..];
5754 let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
5755 StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
5756 })?;
5757 cur.pos += consumed;
5758 locators.push(loc);
5759 }
5760 map.insert_mut(word, locators);
5761 }
5762 Ok(map)
5763}
5764
5765fn write_nsw_graph(out: &mut Vec<u8>, g: &NswGraph) {
5781 let entry = g.entry.map_or(u32::MAX, |e| {
5782 u32::try_from(e).expect("NSW entry fits in u32")
5783 });
5784 write_u16(
5785 out,
5786 u16::try_from(g.m_max_0).expect("HNSW m_max_0 fits in u16"),
5787 );
5788 out.extend_from_slice(&entry.to_le_bytes());
5789 out.push(g.entry_level);
5790 let node_count = g.levels.len();
5791 write_u32(
5792 out,
5793 u32::try_from(node_count).expect("HNSW node count fits in u32"),
5794 );
5795 for &lvl in &g.levels {
5796 out.push(lvl);
5797 }
5798 let layer_count = u8::try_from(g.layers.len()).expect("HNSW layer count ≤ 255");
5799 out.push(layer_count);
5800 for layer in &g.layers {
5801 write_u32(
5802 out,
5803 u32::try_from(layer.len()).expect("HNSW per-layer node count fits in u32"),
5804 );
5805 for neighbors in layer {
5806 write_u16(
5807 out,
5808 u16::try_from(neighbors.len()).expect("HNSW neighbour list fits in u16"),
5809 );
5810 for &peer in neighbors {
5814 write_u32(out, peer);
5815 }
5816 }
5817 }
5818}
5819
5820fn write_data_type(out: &mut Vec<u8>, t: DataType) {
5821 match t {
5822 DataType::Int => out.push(1),
5823 DataType::BigInt => out.push(2),
5824 DataType::Float => out.push(3),
5825 DataType::Text => out.push(4),
5826 DataType::Bool => out.push(5),
5827 DataType::Vector { dim, encoding } => match encoding {
5828 VecEncoding::F32 => {
5832 out.push(6);
5833 out.extend_from_slice(&dim.to_le_bytes());
5834 }
5835 VecEncoding::F16 => {
5838 out.push(15);
5839 out.extend_from_slice(&dim.to_le_bytes());
5840 }
5841 VecEncoding::Sq8 => {
5847 out.push(14);
5848 out.extend_from_slice(&dim.to_le_bytes());
5849 }
5850 },
5851 DataType::SmallInt => out.push(7),
5852 DataType::Varchar(max) => {
5853 out.push(8);
5854 out.extend_from_slice(&max.to_le_bytes());
5855 }
5856 DataType::Char(size) => {
5857 out.push(9);
5858 out.extend_from_slice(&size.to_le_bytes());
5859 }
5860 DataType::Numeric { precision, scale } => {
5861 out.push(10);
5862 out.push(precision);
5863 out.push(scale);
5864 }
5865 DataType::Date => out.push(11),
5866 DataType::Timestamp => out.push(12),
5867 DataType::Timestamptz => out.push(17),
5871 DataType::Interval => {
5876 unreachable!("DataType::Interval has no on-disk encoding in v2.11")
5877 }
5878 DataType::Json => out.push(13),
5879 DataType::Jsonb => out.push(16),
5882 DataType::Bytes => out.push(18),
5884 DataType::TextArray => out.push(19),
5887 DataType::IntArray => out.push(20),
5890 DataType::BigIntArray => out.push(21),
5893 DataType::TsVector => out.push(22),
5896 DataType::TsQuery => out.push(23),
5899 }
5900}
5901
5902impl Cursor<'_> {
5903 fn read_data_type(&mut self) -> Result<DataType, StorageError> {
5904 let tag = self.read_u8()?;
5905 match tag {
5906 1 => Ok(DataType::Int),
5907 2 => Ok(DataType::BigInt),
5908 3 => Ok(DataType::Float),
5909 4 => Ok(DataType::Text),
5910 5 => Ok(DataType::Bool),
5911 6 => Ok(DataType::Vector {
5912 dim: self.read_u32()?,
5913 encoding: VecEncoding::F32,
5914 }),
5915 7 => Ok(DataType::SmallInt),
5916 8 => Ok(DataType::Varchar(self.read_u32()?)),
5917 9 => Ok(DataType::Char(self.read_u32()?)),
5918 10 => {
5919 let precision = self.read_u8()?;
5920 let scale = self.read_u8()?;
5921 Ok(DataType::Numeric { precision, scale })
5922 }
5923 11 => Ok(DataType::Date),
5924 12 => Ok(DataType::Timestamp),
5925 13 => Ok(DataType::Json),
5926 14 => Ok(DataType::Vector {
5927 dim: self.read_u32()?,
5928 encoding: VecEncoding::Sq8,
5929 }),
5930 15 => Ok(DataType::Vector {
5934 dim: self.read_u32()?,
5935 encoding: VecEncoding::F16,
5936 }),
5937 16 => Ok(DataType::Jsonb),
5941 17 => Ok(DataType::Timestamptz),
5945 18 => Ok(DataType::Bytes),
5947 19 => Ok(DataType::TextArray),
5949 20 => Ok(DataType::IntArray),
5951 21 => Ok(DataType::BigIntArray),
5952 22 => Ok(DataType::TsVector),
5955 23 => Ok(DataType::TsQuery),
5956 other => Err(StorageError::Corrupt(format!(
5957 "unknown data type tag: {other}"
5958 ))),
5959 }
5960 }
5961}
5962
5963pub fn row_body_encoded_len(row: &Row, schema: &TableSchema) -> usize {
5969 debug_assert_eq!(
5970 row.values.len(),
5971 schema.columns.len(),
5972 "row_body_encoded_len: row arity must match schema"
5973 );
5974 let bitmap_bytes = schema.columns.len().div_ceil(8);
5975 let mut n = bitmap_bytes;
5976 for (col_idx, v) in row.values.iter().enumerate() {
5977 if matches!(v, Value::Null) {
5978 continue;
5979 }
5980 n += value_body_encoded_len(v, schema.columns[col_idx].ty);
5981 }
5982 n
5983}
5984
5985fn value_body_encoded_len(v: &Value, _ty: DataType) -> usize {
5991 match v {
5992 Value::SmallInt(_) => 2,
5993 Value::Int(_) | Value::Date(_) => 4,
5995 Value::BigInt(_) | Value::Float(_) | Value::Timestamp(_) => 8,
5997 Value::Bool(_) => 1,
5998 Value::Text(s) | Value::Json(s) => 2 + s.len(),
6000 Value::Vector(vec) => 4 + 4 * vec.len(),
6002 Value::Sq8Vector(q) => 4 + 4 + 4 + q.bytes.len(),
6009 Value::HalfVector(h) => 4 + h.bytes.len(),
6012 Value::Numeric { .. } => 16 + 1,
6014 Value::Bytes(b) => 2 + b.len(),
6020 Value::TextArray(items) => {
6023 let mut n = 2; for item in items {
6025 n += 1; if let Some(s) = item {
6027 n += 2 + s.len();
6028 }
6029 }
6030 n
6031 }
6032 Value::IntArray(items) => {
6035 2 + items
6036 .iter()
6037 .map(|x| if x.is_some() { 5 } else { 1 })
6038 .sum::<usize>()
6039 }
6040 Value::BigIntArray(items) => {
6041 2 + items
6042 .iter()
6043 .map(|x| if x.is_some() { 9 } else { 1 })
6044 .sum::<usize>()
6045 }
6046 Value::TsVector(lexs) => {
6050 let mut n = 2;
6051 for l in lexs {
6052 n += 2 + l.word.len() + 2 + 2 * l.positions.len() + 1;
6053 }
6054 n
6055 }
6056 Value::TsQuery(ast) => tsquery_encoded_len(ast),
6059 Value::Null => 0,
6061 Value::Interval { .. } => {
6063 unreachable!("Value::Interval has no on-disk encoding")
6064 }
6065 }
6066}
6067
6068pub fn encode_row_body_dense(row: &Row, schema: &TableSchema) -> Vec<u8> {
6079 debug_assert_eq!(
6080 row.values.len(),
6081 schema.columns.len(),
6082 "dense encode: row arity must match schema"
6083 );
6084 let bitmap_bytes = schema.columns.len().div_ceil(8);
6085 let mut out = Vec::with_capacity(bitmap_bytes + schema.columns.len() * 8);
6088 let bitmap_offset = out.len();
6089 out.resize(bitmap_offset + bitmap_bytes, 0);
6090 for (i, v) in row.values.iter().enumerate() {
6091 if matches!(v, Value::Null) {
6092 out[bitmap_offset + i / 8] |= 1 << (i % 8);
6093 }
6094 }
6095 for (col_idx, v) in row.values.iter().enumerate() {
6096 if matches!(v, Value::Null) {
6097 continue;
6098 }
6099 write_value_body(&mut out, v, schema.columns[col_idx].ty);
6100 }
6101 out
6102}
6103
6104pub fn decode_row_body_dense(
6110 bytes: &[u8],
6111 schema: &TableSchema,
6112) -> Result<(Row, usize), StorageError> {
6113 let mut cur = Cursor::new(bytes);
6114 let bitmap_bytes = schema.columns.len().div_ceil(8);
6115 let mut bitmap_buf = [0u8; 32];
6116 if bitmap_bytes > bitmap_buf.len() {
6117 return Err(StorageError::Corrupt(format!(
6118 "row NULL bitmap {bitmap_bytes} B exceeds 32 B cap"
6119 )));
6120 }
6121 let slice = cur.take(bitmap_bytes)?;
6122 bitmap_buf[..bitmap_bytes].copy_from_slice(slice);
6123 let mut values = Vec::with_capacity(schema.columns.len());
6124 for (col_idx, col) in schema.columns.iter().enumerate() {
6125 if (bitmap_buf[col_idx / 8] >> (col_idx % 8)) & 1 == 1 {
6126 values.push(Value::Null);
6127 } else {
6128 values.push(cur.read_value_body(col.ty)?);
6129 }
6130 }
6131 Ok((Row { values }, cur.pos))
6132}
6133
6134fn write_value_body(out: &mut Vec<u8>, v: &Value, ty: DataType) {
6143 match (v, ty) {
6144 (Value::SmallInt(n), DataType::SmallInt) => out.extend_from_slice(&n.to_le_bytes()),
6145 (Value::Int(n), DataType::Int) => out.extend_from_slice(&n.to_le_bytes()),
6146 (Value::BigInt(n), DataType::BigInt) => out.extend_from_slice(&n.to_le_bytes()),
6147 (Value::Float(x), DataType::Float) => out.extend_from_slice(&x.to_le_bytes()),
6148 (Value::Bool(b), DataType::Bool) => out.push(u8::from(*b)),
6149 (Value::Text(s), DataType::Text | DataType::Varchar(_) | DataType::Char(_)) => {
6150 write_str(out, s);
6151 }
6152 (
6153 Value::Vector(v),
6154 DataType::Vector {
6155 encoding: VecEncoding::F32,
6156 ..
6157 },
6158 ) => {
6159 let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
6160 out.extend_from_slice(&dim.to_le_bytes());
6161 for x in v {
6162 out.extend_from_slice(&x.to_le_bytes());
6163 }
6164 }
6165 (
6171 Value::Sq8Vector(q),
6172 DataType::Vector {
6173 encoding: VecEncoding::Sq8,
6174 ..
6175 },
6176 ) => {
6177 let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
6178 out.extend_from_slice(&dim.to_le_bytes());
6179 out.extend_from_slice(&q.min.to_le_bytes());
6180 out.extend_from_slice(&q.max.to_le_bytes());
6181 out.extend_from_slice(&q.bytes);
6182 }
6183 (
6187 Value::HalfVector(h),
6188 DataType::Vector {
6189 encoding: VecEncoding::F16,
6190 ..
6191 },
6192 ) => {
6193 let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
6194 out.extend_from_slice(&dim.to_le_bytes());
6195 out.extend_from_slice(&h.bytes);
6196 }
6197 (Value::Numeric { scaled, .. }, DataType::Numeric { scale, .. }) => {
6198 out.extend_from_slice(&scaled.to_le_bytes());
6199 out.push(scale);
6200 }
6201 (Value::Date(d), DataType::Date) => out.extend_from_slice(&d.to_le_bytes()),
6202 (Value::Timestamp(t), DataType::Timestamp | DataType::Timestamptz) => {
6203 out.extend_from_slice(&t.to_le_bytes())
6204 }
6205 (Value::Json(s), DataType::Json | DataType::Jsonb) => write_str(out, s),
6209 (Value::Bytes(b), DataType::Bytes) => {
6212 let len = u16::try_from(b.len()).expect("BYTEA cell ≤ 64 KiB");
6213 out.extend_from_slice(&len.to_le_bytes());
6214 out.extend_from_slice(b);
6215 }
6216 (Value::TextArray(items), DataType::TextArray) => {
6219 let count = u16::try_from(items.len()).expect("TEXT[] ≤ 65k elements");
6220 out.extend_from_slice(&count.to_le_bytes());
6221 for item in items {
6222 match item {
6223 None => out.push(1),
6224 Some(s) => {
6225 out.push(0);
6226 let len = u16::try_from(s.len()).expect("TEXT[] element ≤ 64 KiB");
6227 out.extend_from_slice(&len.to_le_bytes());
6228 out.extend_from_slice(s.as_bytes());
6229 }
6230 }
6231 }
6232 }
6233 (Value::IntArray(items), DataType::IntArray) => {
6236 let count = u16::try_from(items.len()).expect("INT[] ≤ 65k elements");
6237 out.extend_from_slice(&count.to_le_bytes());
6238 for item in items {
6239 match item {
6240 None => out.push(1),
6241 Some(n) => {
6242 out.push(0);
6243 out.extend_from_slice(&n.to_le_bytes());
6244 }
6245 }
6246 }
6247 }
6248 (Value::BigIntArray(items), DataType::BigIntArray) => {
6251 let count = u16::try_from(items.len()).expect("BIGINT[] ≤ 65k elements");
6252 out.extend_from_slice(&count.to_le_bytes());
6253 for item in items {
6254 match item {
6255 None => out.push(1),
6256 Some(n) => {
6257 out.push(0);
6258 out.extend_from_slice(&n.to_le_bytes());
6259 }
6260 }
6261 }
6262 }
6263 (Value::TsVector(lexs), DataType::TsVector) => write_tsvector_body(out, lexs),
6266 (Value::TsQuery(ast), DataType::TsQuery) => write_tsquery_body(out, ast),
6268 (other, ty) => unreachable!(
6272 "schema-driven encode received mismatched value/type pair: \
6273 value tag={:?}, column type={:?}",
6274 other.data_type(),
6275 ty
6276 ),
6277 }
6278}
6279
6280fn write_value(out: &mut Vec<u8>, v: &Value) {
6281 match v {
6282 Value::Null => out.push(0),
6283 Value::SmallInt(n) => {
6284 out.push(7);
6285 out.extend_from_slice(&n.to_le_bytes());
6286 }
6287 Value::Int(n) => {
6288 out.push(1);
6289 out.extend_from_slice(&n.to_le_bytes());
6290 }
6291 Value::BigInt(n) => {
6292 out.push(2);
6293 out.extend_from_slice(&n.to_le_bytes());
6294 }
6295 Value::Float(x) => {
6296 out.push(3);
6297 out.extend_from_slice(&x.to_le_bytes());
6298 }
6299 Value::Text(s) | Value::Json(s) => {
6304 out.push(4);
6305 write_str(out, s);
6306 }
6307 Value::Bool(b) => {
6308 out.push(5);
6309 out.push(u8::from(*b));
6310 }
6311 Value::Vector(v) => {
6312 out.push(6);
6313 let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
6314 out.extend_from_slice(&dim.to_le_bytes());
6315 for x in v {
6316 out.extend_from_slice(&x.to_le_bytes());
6317 }
6318 }
6319 Value::Sq8Vector(q) => {
6324 out.push(11);
6325 let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
6326 out.extend_from_slice(&dim.to_le_bytes());
6327 out.extend_from_slice(&q.min.to_le_bytes());
6328 out.extend_from_slice(&q.max.to_le_bytes());
6329 out.extend_from_slice(&q.bytes);
6330 }
6331 Value::HalfVector(h) => {
6336 out.push(12);
6337 let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
6338 out.extend_from_slice(&dim.to_le_bytes());
6339 out.extend_from_slice(&h.bytes);
6340 }
6341 Value::Numeric { scaled, scale } => {
6342 out.push(8);
6343 out.extend_from_slice(&scaled.to_le_bytes());
6344 out.push(*scale);
6345 }
6346 Value::Date(d) => {
6347 out.push(9);
6348 out.extend_from_slice(&d.to_le_bytes());
6349 }
6350 Value::Timestamp(t) => {
6351 out.push(10);
6352 out.extend_from_slice(&t.to_le_bytes());
6353 }
6354 Value::Interval { .. } => {
6358 unreachable!(
6359 "Value::Interval has no on-disk encoding; engine must reject it before write"
6360 )
6361 }
6362 Value::Bytes(b) => {
6367 out.push(14);
6368 let len = u16::try_from(b.len()).expect("BYTEA value ≤ 64 KiB");
6369 out.extend_from_slice(&len.to_le_bytes());
6370 out.extend_from_slice(b);
6371 }
6372 Value::TextArray(items) => {
6375 out.push(15);
6376 let count = u16::try_from(items.len()).expect("TEXT[] ≤ 65k elements");
6377 out.extend_from_slice(&count.to_le_bytes());
6378 for item in items {
6379 match item {
6380 None => out.push(1),
6381 Some(s) => {
6382 out.push(0);
6383 let len = u16::try_from(s.len()).expect("TEXT[] element ≤ 64 KiB");
6384 out.extend_from_slice(&len.to_le_bytes());
6385 out.extend_from_slice(s.as_bytes());
6386 }
6387 }
6388 }
6389 }
6390 Value::IntArray(items) => {
6393 out.push(16);
6394 let count = u16::try_from(items.len()).expect("INT[] ≤ 65k elements");
6395 out.extend_from_slice(&count.to_le_bytes());
6396 for item in items {
6397 match item {
6398 None => out.push(1),
6399 Some(n) => {
6400 out.push(0);
6401 out.extend_from_slice(&n.to_le_bytes());
6402 }
6403 }
6404 }
6405 }
6406 Value::BigIntArray(items) => {
6409 out.push(17);
6410 let count = u16::try_from(items.len()).expect("BIGINT[] ≤ 65k elements");
6411 out.extend_from_slice(&count.to_le_bytes());
6412 for item in items {
6413 match item {
6414 None => out.push(1),
6415 Some(n) => {
6416 out.push(0);
6417 out.extend_from_slice(&n.to_le_bytes());
6418 }
6419 }
6420 }
6421 }
6422 Value::TsVector(lexs) => {
6425 out.push(18);
6426 write_tsvector_body(out, lexs);
6427 }
6428 Value::TsQuery(ast) => {
6431 out.push(19);
6432 write_tsquery_body(out, ast);
6433 }
6434 }
6435}
6436
6437fn write_tsvector_body(out: &mut Vec<u8>, lexs: &[TsLexeme]) {
6440 let count = u16::try_from(lexs.len()).expect("tsvector ≤ 65k lexemes");
6441 out.extend_from_slice(&count.to_le_bytes());
6442 for l in lexs {
6443 let wlen = u16::try_from(l.word.len()).expect("tsvector word ≤ 64 KiB");
6444 out.extend_from_slice(&wlen.to_le_bytes());
6445 out.extend_from_slice(l.word.as_bytes());
6446 let plen = u16::try_from(l.positions.len()).expect("tsvector pos count ≤ 65k");
6447 out.extend_from_slice(&plen.to_le_bytes());
6448 for p in &l.positions {
6449 out.extend_from_slice(&p.to_le_bytes());
6450 }
6451 out.push(l.weight);
6452 }
6453}
6454
6455fn write_tsquery_body(out: &mut Vec<u8>, ast: &TsQueryAst) {
6459 match ast {
6460 TsQueryAst::Term { word, weight_mask } => {
6461 out.push(0);
6462 let len = u16::try_from(word.len()).expect("tsquery term ≤ 64 KiB");
6463 out.extend_from_slice(&len.to_le_bytes());
6464 out.extend_from_slice(word.as_bytes());
6465 out.push(*weight_mask);
6466 }
6467 TsQueryAst::And(a, b) => {
6468 out.push(1);
6469 write_tsquery_body(out, a);
6470 write_tsquery_body(out, b);
6471 }
6472 TsQueryAst::Or(a, b) => {
6473 out.push(2);
6474 write_tsquery_body(out, a);
6475 write_tsquery_body(out, b);
6476 }
6477 TsQueryAst::Not(x) => {
6478 out.push(3);
6479 write_tsquery_body(out, x);
6480 }
6481 TsQueryAst::Phrase {
6482 left,
6483 right,
6484 distance,
6485 } => {
6486 out.push(4);
6487 out.extend_from_slice(&distance.to_le_bytes());
6488 write_tsquery_body(out, left);
6489 write_tsquery_body(out, right);
6490 }
6491 }
6492}
6493
6494fn tsquery_encoded_len(ast: &TsQueryAst) -> usize {
6496 match ast {
6497 TsQueryAst::Term { word, .. } => 1 + 2 + word.len() + 1,
6498 TsQueryAst::And(a, b) | TsQueryAst::Or(a, b) => {
6499 1 + tsquery_encoded_len(a) + tsquery_encoded_len(b)
6500 }
6501 TsQueryAst::Not(x) => 1 + tsquery_encoded_len(x),
6502 TsQueryAst::Phrase { left, right, .. } => {
6503 1 + 2 + tsquery_encoded_len(left) + tsquery_encoded_len(right)
6504 }
6505 }
6506}
6507
/// Appends `n` to `out` as two little-endian bytes.
fn write_u16(out: &mut Vec<u8>, n: u16) {
    let le = n.to_le_bytes();
    out.extend(le);
}
/// Appends `n` to `out` as four little-endian bytes.
fn write_u32(out: &mut Vec<u8>, n: u32) {
    let le = n.to_le_bytes();
    out.extend(le);
}
/// Appends `s` to `out` as a `u16` little-endian byte length followed by the
/// UTF-8 bytes.
///
/// # Panics
///
/// Panics if `s` is longer than `u16::MAX` bytes.
fn write_str(out: &mut Vec<u8>, s: &str) {
    let payload = s.as_bytes();
    let prefix = u16::try_from(payload.len()).expect("identifier / text fits in u16");
    out.extend_from_slice(&prefix.to_le_bytes());
    out.extend_from_slice(payload);
}
6519
/// Appends `s` to `out` as a `u32` little-endian byte length followed by the
/// UTF-8 bytes; the wide-prefix counterpart of `write_str`.
///
/// # Panics
///
/// Panics if `s` is longer than `u32::MAX` bytes.
fn write_str_long(out: &mut Vec<u8>, s: &str) {
    let payload = s.as_bytes();
    let prefix = u32::try_from(payload.len()).expect("function body fits in u32");
    out.extend_from_slice(&prefix.to_le_bytes());
    out.extend_from_slice(payload);
}
6529
6530fn write_index_key(out: &mut Vec<u8>, key: &IndexKey) {
6534 match key {
6535 IndexKey::Int(n) => {
6536 out.push(INDEX_KEY_TAG_INT);
6537 out.extend_from_slice(&n.to_le_bytes());
6538 }
6539 IndexKey::Text(s) => {
6540 out.push(INDEX_KEY_TAG_TEXT);
6541 write_str(out, s);
6542 }
6543 IndexKey::Bool(b) => {
6544 out.push(INDEX_KEY_TAG_BOOL);
6545 out.push(u8::from(*b));
6546 }
6547 }
6548}
6549
6550struct Cursor<'a> {
6551 buf: &'a [u8],
6552 pos: usize,
6553}
6554
6555impl<'a> Cursor<'a> {
6556 const fn new(buf: &'a [u8]) -> Self {
6557 Self { buf, pos: 0 }
6558 }
6559
6560 fn take(&mut self, n: usize) -> Result<&'a [u8], StorageError> {
6561 let end = self
6562 .pos
6563 .checked_add(n)
6564 .ok_or_else(|| StorageError::Corrupt(format!("length overflow taking {n} bytes")))?;
6565 if end > self.buf.len() {
6566 return Err(StorageError::Corrupt(format!(
6567 "unexpected EOF at offset {} (wanted {n} more bytes)",
6568 self.pos
6569 )));
6570 }
6571 let s = &self.buf[self.pos..end];
6572 self.pos = end;
6573 Ok(s)
6574 }
6575
6576 fn read_u8(&mut self) -> Result<u8, StorageError> {
6577 Ok(self.take(1)?[0])
6578 }
6579 fn read_u16(&mut self) -> Result<u16, StorageError> {
6580 let s = self.take(2)?;
6581 Ok(u16::from_le_bytes([s[0], s[1]]))
6582 }
6583 fn read_u32(&mut self) -> Result<u32, StorageError> {
6584 let s = self.take(4)?;
6585 Ok(u32::from_le_bytes([s[0], s[1], s[2], s[3]]))
6586 }
6587 fn read_i32(&mut self) -> Result<i32, StorageError> {
6588 let s = self.take(4)?;
6589 Ok(i32::from_le_bytes([s[0], s[1], s[2], s[3]]))
6590 }
6591 fn read_u64(&mut self) -> Result<u64, StorageError> {
6594 let s = self.take(8)?;
6595 Ok(u64::from_le_bytes([
6596 s[0], s[1], s[2], s[3], s[4], s[5], s[6], s[7],
6597 ]))
6598 }
6599 fn read_i64(&mut self) -> Result<i64, StorageError> {
6600 let s = self.take(8)?;
6601 let arr: [u8; 8] = s.try_into().expect("checked");
6602 Ok(i64::from_le_bytes(arr))
6603 }
6604 fn read_f64(&mut self) -> Result<f64, StorageError> {
6605 let s = self.take(8)?;
6606 let arr: [u8; 8] = s.try_into().expect("checked");
6607 Ok(f64::from_le_bytes(arr))
6608 }
6609 fn read_f32(&mut self) -> Result<f32, StorageError> {
6610 let s = self.take(4)?;
6611 Ok(f32::from_le_bytes([s[0], s[1], s[2], s[3]]))
6612 }
6613 fn read_str(&mut self) -> Result<String, StorageError> {
6614 let len = self.read_u16()? as usize;
6615 let bytes = self.take(len)?;
6616 core::str::from_utf8(bytes)
6617 .map(String::from)
6618 .map_err(|_| StorageError::Corrupt("invalid UTF-8 in identifier or text".into()))
6619 }
6620
6621 fn read_str_long(&mut self) -> Result<String, StorageError> {
6625 let len = self.read_u32()? as usize;
6626 let bytes = self.take(len)?;
6627 core::str::from_utf8(bytes)
6628 .map(String::from)
6629 .map_err(|_| StorageError::Corrupt("invalid UTF-8 in long-string payload".into()))
6630 }
6631
6632 fn read_index_key(&mut self) -> Result<IndexKey, StorageError> {
6636 let tag = self.read_u8()?;
6637 match tag {
6638 INDEX_KEY_TAG_INT => Ok(IndexKey::Int(self.read_i64()?)),
6639 INDEX_KEY_TAG_TEXT => Ok(IndexKey::Text(self.read_str()?)),
6640 INDEX_KEY_TAG_BOOL => Ok(IndexKey::Bool(self.read_u8()? != 0)),
6641 other => Err(StorageError::Corrupt(format!(
6642 "unknown index key tag: {other}"
6643 ))),
6644 }
6645 }
6646 fn read_value_body(&mut self, ty: DataType) -> Result<Value, StorageError> {
6652 match ty {
6653 DataType::SmallInt => {
6654 let s = self.take(2)?;
6655 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
6656 }
6657 DataType::Int => Ok(Value::Int(self.read_i32()?)),
6658 DataType::BigInt => Ok(Value::BigInt(self.read_i64()?)),
6659 DataType::Float => Ok(Value::Float(self.read_f64()?)),
6660 DataType::Bool => Ok(Value::Bool(self.read_u8()? != 0)),
6661 DataType::Text | DataType::Varchar(_) | DataType::Char(_) => {
6662 Ok(Value::Text(self.read_str()?))
6663 }
6664 DataType::Vector {
6665 encoding: VecEncoding::F32,
6666 ..
6667 } => {
6668 let dim = self.read_u32()? as usize;
6669 let mut v = Vec::with_capacity(dim);
6670 for _ in 0..dim {
6671 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
6672 v.push(f32::from_le_bytes(bytes));
6673 }
6674 Ok(Value::Vector(v))
6675 }
6676 DataType::Vector {
6677 encoding: VecEncoding::Sq8,
6678 ..
6679 } => {
6680 let dim = self.read_u32()? as usize;
6681 let min = self.read_f32()?;
6682 let max = self.read_f32()?;
6683 let bytes = self.take(dim)?.to_vec();
6684 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
6685 }
6686 DataType::Vector {
6687 encoding: VecEncoding::F16,
6688 ..
6689 } => {
6690 let dim = self.read_u32()? as usize;
6691 let bytes = self.take(dim * 2)?.to_vec();
6692 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
6693 }
6694 DataType::Numeric { .. } => {
6695 let s = self.take(16)?;
6696 let arr: [u8; 16] = s.try_into().expect("checked");
6697 let scaled = i128::from_le_bytes(arr);
6698 let scale = self.read_u8()?;
6699 Ok(Value::Numeric { scaled, scale })
6700 }
6701 DataType::Date => Ok(Value::Date(self.read_i32()?)),
6702 DataType::Timestamp => Ok(Value::Timestamp(self.read_i64()?)),
6703 DataType::Timestamptz => Ok(Value::Timestamp(self.read_i64()?)),
6704 DataType::Jsonb => Ok(Value::Json(self.read_str()?)),
6705 DataType::Interval => {
6706 Err(StorageError::Corrupt(
6711 "INTERVAL column found on disk — runtime-only type, v3.0.2 rejects it".into(),
6712 ))
6713 }
6714 DataType::Json => Ok(Value::Json(self.read_str()?)),
6715 DataType::Bytes => {
6718 let len = self.read_u16()? as usize;
6719 let bytes = self.take(len)?.to_vec();
6720 Ok(Value::Bytes(bytes))
6721 }
6722 DataType::TextArray => {
6724 let count = self.read_u16()? as usize;
6725 let mut items: Vec<Option<String>> = Vec::with_capacity(count);
6726 for _ in 0..count {
6727 match self.read_u8()? {
6728 0 => items.push(Some(self.read_str()?)),
6729 1 => items.push(None),
6730 other => {
6731 return Err(StorageError::Corrupt(format!(
6732 "TEXT[] null flag: unknown byte {other}"
6733 )));
6734 }
6735 }
6736 }
6737 Ok(Value::TextArray(items))
6738 }
6739 DataType::IntArray => {
6741 let count = self.read_u16()? as usize;
6742 let mut items: Vec<Option<i32>> = Vec::with_capacity(count);
6743 for _ in 0..count {
6744 match self.read_u8()? {
6745 0 => items.push(Some(self.read_i32()?)),
6746 1 => items.push(None),
6747 other => {
6748 return Err(StorageError::Corrupt(format!(
6749 "INT[] null flag: unknown byte {other}"
6750 )));
6751 }
6752 }
6753 }
6754 Ok(Value::IntArray(items))
6755 }
6756 DataType::BigIntArray => {
6758 let count = self.read_u16()? as usize;
6759 let mut items: Vec<Option<i64>> = Vec::with_capacity(count);
6760 for _ in 0..count {
6761 match self.read_u8()? {
6762 0 => items.push(Some(self.read_i64()?)),
6763 1 => items.push(None),
6764 other => {
6765 return Err(StorageError::Corrupt(format!(
6766 "BIGINT[] null flag: unknown byte {other}"
6767 )));
6768 }
6769 }
6770 }
6771 Ok(Value::BigIntArray(items))
6772 }
6773 DataType::TsVector => Ok(Value::TsVector(self.read_tsvector_body()?)),
6777 DataType::TsQuery => Ok(Value::TsQuery(self.read_tsquery_body()?)),
6778 }
6779 }
6780
6781 fn read_tsvector_body(&mut self) -> Result<Vec<TsLexeme>, StorageError> {
6783 let count = self.read_u16()? as usize;
6784 let mut out = Vec::with_capacity(count);
6785 for _ in 0..count {
6786 let word = self.read_str()?;
6787 let pos_count = self.read_u16()? as usize;
6788 let mut positions = Vec::with_capacity(pos_count);
6789 for _ in 0..pos_count {
6790 positions.push(self.read_u16()?);
6791 }
6792 let weight = self.read_u8()?;
6793 out.push(TsLexeme {
6794 word,
6795 positions,
6796 weight,
6797 });
6798 }
6799 Ok(out)
6800 }
6801
6802 fn read_tsquery_body(&mut self) -> Result<TsQueryAst, StorageError> {
6804 let tag = self.read_u8()?;
6805 match tag {
6806 0 => {
6807 let word = self.read_str()?;
6808 let weight_mask = self.read_u8()?;
6809 Ok(TsQueryAst::Term { word, weight_mask })
6810 }
6811 1 => {
6812 let a = self.read_tsquery_body()?;
6813 let b = self.read_tsquery_body()?;
6814 Ok(TsQueryAst::And(Box::new(a), Box::new(b)))
6815 }
6816 2 => {
6817 let a = self.read_tsquery_body()?;
6818 let b = self.read_tsquery_body()?;
6819 Ok(TsQueryAst::Or(Box::new(a), Box::new(b)))
6820 }
6821 3 => {
6822 let x = self.read_tsquery_body()?;
6823 Ok(TsQueryAst::Not(Box::new(x)))
6824 }
6825 4 => {
6826 let distance = self.read_u16()?;
6827 let left = self.read_tsquery_body()?;
6828 let right = self.read_tsquery_body()?;
6829 Ok(TsQueryAst::Phrase {
6830 left: Box::new(left),
6831 right: Box::new(right),
6832 distance,
6833 })
6834 }
6835 other => Err(StorageError::Corrupt(format!(
6836 "tsquery: unknown node tag {other}"
6837 ))),
6838 }
6839 }
6840
6841 fn read_value(&mut self) -> Result<Value, StorageError> {
6842 let tag = self.read_u8()?;
6843 match tag {
6844 0 => Ok(Value::Null),
6845 1 => Ok(Value::Int(self.read_i32()?)),
6846 2 => Ok(Value::BigInt(self.read_i64()?)),
6847 3 => Ok(Value::Float(self.read_f64()?)),
6848 4 => Ok(Value::Text(self.read_str()?)),
6849 5 => Ok(Value::Bool(self.read_u8()? != 0)),
6850 6 => {
6851 let dim = self.read_u32()? as usize;
6852 let mut v = Vec::with_capacity(dim);
6853 for _ in 0..dim {
6854 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
6855 v.push(f32::from_le_bytes(bytes));
6856 }
6857 Ok(Value::Vector(v))
6858 }
6859 7 => {
6860 let s = self.take(2)?;
6861 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
6862 }
6863 8 => {
6864 let s = self.take(16)?;
6865 let arr: [u8; 16] = s.try_into().expect("checked");
6866 let scaled = i128::from_le_bytes(arr);
6867 let scale = self.read_u8()?;
6868 Ok(Value::Numeric { scaled, scale })
6869 }
6870 9 => Ok(Value::Date(self.read_i32()?)),
6871 10 => Ok(Value::Timestamp(self.read_i64()?)),
6872 11 => {
6877 let dim = self.read_u32()? as usize;
6878 let min = self.read_f32()?;
6879 let max = self.read_f32()?;
6880 let bytes = self.take(dim)?.to_vec();
6881 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
6882 }
6883 12 => {
6886 let dim = self.read_u32()? as usize;
6887 let bytes = self.take(dim * 2)?.to_vec();
6888 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
6889 }
6890 14 => {
6892 let len = self.read_u16()? as usize;
6893 let bytes = self.take(len)?.to_vec();
6894 Ok(Value::Bytes(bytes))
6895 }
6896 15 => {
6899 let count = self.read_u16()? as usize;
6900 let mut items: Vec<Option<String>> = Vec::with_capacity(count);
6901 for _ in 0..count {
6902 match self.read_u8()? {
6903 0 => items.push(Some(self.read_str()?)),
6904 1 => items.push(None),
6905 other => {
6906 return Err(StorageError::Corrupt(format!(
6907 "TEXT[] null flag in value tag: unknown byte {other}"
6908 )));
6909 }
6910 }
6911 }
6912 Ok(Value::TextArray(items))
6913 }
6914 16 => {
6916 let count = self.read_u16()? as usize;
6917 let mut items: Vec<Option<i32>> = Vec::with_capacity(count);
6918 for _ in 0..count {
6919 match self.read_u8()? {
6920 0 => items.push(Some(self.read_i32()?)),
6921 1 => items.push(None),
6922 other => {
6923 return Err(StorageError::Corrupt(format!(
6924 "INT[] null flag in value tag: unknown byte {other}"
6925 )));
6926 }
6927 }
6928 }
6929 Ok(Value::IntArray(items))
6930 }
6931 17 => {
6932 let count = self.read_u16()? as usize;
6933 let mut items: Vec<Option<i64>> = Vec::with_capacity(count);
6934 for _ in 0..count {
6935 match self.read_u8()? {
6936 0 => items.push(Some(self.read_i64()?)),
6937 1 => items.push(None),
6938 other => {
6939 return Err(StorageError::Corrupt(format!(
6940 "BIGINT[] null flag in value tag: unknown byte {other}"
6941 )));
6942 }
6943 }
6944 }
6945 Ok(Value::BigIntArray(items))
6946 }
6947 18 => Ok(Value::TsVector(self.read_tsvector_body()?)),
6950 19 => Ok(Value::TsQuery(self.read_tsquery_body()?)),
6952 other => Err(StorageError::Corrupt(format!("unknown value tag: {other}"))),
6953 }
6954 }
6955
6956 fn read_nsw_graph(&mut self, m: usize) -> Result<NswGraph, StorageError> {
6960 let m_max_0 = self.read_u16()? as usize;
6961 let entry_raw = self.read_u32()?;
6962 let entry = if entry_raw == u32::MAX {
6963 None
6964 } else {
6965 Some(entry_raw as usize)
6966 };
6967 let entry_level = self.read_u8()?;
6968 let node_count = self.read_u32()? as usize;
6969 let mut levels: PersistentVec<u8> = PersistentVec::new();
6974 for _ in 0..node_count {
6975 levels.push_mut(self.read_u8()?);
6976 }
6977 let layer_count = self.read_u8()? as usize;
6978 let mut layers: Vec<PersistentVec<Vec<u32>>> = Vec::with_capacity(layer_count);
6979 for _ in 0..layer_count {
6980 let n = self.read_u32()? as usize;
6981 let mut per_layer: PersistentVec<Vec<u32>> = PersistentVec::new();
6982 for _ in 0..n {
6983 let cnt = self.read_u16()? as usize;
6984 let mut row: Vec<u32> = Vec::with_capacity(cnt);
6985 for _ in 0..cnt {
6986 row.push(self.read_u32()?);
6987 }
6988 per_layer.push_mut(row);
6989 }
6990 layers.push(per_layer);
6991 }
6992 Ok(NswGraph {
6993 m,
6994 m_max_0,
6995 entry,
6996 entry_level,
6997 levels,
6998 layers,
6999 })
7000 }
7001}
7002
7003#[cfg(test)]
7004mod tests {
7005 use super::*;
7006 use alloc::string::ToString;
7007 use alloc::vec;
7008
7009 #[cfg(target_arch = "aarch64")]
7010 #[test]
7011 fn neon_l2_matches_scalar() {
7012 let dims = [4usize, 8, 12, 16, 64, 128, 256, 384, 512, 768, 1024, 1536];
7017 for &d in &dims {
7018 let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
7019 let mut a = Vec::with_capacity(d);
7020 let mut b = Vec::with_capacity(d);
7021 for _ in 0..d {
7022 state = state
7023 .wrapping_mul(6_364_136_223_846_793_005)
7024 .wrapping_add(1);
7025 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
7026 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
7027 state = state
7028 .wrapping_mul(6_364_136_223_846_793_005)
7029 .wrapping_add(1);
7030 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
7031 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
7032 a.push(x);
7033 b.push(y);
7034 }
7035 let scalar = l2_distance_sq_scalar(&a, &b);
7036 let neon = unsafe { l2_distance_sq_neon(&a, &b) };
7037 let tol = (scalar.abs().max(1e-6)) * 1e-4;
7038 assert!(
7039 (scalar - neon).abs() <= tol,
7040 "dim={d}: scalar={scalar} neon={neon} diff={}",
7041 (scalar - neon).abs()
7042 );
7043 }
7044 }
7045
7046 #[cfg(target_arch = "aarch64")]
7047 #[test]
7048 fn neon_inner_product_matches_scalar() {
7049 let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
7053 for &d in &dims {
7054 let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
7055 let mut a = Vec::with_capacity(d);
7056 let mut b = Vec::with_capacity(d);
7057 for _ in 0..d {
7058 state = state
7059 .wrapping_mul(6_364_136_223_846_793_005)
7060 .wrapping_add(1);
7061 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
7062 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
7063 state = state
7064 .wrapping_mul(6_364_136_223_846_793_005)
7065 .wrapping_add(1);
7066 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
7067 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
7068 a.push(x);
7069 b.push(y);
7070 }
7071 let scalar = inner_product_scalar(&a, &b);
7072 let neon = unsafe { inner_product_neon(&a, &b) };
7073 #[allow(clippy::cast_precision_loss)]
7074 let tol = (scalar.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
7075 assert!(
7076 (scalar - neon).abs() <= tol,
7077 "IP dim={d}: scalar={scalar} neon={neon} diff={}",
7078 (scalar - neon).abs()
7079 );
7080 }
7081 }
7082
7083 #[cfg(target_arch = "aarch64")]
7084 #[allow(clippy::similar_names)]
7085 #[test]
7086 fn neon_cosine_dot_norms_matches_scalar() {
7087 let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
7088 for &d in &dims {
7089 let mut state: u64 = (d as u64).wrapping_mul(0xBF58_476D_1CE4_E5B9);
7090 let mut a = Vec::with_capacity(d);
7091 let mut b = Vec::with_capacity(d);
7092 for _ in 0..d {
7093 state = state
7094 .wrapping_mul(6_364_136_223_846_793_005)
7095 .wrapping_add(1);
7096 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
7097 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
7098 state = state
7099 .wrapping_mul(6_364_136_223_846_793_005)
7100 .wrapping_add(1);
7101 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
7102 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
7103 a.push(x);
7104 b.push(y);
7105 }
7106 let (dot_s, na_s, nb_s) = cosine_dot_norms_scalar(&a, &b);
7107 let (dot_n, na_n, nb_n) = unsafe { cosine_dot_norms_neon(&a, &b) };
7108 #[allow(clippy::cast_precision_loss)]
7109 let tol_d = (dot_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
7110 #[allow(clippy::cast_precision_loss)]
7111 let tol_n = (na_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
7112 assert!(
7113 (dot_s - dot_n).abs() <= tol_d,
7114 "cosine dot dim={d}: scalar={dot_s} neon={dot_n}"
7115 );
7116 assert!(
7117 (na_s - na_n).abs() <= tol_n,
7118 "cosine na dim={d}: scalar={na_s} neon={na_n}"
7119 );
7120 assert!(
7121 (nb_s - nb_n).abs() <= tol_n,
7122 "cosine nb dim={d}: scalar={nb_s} neon={nb_n}"
7123 );
7124 }
7125 }
7126
7127 fn make_users_schema() -> TableSchema {
7128 TableSchema::new(
7129 "users",
7130 vec![
7131 ColumnSchema::new("id", DataType::Int, false),
7132 ColumnSchema::new("name", DataType::Text, false),
7133 ColumnSchema::new("score", DataType::Float, true),
7134 ],
7135 )
7136 }
7137
7138 #[test]
7139 fn value_type_tag_matches_variant() {
7140 assert_eq!(Value::Int(1).data_type(), Some(DataType::Int));
7141 assert_eq!(Value::BigInt(1).data_type(), Some(DataType::BigInt));
7142 assert_eq!(Value::Float(1.0).data_type(), Some(DataType::Float));
7143 assert_eq!(Value::Text("x".into()).data_type(), Some(DataType::Text));
7144 assert_eq!(Value::Bool(true).data_type(), Some(DataType::Bool));
7145 assert_eq!(Value::Null.data_type(), None);
7146 assert!(Value::Null.is_null());
7147 assert!(!Value::Int(0).is_null());
7148 }
7149
7150 #[test]
7151 fn sq8_value_reports_sq8_data_type() {
7152 let q = crate::quantize::quantize(&[0.0, 0.25, 0.5, 0.75, 1.0]);
7157 let v = Value::Sq8Vector(q);
7158 assert_eq!(
7159 v.data_type(),
7160 Some(DataType::Vector {
7161 dim: 5,
7162 encoding: VecEncoding::Sq8,
7163 }),
7164 );
7165 }
7166
7167 #[test]
7168 fn datatype_display_matches_pg_keyword() {
7169 assert_eq!(DataType::Int.to_string(), "INT");
7170 assert_eq!(DataType::BigInt.to_string(), "BIGINT");
7171 assert_eq!(DataType::Float.to_string(), "FLOAT");
7172 assert_eq!(DataType::Text.to_string(), "TEXT");
7173 assert_eq!(DataType::Bool.to_string(), "BOOL");
7174 }
7175
7176 #[test]
7177 fn row_len_and_emptiness() {
7178 let r = Row::new(vec![Value::Int(1), Value::Null]);
7179 assert_eq!(r.len(), 2);
7180 assert!(!r.is_empty());
7181 assert!(Row::new(Vec::new()).is_empty());
7182 }
7183
7184 #[test]
7185 fn table_schema_column_position() {
7186 let s = make_users_schema();
7187 assert_eq!(s.column_position("id"), Some(0));
7188 assert_eq!(s.column_position("score"), Some(2));
7189 assert_eq!(s.column_position("missing"), None);
7190 }
7191
7192 #[test]
7193 fn catalog_create_table_then_lookup() {
7194 let mut cat = Catalog::new();
7195 cat.create_table(make_users_schema()).unwrap();
7196 assert_eq!(cat.table_count(), 1);
7197 assert!(cat.get("users").is_some());
7198 assert!(cat.get("nope").is_none());
7199 }
7200
7201 #[test]
7202 fn catalog_duplicate_table_is_rejected() {
7203 let mut cat = Catalog::new();
7204 cat.create_table(make_users_schema()).unwrap();
7205 let err = cat.create_table(make_users_schema()).unwrap_err();
7206 assert!(matches!(err, StorageError::DuplicateTable { ref name } if name == "users"));
7207 }
7208
7209 #[test]
7210 fn table_insert_happy_path_appends_row() {
7211 let mut cat = Catalog::new();
7212 cat.create_table(make_users_schema()).unwrap();
7213 let t = cat.get_mut("users").unwrap();
7214 t.insert(Row::new(vec![
7215 Value::Int(1),
7216 Value::Text("alice".into()),
7217 Value::Float(99.5),
7218 ]))
7219 .unwrap();
7220 assert_eq!(t.row_count(), 1);
7221 assert_eq!(t.rows()[0].values[1], Value::Text("alice".into()));
7222 }
7223
7224 #[test]
7225 fn table_insert_arity_mismatch() {
7226 let mut cat = Catalog::new();
7227 cat.create_table(make_users_schema()).unwrap();
7228 let t = cat.get_mut("users").unwrap();
7229 let err = t.insert(Row::new(vec![Value::Int(1)])).unwrap_err();
7230 assert!(matches!(
7231 err,
7232 StorageError::ArityMismatch {
7233 expected: 3,
7234 actual: 1
7235 }
7236 ));
7237 assert_eq!(t.row_count(), 0);
7238 }
7239
7240 #[test]
7241 fn table_insert_type_mismatch_reports_column() {
7242 let mut cat = Catalog::new();
7243 cat.create_table(make_users_schema()).unwrap();
7244 let t = cat.get_mut("users").unwrap();
7245 let err = t
7246 .insert(Row::new(vec![
7247 Value::Int(1),
7248 Value::Int(42), Value::Float(0.0),
7250 ]))
7251 .unwrap_err();
7252 match err {
7253 StorageError::TypeMismatch {
7254 ref column,
7255 expected,
7256 actual,
7257 position,
7258 } => {
7259 assert_eq!(column, "name");
7260 assert_eq!(expected, DataType::Text);
7261 assert_eq!(actual, DataType::Int);
7262 assert_eq!(position, 1);
7263 }
7264 other => panic!("unexpected: {other:?}"),
7265 }
7266 assert_eq!(t.row_count(), 0);
7267 }
7268
7269 #[test]
7270 fn table_insert_null_into_not_null_rejected() {
7271 let mut cat = Catalog::new();
7272 cat.create_table(make_users_schema()).unwrap();
7273 let t = cat.get_mut("users").unwrap();
7274 let err = t
7275 .insert(Row::new(vec![
7276 Value::Int(1),
7277 Value::Null, Value::Float(1.0),
7279 ]))
7280 .unwrap_err();
7281 assert!(matches!(err, StorageError::NullInNotNull { ref column } if column == "name"));
7282 }
7283
7284 #[test]
7285 fn table_insert_null_into_nullable_ok() {
7286 let mut cat = Catalog::new();
7287 cat.create_table(make_users_schema()).unwrap();
7288 let t = cat.get_mut("users").unwrap();
7289 t.insert(Row::new(vec![
7290 Value::Int(1),
7291 Value::Text("bob".into()),
7292 Value::Null,
7293 ]))
7294 .unwrap();
7295 assert_eq!(t.row_count(), 1);
7296 }
7297
7298 #[test]
7299 fn catalog_get_mut_independent_per_table() {
7300 let mut cat = Catalog::new();
7301 cat.create_table(TableSchema::new(
7302 "a",
7303 vec![ColumnSchema::new("v", DataType::Int, false)],
7304 ))
7305 .unwrap();
7306 cat.create_table(TableSchema::new(
7307 "b",
7308 vec![ColumnSchema::new("v", DataType::Int, false)],
7309 ))
7310 .unwrap();
7311 cat.get_mut("a")
7312 .unwrap()
7313 .insert(Row::new(vec![Value::Int(1)]))
7314 .unwrap();
7315 assert_eq!(cat.get("a").unwrap().row_count(), 1);
7316 assert_eq!(cat.get("b").unwrap().row_count(), 0);
7317 }
7318
7319 fn assert_round_trip(cat: &Catalog) {
7322 let bytes = cat.serialize();
7323 let restored = Catalog::deserialize(&bytes).expect("deserialize");
7324 assert_eq!(restored.table_count(), cat.table_count());
7327 for (a, b) in cat.tables.iter().zip(restored.tables.iter()) {
7328 assert_eq!(a.schema, b.schema);
7329 assert_eq!(a.rows, b.rows);
7330 }
7331 }
7332
7333 #[test]
7334 fn serialize_empty_catalog_round_trips() {
7335 assert_round_trip(&Catalog::new());
7336 }
7337
7338 #[test]
7339 fn serialize_single_empty_table_round_trips() {
7340 let mut cat = Catalog::new();
7341 cat.create_table(make_users_schema()).unwrap();
7342 assert_round_trip(&cat);
7343 }
7344
7345 #[test]
7346 fn nsw_clone_is_o1() {
7347 let mut cat = Catalog::new();
7356 cat.create_table(TableSchema::new(
7357 "docs",
7358 alloc::vec![
7359 ColumnSchema::new("id", DataType::Int, false),
7360 ColumnSchema::new(
7361 "v",
7362 DataType::Vector {
7363 dim: 3,
7364 encoding: VecEncoding::F32
7365 },
7366 true
7367 ),
7368 ],
7369 ))
7370 .unwrap();
7371 let t = cat.get_mut("docs").unwrap();
7372 for i in 0..1500_i32 {
7373 #[allow(clippy::cast_precision_loss)] let base = (i as f32) * 0.01;
7375 t.insert(Row::new(alloc::vec![
7376 Value::Int(i),
7377 Value::Vector(alloc::vec![base, base + 0.05, base + 0.1]),
7378 ]))
7379 .unwrap();
7380 }
7381 t.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
7382 .unwrap();
7383 let g = match &cat.get("docs").unwrap().indices()[0].kind {
7384 IndexKind::Nsw(g) => g,
7385 IndexKind::BTree(_)
7386 | IndexKind::Brin { .. }
7387 | IndexKind::Gin(_)
7388 | IndexKind::GinTrgm(_) => {
7389 panic!("expected NSW")
7390 }
7391 };
7392 assert_eq!(g.levels.len(), 1500, "one level slot per inserted row");
7395 assert!(
7396 g.layers.len() >= 2,
7397 "1500 nodes should populate at least two HNSW layers, got {}",
7398 g.layers.len()
7399 );
7400
7401 let cloned = g.clone();
7402
7403 assert!(
7404 g.levels.shares_storage_with(&cloned.levels),
7405 "levels PV not shared after clone — clone copied elements (O(N))"
7406 );
7407 assert_eq!(g.layers.len(), cloned.layers.len());
7408 for (l, (orig, cl)) in g.layers.iter().zip(cloned.layers.iter()).enumerate() {
7409 assert!(
7410 orig.shares_storage_with(cl),
7411 "layer {l} PV not shared after clone — clone copied elements (O(N))"
7412 );
7413 }
7414 }
7415
7416 #[test]
7417 fn sq8_catalog_serialise_roundtrip_preserves_cells_and_index() {
7418 let mut cat = Catalog::new();
7425 cat.create_table(TableSchema::new(
7426 "vecs",
7427 alloc::vec![
7428 ColumnSchema::new("id", DataType::Int, false),
7429 ColumnSchema::new(
7430 "v",
7431 DataType::Vector {
7432 dim: 8,
7433 encoding: VecEncoding::Sq8,
7434 },
7435 false,
7436 ),
7437 ],
7438 ))
7439 .unwrap();
7440 let t = cat.get_mut("vecs").unwrap();
7441 for i in 0..32_i32 {
7442 #[allow(clippy::cast_precision_loss)]
7443 let base = (i as f32) * 0.03;
7444 let v: Vec<f32> = (0..8_i32)
7445 .map(|j| {
7446 #[allow(clippy::cast_precision_loss)]
7447 let off = (j as f32) * 0.01;
7448 base + off
7449 })
7450 .collect();
7451 t.insert(Row::new(alloc::vec![
7452 Value::Int(i),
7453 Value::Sq8Vector(quantize::quantize(&v)),
7454 ]))
7455 .unwrap();
7456 }
7457 t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
7458 let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];
7461 let (before_cell, before_ty, before_hits) = {
7462 let t_ref = cat.get("vecs").unwrap();
7463 (
7464 t_ref.rows()[5].values[1].clone(),
7465 t_ref.schema().columns[1].ty,
7466 nsw_query(t_ref, "v_idx", &query, 5, NswMetric::L2),
7467 )
7468 };
7469
7470 let bytes = cat.serialize();
7471 let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
7472 let rt = restored.get("vecs").unwrap();
7473 assert_eq!(rt.schema().columns[1].ty, before_ty);
7474 assert_eq!(rt.rows()[5].values[1], before_cell);
7475 let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
7476 assert_eq!(before_hits, after_hits);
7477 }
7478
7479 #[test]
7480 fn half_catalog_serialise_roundtrip_preserves_cells_and_index() {
7481 use crate::halfvec;
7488 let mut cat = Catalog::new();
7489 cat.create_table(TableSchema::new(
7490 "vecs",
7491 alloc::vec![
7492 ColumnSchema::new("id", DataType::Int, false),
7493 ColumnSchema::new(
7494 "v",
7495 DataType::Vector {
7496 dim: 8,
7497 encoding: VecEncoding::F16,
7498 },
7499 false,
7500 ),
7501 ],
7502 ))
7503 .unwrap();
7504 let t = cat.get_mut("vecs").unwrap();
7505 for i in 0..32_i32 {
7506 #[allow(clippy::cast_precision_loss)]
7507 let base = (i as f32) * 0.03;
7508 let v: Vec<f32> = (0..8_i32)
7509 .map(|j| {
7510 #[allow(clippy::cast_precision_loss)]
7511 let off = (j as f32) * 0.01;
7512 base + off
7513 })
7514 .collect();
7515 t.insert(Row::new(alloc::vec![
7516 Value::Int(i),
7517 Value::HalfVector(halfvec::HalfVector::from_f32_slice(&v)),
7518 ]))
7519 .unwrap();
7520 }
7521 t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
7522 let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];
7523 let (before_cell, before_ty, before_hits) = {
7524 let t_ref = cat.get("vecs").unwrap();
7525 (
7526 t_ref.rows()[5].values[1].clone(),
7527 t_ref.schema().columns[1].ty,
7528 nsw_query(t_ref, "v_idx", &query, 5, NswMetric::L2),
7529 )
7530 };
7531 let bytes = cat.serialize();
7532 let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
7533 let rt = restored.get("vecs").unwrap();
7534 assert_eq!(rt.schema().columns[1].ty, before_ty);
7535 assert_eq!(rt.rows()[5].values[1], before_cell);
7536 let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
7537 assert_eq!(before_hits, after_hits);
7538 }
7539
7540 #[test]
7541 #[allow(clippy::similar_names)]
7542 fn hnsw_half_recall_at_10_matches_f32_groundtruth() {
7543 use crate::halfvec;
7550 fn next(state: &mut u64) -> f32 {
7551 *state = state
7552 .wrapping_add(0x9E37_79B9_7F4A_7C15)
7553 .wrapping_mul(0xBF58_476D_1CE4_E5B9);
7554 #[allow(clippy::cast_precision_loss)]
7555 let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
7556 2.0 * u - 1.0
7557 }
7558 let dim: u32 = 32;
7559 let n: usize = 512;
7560 let dim_us = dim as usize;
7561 let mut seed: u64 = 0xF16_F16_F16_F16_u64;
7562 let corpus: Vec<Vec<f32>> = (0..n)
7563 .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
7564 .collect();
7565 let queries: Vec<Vec<f32>> = (0..32)
7566 .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
7567 .collect();
7568 let exact_top10: Vec<Vec<usize>> = queries
7569 .iter()
7570 .map(|q| {
7571 let mut scored: Vec<(f32, usize)> = corpus
7572 .iter()
7573 .enumerate()
7574 .map(|(i, v)| (l2_distance_sq(v, q), i))
7575 .collect();
7576 scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
7577 scored.into_iter().take(10).map(|(_, i)| i).collect()
7578 })
7579 .collect();
7580 let mut cat = Catalog::new();
7581 cat.create_table(TableSchema::new(
7582 "vecs",
7583 alloc::vec![
7584 ColumnSchema::new("id", DataType::Int, false),
7585 ColumnSchema::new(
7586 "v",
7587 DataType::Vector {
7588 dim,
7589 encoding: VecEncoding::F16,
7590 },
7591 false,
7592 ),
7593 ],
7594 ))
7595 .unwrap();
7596 let t = cat.get_mut("vecs").unwrap();
7597 for (i, v) in corpus.iter().enumerate() {
7598 t.insert(Row::new(alloc::vec![
7599 Value::Int(i32::try_from(i).unwrap()),
7600 Value::HalfVector(halfvec::HalfVector::from_f32_slice(v)),
7601 ]))
7602 .unwrap();
7603 }
7604 t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
7605 let table = cat.get("vecs").unwrap();
7606 let mut total_overlap = 0_usize;
7607 for (q, exact) in queries.iter().zip(exact_top10.iter()) {
7608 let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
7609 for h in &hits {
7610 if exact.contains(h) {
7611 total_overlap += 1;
7612 }
7613 }
7614 }
7615 #[allow(clippy::cast_precision_loss)]
7616 let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
7617 assert!(
7618 recall >= 0.95,
7619 "HALF HNSW recall@10 = {recall:.3}, below floor 0.95 — \
7620 check halfvec dispatch in `cell_to_query_metric_distance`"
7621 );
7622 }
7623
7624 #[test]
7625 fn hnsw_sq8_recall_at_10_above_0_95_vs_f32_groundtruth() {
7626 use crate::quantize;
7633 fn next(state: &mut u64) -> f32 {
7637 *state = state
7638 .wrapping_add(0x9E37_79B9_7F4A_7C15)
7639 .wrapping_mul(0xBF58_476D_1CE4_E5B9);
7640 #[allow(clippy::cast_precision_loss)]
7641 let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
7642 2.0 * u - 1.0
7643 }
7644 let dim: u32 = 32;
7645 let n: usize = 512;
7646 let dim_us = dim as usize;
7647 let mut seed: u64 = 0xCAFE_BABE_DEAD_BEEFu64;
7648 let corpus: Vec<Vec<f32>> = (0..n)
7649 .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
7650 .collect();
7651 let queries: Vec<Vec<f32>> = (0..32)
7652 .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
7653 .collect();
7654 let exact_top10: Vec<Vec<usize>> = queries
7656 .iter()
7657 .map(|q| {
7658 let mut scored: Vec<(f32, usize)> = corpus
7659 .iter()
7660 .enumerate()
7661 .map(|(i, v)| (l2_distance_sq(v, q), i))
7662 .collect();
7663 scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
7664 scored.into_iter().take(10).map(|(_, i)| i).collect()
7665 })
7666 .collect();
7667 let mut cat = Catalog::new();
7670 cat.create_table(TableSchema::new(
7671 "vecs",
7672 alloc::vec![
7673 ColumnSchema::new("id", DataType::Int, false),
7674 ColumnSchema::new(
7675 "v",
7676 DataType::Vector {
7677 dim,
7678 encoding: VecEncoding::Sq8,
7679 },
7680 false,
7681 ),
7682 ],
7683 ))
7684 .unwrap();
7685 let t = cat.get_mut("vecs").unwrap();
7686 for (i, v) in corpus.iter().enumerate() {
7687 t.insert(Row::new(alloc::vec![
7688 Value::Int(i32::try_from(i).unwrap()),
7689 Value::Sq8Vector(quantize::quantize(v)),
7690 ]))
7691 .unwrap();
7692 }
7693 t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
7694 let table = cat.get("vecs").unwrap();
7695 let mut total_overlap = 0_usize;
7696 for (q, exact) in queries.iter().zip(exact_top10.iter()) {
7697 let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
7698 for h in &hits {
7699 if exact.contains(h) {
7700 total_overlap += 1;
7701 }
7702 }
7703 }
7704 #[allow(clippy::cast_precision_loss)]
7705 let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
7706 assert!(
7707 recall >= 0.95,
7708 "SQ8 HNSW recall@10 = {recall:.3}, below floor 0.95 — \
7709 check `sq8_rerank` is wired in `nsw_search` for SQ8 columns"
7710 );
7711 }
7712
7713 #[test]
7714 fn nsw_index_topology_persists_through_round_trip() {
7715 let mut cat = Catalog::new();
7721 cat.create_table(TableSchema::new(
7722 "docs",
7723 alloc::vec![
7724 ColumnSchema::new("id", DataType::Int, false),
7725 ColumnSchema::new(
7726 "v",
7727 DataType::Vector {
7728 dim: 3,
7729 encoding: VecEncoding::F32
7730 },
7731 true
7732 ),
7733 ],
7734 ))
7735 .unwrap();
7736 let t = cat.get_mut("docs").unwrap();
7737 for i in 0..6_i32 {
7738 #[allow(clippy::cast_precision_loss)] let base = (i as f32) * 0.1;
7740 let row = Row::new(alloc::vec![
7741 Value::Int(i),
7742 Value::Vector(alloc::vec![base, base + 0.05, base + 0.1]),
7743 ]);
7744 t.insert(row).unwrap();
7745 }
7746 t.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
7747 .unwrap();
7748 let original = match &cat.get("docs").unwrap().indices()[0].kind {
7749 IndexKind::Nsw(g) => g.clone(),
7750 IndexKind::BTree(_)
7751 | IndexKind::Brin { .. }
7752 | IndexKind::Gin(_)
7753 | IndexKind::GinTrgm(_) => {
7754 panic!("expected NSW")
7755 }
7756 };
7757 let bytes = cat.serialize();
7758 let restored = Catalog::deserialize(&bytes).expect("deserialize");
7759 let restored_graph = match &restored.get("docs").unwrap().indices()[0].kind {
7760 IndexKind::Nsw(g) => g.clone(),
7761 IndexKind::BTree(_)
7762 | IndexKind::Brin { .. }
7763 | IndexKind::Gin(_)
7764 | IndexKind::GinTrgm(_) => {
7765 panic!("expected NSW")
7766 }
7767 };
7768 assert_eq!(restored_graph.m, original.m);
7769 assert_eq!(restored_graph.m_max_0, original.m_max_0);
7770 assert_eq!(restored_graph.entry, original.entry);
7771 assert_eq!(restored_graph.entry_level, original.entry_level);
7772 assert_eq!(restored_graph.levels, original.levels);
7773 assert_eq!(restored_graph.layers, original.layers);
7774 }
7775
7776 #[test]
7777 fn hnsw_level_assignment_is_deterministic() {
7778 for i in 0..32usize {
7781 assert_eq!(nsw_assign_level(i), nsw_assign_level(i));
7782 }
7783 }
7784
7785 #[test]
7786 fn hnsw_layer_0_dominates_population() {
7787 let on_zero = (0..200usize).filter(|&i| nsw_assign_level(i) == 0).count();
7792 assert!(on_zero > 150, "level-0 nodes too few: {on_zero}");
7793 }
7794
7795 #[test]
7796 fn hnsw_search_matches_brute_force_for_l2_top1() {
7797 let mut cat = Catalog::new();
7801 cat.create_table(TableSchema::new(
7802 "vecs",
7803 alloc::vec![
7804 ColumnSchema::new("id", DataType::Int, false),
7805 ColumnSchema::new(
7806 "v",
7807 DataType::Vector {
7808 dim: 3,
7809 encoding: VecEncoding::F32
7810 },
7811 true
7812 ),
7813 ],
7814 ))
7815 .unwrap();
7816 let t = cat.get_mut("vecs").unwrap();
7817 let dataset: alloc::vec::Vec<(i32, [f32; 3])> = alloc::vec![
7818 (1, [0.0, 0.0, 0.0]),
7819 (2, [1.0, 0.0, 0.0]),
7820 (3, [0.0, 1.0, 0.0]),
7821 (4, [0.0, 0.0, 1.0]),
7822 (5, [1.0, 1.0, 0.0]),
7823 (6, [1.0, 0.0, 1.0]),
7824 (7, [0.0, 1.0, 1.0]),
7825 (8, [1.0, 1.0, 1.0]),
7826 (9, [0.5, 0.5, 0.5]),
7827 (10, [0.2, 0.8, 0.5]),
7828 ];
7829 for &(id, v) in &dataset {
7830 t.insert(Row::new(alloc::vec![
7831 Value::Int(id),
7832 Value::Vector(alloc::vec![v[0], v[1], v[2]]),
7833 ]))
7834 .unwrap();
7835 }
7836 t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
7837 let idx_pos = cat
7838 .get("vecs")
7839 .unwrap()
7840 .indices()
7841 .iter()
7842 .position(|i| i.name == "v_idx")
7843 .unwrap();
7844 for query in [[0.4, 0.4, 0.4], [0.9, 0.1, 0.0], [0.0, 0.9, 0.9]] {
7845 let table = cat.get("vecs").unwrap();
7846 let hnsw_top = nsw_search(table, idx_pos, &query, 1, 16, NswMetric::L2);
7847 let mut brute: alloc::vec::Vec<(f32, usize)> = (0..table.rows.len())
7848 .map(|i| {
7849 let Value::Vector(v) = &table.rows[i].values[1] else {
7850 return (f32::INFINITY, i);
7851 };
7852 (l2_distance_sq(v, &query), i)
7853 })
7854 .collect();
7855 brute.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
7856 assert!(!hnsw_top.is_empty(), "HNSW returned no results");
7857 assert_eq!(
7858 hnsw_top[0].1, brute[0].1,
7859 "HNSW top-1 != brute-force top-1 for {query:?}"
7860 );
7861 }
7862 }
7863
7864 #[test]
7865 fn serialize_table_with_rows_round_trips() {
7866 let mut cat = Catalog::new();
7867 cat.create_table(make_users_schema()).unwrap();
7868 let t = cat.get_mut("users").unwrap();
7869 t.insert(Row::new(vec![
7870 Value::Int(1),
7871 Value::Text("alice".into()),
7872 Value::Float(95.5),
7873 ]))
7874 .unwrap();
7875 t.insert(Row::new(vec![
7876 Value::Int(2),
7877 Value::Text("bob".into()),
7878 Value::Null,
7879 ]))
7880 .unwrap();
7881 assert_round_trip(&cat);
7882 }
7883
7884 #[test]
7885 fn serialize_multiple_tables_round_trips() {
7886 let mut cat = Catalog::new();
7887 cat.create_table(make_users_schema()).unwrap();
7888 cat.create_table(TableSchema::new(
7889 "flags",
7890 vec![
7891 ColumnSchema::new("id", DataType::BigInt, false),
7892 ColumnSchema::new("active", DataType::Bool, false),
7893 ],
7894 ))
7895 .unwrap();
7896 cat.get_mut("flags")
7897 .unwrap()
7898 .insert(Row::new(vec![Value::BigInt(7), Value::Bool(true)]))
7899 .unwrap();
7900 assert_round_trip(&cat);
7901 }
7902
7903 #[test]
7904 fn deserialize_rejects_bad_magic() {
7905 let mut buf = b"BADMAGIC".to_vec();
7906 buf.push(FILE_VERSION);
7907 buf.extend_from_slice(&0u32.to_le_bytes());
7908 let err = Catalog::deserialize(&buf).unwrap_err();
7909 assert!(matches!(err, StorageError::Corrupt(_)));
7910 }
7911
7912 #[test]
7913 fn deserialize_rejects_unsupported_version() {
7914 let mut buf = FILE_MAGIC.to_vec();
7915 buf.push(99); buf.extend_from_slice(&0u32.to_le_bytes());
7917 let err = Catalog::deserialize(&buf).unwrap_err();
7918 assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("version")));
7919 }
7920
7921 #[test]
7922 fn deserialize_rejects_truncated_file() {
7923 let mut cat = Catalog::new();
7924 cat.create_table(make_users_schema()).unwrap();
7925 let bytes = cat.serialize();
7926 let truncated = &bytes[..bytes.len() - 1];
7928 assert!(matches!(
7929 Catalog::deserialize(truncated),
7930 Err(StorageError::Corrupt(_))
7931 ));
7932 }
7933
7934 #[test]
7935 fn deserialize_rejects_trailing_garbage() {
7936 let cat = Catalog::new();
7937 let mut bytes = cat.serialize();
7938 bytes.push(0xFF);
7939 assert!(matches!(
7940 Catalog::deserialize(&bytes),
7941 Err(StorageError::Corrupt(ref s)) if s.contains("trailing")
7942 ));
7943 }
7944
7945 fn populated_users() -> Catalog {
7948 let mut cat = Catalog::new();
7949 cat.create_table(make_users_schema()).unwrap();
7950 let t = cat.get_mut("users").unwrap();
7951 for (id, name, score) in [
7952 (1, "alice", Some(90.0)),
7953 (2, "bob", None),
7954 (3, "alice", Some(70.0)), ] {
7956 t.insert(Row::new(vec![
7957 Value::Int(id),
7958 Value::Text(name.into()),
7959 score.map_or(Value::Null, Value::Float),
7960 ]))
7961 .unwrap();
7962 }
7963 cat
7964 }
7965
7966 #[test]
7967 fn add_index_builds_from_existing_rows() {
7968 let mut cat = populated_users();
7969 cat.get_mut("users")
7970 .unwrap()
7971 .add_index("by_id".into(), "id")
7972 .unwrap();
7973 let t = cat.get("users").unwrap();
7974 let idx = t.index_on(0).expect("index_on(0)");
7975 assert_eq!(idx.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
7976 assert_eq!(idx.lookup_eq(&IndexKey::Int(99)), &[] as &[RowLocator]);
7977 }
7978
7979 #[test]
7980 fn add_index_dup_name_rejected() {
7981 let mut cat = populated_users();
7982 let t = cat.get_mut("users").unwrap();
7983 t.add_index("ix".into(), "id").unwrap();
7984 let err = t.add_index("ix".into(), "name").unwrap_err();
7985 assert!(matches!(err, StorageError::DuplicateIndex { ref name } if name == "ix"));
7986 }
7987
7988 #[test]
7989 fn add_index_unknown_column_rejected() {
7990 let mut cat = populated_users();
7991 let err = cat
7992 .get_mut("users")
7993 .unwrap()
7994 .add_index("ix".into(), "ghost")
7995 .unwrap_err();
7996 assert!(matches!(err, StorageError::ColumnNotFound { ref column } if column == "ghost"));
7997 }
7998
7999 #[test]
8000 fn insert_after_create_index_updates_it() {
8001 let mut cat = populated_users();
8002 let t = cat.get_mut("users").unwrap();
8003 t.add_index("by_name".into(), "name").unwrap();
8004 t.insert(Row::new(vec![
8005 Value::Int(4),
8006 Value::Text("dave".into()),
8007 Value::Null,
8008 ]))
8009 .unwrap();
8010 let idx = t.index_on(1).unwrap();
8011 assert_eq!(
8012 idx.lookup_eq(&IndexKey::Text("dave".into())),
8013 &[RowLocator::Hot(3)]
8014 );
8015 assert_eq!(
8017 idx.lookup_eq(&IndexKey::Text("alice".into())),
8018 &[RowLocator::Hot(0), RowLocator::Hot(2)]
8019 );
8020 }
8021
8022 #[test]
8023 fn null_or_float_values_are_not_indexed() {
8024 let mut cat = populated_users();
8025 let t = cat.get_mut("users").unwrap();
8026 t.add_index("by_score".into(), "score").unwrap();
8027 let idx = t.index_on(2).unwrap();
8028 assert_eq!(idx.lookup_eq(&IndexKey::Int(90)), &[] as &[RowLocator]);
8033 }
8034
8035 #[test]
8038 fn vector_value_data_type_carries_dim() {
8039 let v = Value::Vector(vec![1.0, 2.0, 3.0]);
8040 assert_eq!(
8041 v.data_type(),
8042 Some(DataType::Vector {
8043 dim: 3,
8044 encoding: VecEncoding::F32
8045 })
8046 );
8047 }
8048
8049 #[test]
8050 fn vector_column_insert_matching_dim_ok() {
8051 let mut cat = Catalog::new();
8052 cat.create_table(TableSchema::new(
8053 "emb",
8054 vec![ColumnSchema::new(
8055 "v",
8056 DataType::Vector {
8057 dim: 3,
8058 encoding: VecEncoding::F32,
8059 },
8060 false,
8061 )],
8062 ))
8063 .unwrap();
8064 cat.get_mut("emb")
8065 .unwrap()
8066 .insert(Row::new(vec![Value::Vector(vec![1.0, 2.0, 3.0])]))
8067 .unwrap();
8068 }
8069
8070 #[test]
8071 fn vector_column_insert_dim_mismatch_rejected() {
8072 let mut cat = Catalog::new();
8073 cat.create_table(TableSchema::new(
8074 "emb",
8075 vec![ColumnSchema::new(
8076 "v",
8077 DataType::Vector {
8078 dim: 3,
8079 encoding: VecEncoding::F32,
8080 },
8081 false,
8082 )],
8083 ))
8084 .unwrap();
8085 let err = cat
8086 .get_mut("emb")
8087 .unwrap()
8088 .insert(Row::new(vec![Value::Vector(vec![1.0, 2.0])]))
8089 .unwrap_err();
8090 assert!(matches!(err, StorageError::TypeMismatch { .. }));
8091 }
8092
8093 #[test]
8094 fn vector_value_survives_catalog_round_trip() {
8095 let mut cat = Catalog::new();
8096 cat.create_table(TableSchema::new(
8097 "emb",
8098 vec![
8099 ColumnSchema::new("id", DataType::Int, false),
8100 ColumnSchema::new(
8101 "v",
8102 DataType::Vector {
8103 dim: 4,
8104 encoding: VecEncoding::F32,
8105 },
8106 false,
8107 ),
8108 ],
8109 ))
8110 .unwrap();
8111 cat.get_mut("emb")
8112 .unwrap()
8113 .insert(Row::new(vec![
8114 Value::Int(1),
8115 Value::Vector(vec![0.5, -1.25, 3.0, 7.0]),
8116 ]))
8117 .unwrap();
8118 let restored = Catalog::deserialize(&cat.serialize()).expect("round-trip");
8119 let table = restored.get("emb").unwrap();
8120 assert_eq!(
8121 table.schema().columns[1].ty,
8122 DataType::Vector {
8123 dim: 4,
8124 encoding: VecEncoding::F32
8125 }
8126 );
8127 assert_eq!(
8128 table.rows()[0].values[1],
8129 Value::Vector(vec![0.5, -1.25, 3.0, 7.0])
8130 );
8131 }
8132
8133 #[test]
8134 fn index_survives_serialize_deserialize_round_trip() {
8135 let mut cat = populated_users();
8136 cat.get_mut("users")
8137 .unwrap()
8138 .add_index("by_name".into(), "name")
8139 .unwrap();
8140 let restored = Catalog::deserialize(&cat.serialize()).unwrap();
8141 let idx = restored
8142 .get("users")
8143 .unwrap()
8144 .index_on(1)
8145 .expect("index_on(1) after restore");
8146 assert_eq!(idx.name, "by_name");
8147 assert_eq!(
8149 idx.lookup_eq(&IndexKey::Text("alice".into())),
8150 &[RowLocator::Hot(0), RowLocator::Hot(2)]
8151 );
8152 }
8153
8154 fn bigint_pk_users_schema() -> TableSchema {
8159 TableSchema::new(
8160 "users",
8161 vec![
8162 ColumnSchema::new("id", DataType::BigInt, false),
8163 ColumnSchema::new("name", DataType::Text, false),
8164 ],
8165 )
8166 }
8167
8168 fn make_user_row(id: i64, name: &str) -> Row {
8169 Row::new(vec![Value::BigInt(id), Value::Text(name.into())])
8170 }
8171
8172 #[test]
8173 fn lookup_by_pk_finds_row_via_hot_index() {
8174 let mut cat = Catalog::new();
8175 cat.create_table(bigint_pk_users_schema()).unwrap();
8176 let t = cat.get_mut("users").unwrap();
8177 for (id, name) in [(1i64, "alice"), (2, "bob"), (3, "carol")] {
8178 t.insert(make_user_row(id, name)).unwrap();
8179 }
8180 t.add_index("by_id".into(), "id").unwrap();
8181 let got = cat
8183 .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
8184 .unwrap();
8185 assert_eq!(got, make_user_row(2, "bob"));
8186 assert_eq!(cat.cold_segment_count(), 0);
8187 }
8188
8189 #[test]
8190 fn lookup_by_pk_returns_none_when_key_missing() {
8191 let mut cat = Catalog::new();
8192 cat.create_table(bigint_pk_users_schema()).unwrap();
8193 let t = cat.get_mut("users").unwrap();
8194 t.insert(make_user_row(1, "alice")).unwrap();
8195 t.add_index("by_id".into(), "id").unwrap();
8196 assert!(
8197 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(999))
8198 .is_none()
8199 );
8200 assert!(
8202 cat.lookup_by_pk("other_table", "by_id", &IndexKey::Int(1))
8203 .is_none()
8204 );
8205 assert!(
8206 cat.lookup_by_pk("users", "no_such_index", &IndexKey::Int(1))
8207 .is_none()
8208 );
8209 }
8210
8211 #[test]
8212 fn lookup_by_pk_resolves_cold_locator_via_loaded_segment() {
8213 let mut cat = Catalog::new();
8217 cat.create_table(bigint_pk_users_schema()).unwrap();
8218 let t = cat.get_mut("users").unwrap();
8219 t.add_index("by_id".into(), "id").unwrap();
8220 let schema = t.schema.clone();
8221
8222 let cold_rows: Vec<(i64, &str)> =
8223 vec![(100, "ivy"), (200, "joe"), (300, "kim"), (400, "lin")];
8224 let seg_rows: Vec<(u64, Vec<u8>)> = cold_rows
8225 .iter()
8226 .map(|(id, name)| {
8227 let row = make_user_row(*id, name);
8228 ((*id).cast_unsigned(), encode_row_body_dense(&row, &schema))
8229 })
8230 .collect();
8231 let (seg_bytes, _meta) =
8232 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
8233 let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();
8234 assert_eq!(seg_id, 0);
8235 assert_eq!(cat.cold_segment_count(), 1);
8236
8237 let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
8238 .iter()
8239 .map(|(id, _)| {
8240 (
8241 IndexKey::Int(*id),
8242 RowLocator::Cold {
8243 segment_id: seg_id,
8244 page_offset: 0,
8245 },
8246 )
8247 })
8248 .collect();
8249 let registered = cat
8250 .get_mut("users")
8251 .unwrap()
8252 .register_cold_locators("by_id", pairs)
8253 .unwrap();
8254 assert_eq!(registered, 4);
8255
8256 for (id, name) in &cold_rows {
8257 let got = cat
8258 .lookup_by_pk("users", "by_id", &IndexKey::Int(*id))
8259 .unwrap_or_else(|| panic!("cold key {id} not found"));
8260 assert_eq!(got, make_user_row(*id, name));
8261 }
8262 assert!(
8264 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(999))
8265 .is_none()
8266 );
8267 }
8268
8269 #[test]
8270 fn lookup_by_pk_mixes_hot_and_cold_tiers() {
8271 let mut cat = Catalog::new();
8275 cat.create_table(bigint_pk_users_schema()).unwrap();
8276 let t = cat.get_mut("users").unwrap();
8277 for (id, name) in [(1i64, "alice"), (2, "bob")] {
8278 t.insert(make_user_row(id, name)).unwrap();
8279 }
8280 t.add_index("by_id".into(), "id").unwrap();
8281 let schema = t.schema.clone();
8282
8283 let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe")];
8284 let seg_rows: Vec<(u64, Vec<u8>)> = cold_rows
8285 .iter()
8286 .map(|(id, name)| {
8287 let row = make_user_row(*id, name);
8288 ((*id).cast_unsigned(), encode_row_body_dense(&row, &schema))
8289 })
8290 .collect();
8291 let (seg_bytes, _) =
8292 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
8293 let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();
8294 let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
8295 .iter()
8296 .map(|(id, _)| {
8297 (
8298 IndexKey::Int(*id),
8299 RowLocator::Cold {
8300 segment_id: seg_id,
8301 page_offset: 0,
8302 },
8303 )
8304 })
8305 .collect();
8306 cat.get_mut("users")
8307 .unwrap()
8308 .register_cold_locators("by_id", pairs)
8309 .unwrap();
8310
8311 assert_eq!(
8313 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(1))
8314 .unwrap(),
8315 make_user_row(1, "alice")
8316 );
8317 assert_eq!(
8318 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(2))
8319 .unwrap(),
8320 make_user_row(2, "bob")
8321 );
8322 assert_eq!(
8324 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(100))
8325 .unwrap(),
8326 make_user_row(100, "ivy")
8327 );
8328 assert_eq!(
8329 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(200))
8330 .unwrap(),
8331 make_user_row(200, "joe")
8332 );
8333 assert!(
8335 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(50))
8336 .is_none()
8337 );
8338 }
8339
8340 #[test]
8341 fn register_cold_locators_rejects_nsw_index() {
8342 let mut cat = Catalog::new();
8343 cat.create_table(TableSchema::new(
8344 "vecs",
8345 vec![
8346 ColumnSchema::new("id", DataType::Int, false),
8347 ColumnSchema::new(
8348 "v",
8349 DataType::Vector {
8350 dim: 4,
8351 encoding: VecEncoding::F32,
8352 },
8353 false,
8354 ),
8355 ],
8356 ))
8357 .unwrap();
8358 let t = cat.get_mut("vecs").unwrap();
8359 t.insert(Row::new(vec![
8360 Value::Int(1),
8361 Value::Vector(vec![1.0, 0.0, 0.0, 0.0]),
8362 ]))
8363 .unwrap();
8364 t.add_nsw_index("by_v".into(), "v", NSW_DEFAULT_M).unwrap();
8365 let err = t
8366 .register_cold_locators(
8367 "by_v",
8368 vec![(
8369 IndexKey::Int(1),
8370 RowLocator::Cold {
8371 segment_id: 0,
8372 page_offset: 0,
8373 },
8374 )],
8375 )
8376 .unwrap_err();
8377 assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("not BTree")));
8380 }
8381
8382 #[test]
8383 fn load_segment_bytes_rejects_garbage() {
8384 let mut cat = Catalog::new();
8385 let err = cat.load_segment_bytes(vec![0u8; 10]).unwrap_err();
8386 assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("segment")));
8387 assert_eq!(cat.cold_segment_count(), 0);
8389 }
8390
8391 #[test]
8392 fn load_segment_bytes_returns_sequential_ids() {
8393 let mut cat = Catalog::new();
8394 cat.create_table(bigint_pk_users_schema()).unwrap();
8395 let schema = cat.get("users").unwrap().schema.clone();
8396 for batch in 0u32..3 {
8397 let rows: Vec<(u64, Vec<u8>)> = (0u64..4)
8398 .map(|i| {
8399 let id = u64::from(batch) * 100 + i;
8400 let row = make_user_row(id.cast_signed(), "x");
8401 (id, encode_row_body_dense(&row, &schema))
8402 })
8403 .collect();
8404 let (bytes, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
8405 assert_eq!(cat.load_segment_bytes(bytes).unwrap(), batch);
8406 }
8407 assert_eq!(cat.cold_segment_count(), 3);
8408 }
8409
8410 #[test]
8417 fn v8_catalog_decodes_as_all_hot_under_v9_reader() {
8418 let mut cat = populated_users();
8425 cat.get_mut("users")
8426 .unwrap()
8427 .add_index("by_name".into(), "name")
8428 .unwrap();
8429
8430 let v8_bytes = encode_as_v8(&cat);
8435 assert_eq!(v8_bytes[FILE_MAGIC.len()], 8, "version byte must be 8");
8436
8437 let restored = Catalog::deserialize(&v8_bytes).expect("v9 reader accepts v8 stream");
8438 let idx = restored
8439 .get("users")
8440 .unwrap()
8441 .index_on(1)
8442 .expect("index_on(1) after restore");
8443 assert_eq!(
8446 idx.lookup_eq(&IndexKey::Text("alice".into())),
8447 &[RowLocator::Hot(0), RowLocator::Hot(2)]
8448 );
8449 for entry in idx.lookup_eq(&IndexKey::Text("alice".into())) {
8451 assert!(entry.is_hot(), "v8 → v9 read must yield Hot only");
8452 }
8453 }
8454
    /// Test-only writer that serialises `cat` with version byte 8.
    ///
    /// Layout, in write order:
    /// - `FILE_MAGIC`, then the version byte `8`, then the table count (u32).
    /// - Per table: name, column count (u16), the columns, row count (u32),
    ///   each row's dense body, index count (u16), the indices.
    /// - Per column: name, data type, nullable flag, a default-presence byte
    ///   (`0` = none, `1` = followed by the value), auto-increment flag.
    /// - Per index: name, column position (u16), then a kind tag. `0` is
    ///   BTree with nothing after the tag. `1` is NSW, followed by `m`
    ///   (u16) and the graph.
    ///
    /// NOTE(review): presumably this mirrors the historical v8 on-disk
    /// format — confirm against the real v8 writer.
    ///
    /// # Panics
    ///
    /// Panics when the catalog contains a BRIN, GIN, or trigram-GIN index,
    /// and when any count or position does not fit its fixed-width field.
    fn encode_as_v8(cat: &Catalog) -> Vec<u8> {
        let mut out = Vec::with_capacity(64);
        // File header.
        out.extend_from_slice(FILE_MAGIC);
        out.push(8u8);
        write_u32(&mut out, u32::try_from(cat.tables.len()).unwrap());
        for t in &cat.tables {
            // Table header and column definitions.
            write_str(&mut out, &t.schema.name);
            write_u16(&mut out, u16::try_from(t.schema.columns.len()).unwrap());
            for c in &t.schema.columns {
                write_str(&mut out, &c.name);
                write_data_type(&mut out, c.ty);
                out.push(u8::from(c.nullable));
                // Presence byte, then the default value when there is one.
                match &c.default {
                    None => out.push(0),
                    Some(v) => {
                        out.push(1);
                        write_value(&mut out, v);
                    }
                }
                out.push(u8::from(c.auto_increment));
            }
            // Row payload: count, then each row's dense body back to back.
            write_u32(&mut out, u32::try_from(t.rows.len()).unwrap());
            for row in &t.rows {
                out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
            }
            // Index section.
            write_u16(&mut out, u16::try_from(t.indices.len()).unwrap());
            for idx in &t.indices {
                write_str(&mut out, &idx.name);
                write_u16(&mut out, u16::try_from(idx.column_position).unwrap());
                match &idx.kind {
                    // BTree: the tag only, no entries are written.
                    IndexKind::BTree(_) => out.push(0),
                    IndexKind::Nsw(g) => {
                        out.push(1);
                        write_u16(&mut out, u16::try_from(g.m).unwrap());
                        write_nsw_graph(&mut out, g);
                    }
                    // The remaining kinds are not handled by this writer.
                    IndexKind::Brin { .. } => panic!(
                        "v8 catalog writer cannot serialise BRIN — \
                         tests with BRIN indices must use the current writer"
                    ),
                    IndexKind::Gin(_) => panic!(
                        "v8 catalog writer cannot serialise GIN — \
                         tests with GIN indices must use the current writer"
                    ),
                    IndexKind::GinTrgm(_) => panic!(
                        "v8 catalog writer cannot serialise trigram-GIN — \
                         tests with trgm indices must use the current writer"
                    ),
                }
            }
        }
        out
    }
8516
8517 #[test]
8523 fn v9_catalog_round_trip_preserves_cold_locators() {
8524 let mut cat = Catalog::new();
8525 cat.create_table(bigint_pk_users_schema()).unwrap();
8526 let t = cat.get_mut("users").unwrap();
8527 for (id, name) in [(1i64, "alice"), (2, "bob")] {
8529 t.insert(make_user_row(id, name)).unwrap();
8530 }
8531 t.add_index("by_id".into(), "id").unwrap();
8532 let schema = t.schema.clone();
8533
8534 let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe"), (300, "kim")];
8536 let seg_rows: Vec<(u64, Vec<u8>)> = cold_rows
8537 .iter()
8538 .map(|(id, name)| {
8539 let row = make_user_row(*id, name);
8540 ((*id).cast_unsigned(), encode_row_body_dense(&row, &schema))
8541 })
8542 .collect();
8543 let (seg_bytes, _) =
8544 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
8545 let seg_id = cat.load_segment_bytes(seg_bytes.clone()).unwrap();
8546 let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
8547 .iter()
8548 .map(|(id, _)| {
8549 (
8550 IndexKey::Int(*id),
8551 RowLocator::Cold {
8552 segment_id: seg_id,
8553 page_offset: 0,
8554 },
8555 )
8556 })
8557 .collect();
8558 cat.get_mut("users")
8559 .unwrap()
8560 .register_cold_locators("by_id", pairs)
8561 .unwrap();
8562
8563 let bytes = cat.serialize();
8565 assert_eq!(bytes[FILE_MAGIC.len()], FILE_VERSION);
8566 let mut restored = Catalog::deserialize(&bytes).expect("v9 round-trip parses");
8567
8568 let restored_seg_id = restored.load_segment_bytes(seg_bytes).unwrap();
8575 assert_eq!(restored_seg_id, seg_id);
8576
8577 let idx = restored.get("users").unwrap().index_on(0).unwrap();
8578 assert_eq!(idx.lookup_eq(&IndexKey::Int(1)), &[RowLocator::Hot(0)]);
8580 assert_eq!(idx.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
8581 for (id, _) in &cold_rows {
8583 assert_eq!(
8584 idx.lookup_eq(&IndexKey::Int(*id)),
8585 &[RowLocator::Cold {
8586 segment_id: seg_id,
8587 page_offset: 0,
8588 }]
8589 );
8590 }
8591 assert_eq!(
8593 restored
8594 .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
8595 .unwrap(),
8596 make_user_row(2, "bob")
8597 );
8598 for (id, name) in &cold_rows {
8599 assert_eq!(
8600 restored
8601 .lookup_by_pk("users", "by_id", &IndexKey::Int(*id))
8602 .unwrap(),
8603 make_user_row(*id, name)
8604 );
8605 }
8606 }
8607
8608 #[test]
8615 fn row_body_encoded_len_matches_actual_encode_for_all_types() {
8616 let schema = TableSchema::new(
8617 "wide",
8618 vec![
8619 ColumnSchema::new("a", DataType::SmallInt, true),
8620 ColumnSchema::new("b", DataType::Int, false),
8621 ColumnSchema::new("c", DataType::BigInt, false),
8622 ColumnSchema::new("d", DataType::Float, false),
8623 ColumnSchema::new("e", DataType::Bool, false),
8624 ColumnSchema::new("f", DataType::Text, false),
8625 ColumnSchema::new(
8626 "g",
8627 DataType::Vector {
8628 dim: 3,
8629 encoding: VecEncoding::F32,
8630 },
8631 false,
8632 ),
8633 ColumnSchema::new(
8634 "h",
8635 DataType::Numeric {
8636 precision: 18,
8637 scale: 2,
8638 },
8639 false,
8640 ),
8641 ColumnSchema::new("i", DataType::Date, false),
8642 ColumnSchema::new("j", DataType::Timestamp, false),
8643 ],
8644 );
8645 let cases: &[Row] = &[
8646 Row::new(vec![
8647 Value::SmallInt(7),
8648 Value::Int(42),
8649 Value::BigInt(1_000_000),
8650 Value::Float(1.5),
8651 Value::Bool(true),
8652 Value::Text("hello".into()),
8653 Value::Vector(vec![1.0, 2.0, 3.0]),
8654 Value::Numeric {
8655 scaled: 12345,
8656 scale: 2,
8657 },
8658 Value::Date(20_000),
8659 Value::Timestamp(1_700_000_000_000_000),
8660 ]),
8661 Row::new(vec![
8663 Value::Null,
8664 Value::Int(0),
8665 Value::BigInt(0),
8666 Value::Float(0.0),
8667 Value::Bool(false),
8668 Value::Text(String::new()),
8669 Value::Vector(vec![]),
8670 Value::Numeric {
8671 scaled: 0,
8672 scale: 2,
8673 },
8674 Value::Date(0),
8675 Value::Timestamp(0),
8676 ]),
8677 Row::new(vec![
8678 Value::SmallInt(-1),
8679 Value::Int(-1),
8680 Value::BigInt(-1),
8681 Value::Float(-0.5),
8682 Value::Bool(true),
8683 Value::Text("a much longer payload here".into()),
8684 Value::Vector(vec![0.1, 0.2, 0.3]),
8685 Value::Numeric {
8686 scaled: -999_999_999,
8687 scale: 2,
8688 },
8689 Value::Date(-1),
8690 Value::Timestamp(-1),
8691 ]),
8692 ];
8693 for row in cases {
8694 let actual = encode_row_body_dense(row, &schema).len();
8695 let fast = row_body_encoded_len(row, &schema);
8696 assert_eq!(actual, fast, "row {row:?}");
8697 }
8698 }
8699
#[test]
/// Each insert must raise the table's `hot_bytes` counter by the length of the
/// row's dense encoding, and `Catalog::hot_tier_bytes` must report the same sum.
fn hot_bytes_grows_on_insert_and_matches_encoded_sum() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    // Nothing has been inserted yet, so the counter starts at zero.
    assert_eq!(users.hot_bytes(), 0);

    let mut running_total: u64 = 0;
    for (id, name) in [(1i64, "alice"), (2, "bob"), (3, "carol")] {
        let row = make_user_row(id, name);
        // Measure the encoding before the row is moved into the table.
        let encoded_len = encode_row_body_dense(&row, &users.schema).len() as u64;
        running_total += encoded_len;
        users.insert(row).unwrap();
    }

    assert_eq!(users.hot_bytes(), running_total);
    assert_eq!(catalog.hot_tier_bytes(), running_total);
}
8715
#[test]
/// Deleting one row must lower `hot_bytes` by exactly that row's dense-encoded
/// length.
fn hot_bytes_shrinks_on_delete() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    let seed = [(1i64, "alice"), (2, "bob"), (3, "carol")];
    for (id, name) in seed {
        users.insert(make_user_row(id, name)).unwrap();
    }

    let bytes_before_delete = users.hot_bytes();
    // The test expects `delete_rows(&[1])` to remove the second inserted row
    // ("bob"), so the counter should drop by bob's encoded size.
    let victim = make_user_row(2, "bob");
    let victim_len = encode_row_body_dense(&victim, &users.schema).len() as u64;

    let deleted = users.delete_rows(&[1]);
    assert_eq!(deleted, 1);
    assert_eq!(users.hot_bytes(), bytes_before_delete - victim_len);
}
8732
#[test]
/// Updating a row in place must adjust `hot_bytes` by the difference between
/// the old and new dense encodings; a longer text value grows the counter.
fn hot_bytes_diffs_on_update_for_variable_width_columns() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    let baseline = users.hot_bytes();

    let replacement = make_user_row(1, "alice-the-longer-name");
    // Encoded sizes of the row as inserted and of its replacement.
    let shorter_len =
        encode_row_body_dense(&make_user_row(1, "alice"), &users.schema).len() as u64;
    let longer_len = encode_row_body_dense(&replacement, &users.schema).len() as u64;

    users.update_row(0, replacement.values).unwrap();
    let observed = users.hot_bytes();
    assert_eq!(observed, baseline - shorter_len + longer_len);
    assert!(observed > baseline, "longer text grew the counter");
}
8749
#[test]
/// A catalog rebuilt from its serialized form must report the same hot-tier
/// byte totals as the catalog it was serialized from.
fn hot_bytes_round_trips_through_serialize_deserialize() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for i in 0..10 {
        let name = alloc::format!("name-{i}");
        users.insert(make_user_row(i, &name)).unwrap();
    }

    let expected_bytes = catalog.hot_tier_bytes();
    let snapshot = catalog.serialize();
    let reloaded = Catalog::deserialize(&snapshot).unwrap();

    // Both the catalog-wide figure and the single table's figure must survive.
    assert_eq!(reloaded.hot_tier_bytes(), expected_bytes);
    assert_eq!(reloaded.get("users").unwrap().hot_bytes(), expected_bytes);
}
8764
#[test]
/// Freezing 6 of 10 rows must leave 4 hot rows and one cold segment, lower
/// `hot_bytes` by the reported `bytes_freed`, and keep every primary key
/// resolvable through `lookup_by_pk`.
fn freeze_oldest_to_cold_moves_rows_and_keeps_lookups_working() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..10i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let hot_before = users.hot_bytes();

    let outcome = catalog
        .freeze_oldest_to_cold("users", "by_id", 6)
        .expect("freeze succeeds");
    assert_eq!(outcome.frozen_rows, 6);
    assert_eq!(outcome.segment_id, 0);
    assert!(outcome.bytes_freed > 0);
    assert!(!outcome.segment_bytes.is_empty());

    let users = catalog.get("users").unwrap();
    assert_eq!(users.row_count(), 4, "4 hot rows remain (10 - 6 frozen)");
    assert_eq!(catalog.cold_segment_count(), 1);
    let hot_after = users.hot_bytes();
    assert_eq!(
        hot_after,
        hot_before - outcome.bytes_freed,
        "hot_bytes accounting matches FreezeReport"
    );

    // Frozen and still-hot keys alike must resolve to the row that was inserted.
    for id in 0..10i64 {
        match catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(id)) {
            Some(found) => assert_eq!(found, make_user_row(id, &alloc::format!("u-{id}"))),
            None => panic!("PK {id} disappeared after freeze"),
        }
    }
}
8810
#[test]
/// Two consecutive freezes of 4 rows each (out of 12) must yield two cold
/// segments and 4 hot rows, with every key still resolving to its row.
fn freeze_twice_preserves_prior_cold_locators() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..12i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    // Same request twice; only the failure label differs between the rounds.
    for label in ["first freeze ok", "second freeze ok"] {
        catalog
            .freeze_oldest_to_cold("users", "by_id", 4)
            .expect(label);
    }

    assert_eq!(catalog.get("users").unwrap().row_count(), 4);
    assert_eq!(catalog.cold_segment_count(), 2);
    for id in 0..12i64 {
        match catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(id)) {
            Some(found) => assert_eq!(found, make_user_row(id, &alloc::format!("u-{id}"))),
            None => panic!("PK {id} not resolvable after two freezes"),
        }
    }
}
8842
#[test]
/// Invalid freeze requests must fail with `StorageError::Corrupt` and leave
/// the table and the cold tier untouched.
fn freeze_oldest_to_cold_rejects_invalid_input() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..3i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let rejected_requests = [
        // Zero rows requested.
        ("users", "by_id", 0),
        // Table that was never created.
        ("missing", "by_id", 1),
        // Index that was never added.
        ("users", "no_such_index", 1),
        // Far more rows than the three the table holds.
        ("users", "by_id", 999),
    ];
    for (table, index, row_count) in rejected_requests {
        let outcome = catalog.freeze_oldest_to_cold(table, index, row_count);
        assert!(matches!(outcome, Err(StorageError::Corrupt(_))));
    }

    // None of the failed attempts may have moved rows or created a segment.
    assert_eq!(catalog.get("users").unwrap().row_count(), 3);
    assert_eq!(catalog.cold_segment_count(), 0);
}
8880
#[test]
/// Freezing through an index over a `Text` column must fail with a `Corrupt`
/// error whose message mentions "non-integer", without changing any state.
fn freeze_oldest_to_cold_rejects_non_integer_pk() {
    let mut catalog = Catalog::new();
    let schema = TableSchema::new(
        "by_name",
        vec![
            ColumnSchema::new("name", DataType::Text, false),
            ColumnSchema::new("payload", DataType::BigInt, false),
        ],
    );
    catalog.create_table(schema).unwrap();

    let table = catalog.get_mut("by_name").unwrap();
    let only_row = Row::new(vec![Value::Text("a".into()), Value::BigInt(1)]);
    table.insert(only_row).unwrap();
    table.add_index("by_n".into(), "name").unwrap();

    let err = catalog
        .freeze_oldest_to_cold("by_name", "by_n", 1)
        .expect_err("non-integer PK rejected");
    // Any other error variant is a test failure in its own right.
    let s = match err {
        StorageError::Corrupt(s) => s,
        other => panic!("expected Corrupt, got {other:?}"),
    };
    assert!(
        s.contains("non-integer"),
        "error message names the constraint: {s}"
    );

    assert_eq!(catalog.get("by_name").unwrap().row_count(), 1);
    assert_eq!(catalog.cold_segment_count(), 0);
}
8912
#[test]
/// After freezing 3 of 6 rows, a secondary index on `name` must still return a
/// single `Hot` locator for a row that stayed hot, pointing at slot 1.
fn freeze_keeps_remaining_hot_rows_addressable_via_secondary_index() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    let users = catalog.get("users").unwrap();
    let name_index = users.index_on(1).unwrap();
    let got = name_index.lookup_eq(&IndexKey::Text("u-4".into()));
    assert_eq!(got.len(), 1);
    assert!(got[0].is_hot(), "kept-hot rows still surface as Hot");
    // The test expects "u-4" at hot slot 1 once the three oldest rows are gone.
    if let RowLocator::Hot(slot) = got[0] {
        assert_eq!(slot, 1);
    } else {
        unreachable!()
    }
}
8947
#[test]
/// Promoting a frozen key must append the row to the hot tier, add its encoded
/// size to `hot_bytes`, replace the cold locator with a hot one, and leave
/// other frozen keys resolvable.
fn promote_cold_row_pulls_frozen_row_back_to_hot_tier() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    catalog.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
    let bytes_prior = catalog.get("users").unwrap().hot_bytes();

    let promoted_slot = catalog
        .promote_cold_row("users", "by_id", &IndexKey::Int(2))
        .expect("promote ok")
        .expect("PK 2 was cold");
    assert_eq!(
        promoted_slot, 2,
        "promoted row appended after the 2 surviving hot rows"
    );

    let users = catalog.get("users").unwrap();
    assert_eq!(users.row_count(), 3, "hot tier grew from 2 to 3");
    let row = make_user_row(2, "u-2");
    let promoted_len = encode_row_body_dense(&row, &users.schema).len() as u64;
    assert_eq!(users.hot_bytes(), bytes_prior + promoted_len);

    // The index must carry a single locator for the key, and it must be hot.
    let id_index = users.index_on(0).unwrap();
    let entries = id_index.lookup_eq(&IndexKey::Int(2));
    assert_eq!(entries.len(), 1, "exactly one locator per key");
    assert!(entries[0].is_hot(), "promote retired the Cold locator");

    let promoted_lookup = catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(2));
    assert_eq!(promoted_lookup.unwrap(), row);
    // A key that stayed frozen must be unaffected by the promotion.
    let frozen_lookup = catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(0));
    assert_eq!(frozen_lookup.unwrap(), make_user_row(0, "u-0"));
}
9006
#[test]
/// `promote_cold_row` must return `Ok(None)` for a key that is already hot and
/// for a key that does not exist, without changing any state.
fn promote_cold_row_returns_none_when_key_is_not_cold() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(7, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    // 7 is the only inserted (and never frozen) key; 99 was never inserted.
    for pk in [7i64, 99] {
        let promoted = catalog
            .promote_cold_row("users", "by_id", &IndexKey::Int(pk))
            .unwrap();
        assert!(promoted.is_none());
    }

    assert_eq!(catalog.get("users").unwrap().row_count(), 1);
    assert_eq!(catalog.cold_segment_count(), 0);
}
9034
#[test]
/// Shadowing a frozen key must retire exactly one cold locator and make the
/// key unresolvable, while neighbouring frozen keys keep resolving.
fn shadow_cold_row_removes_cold_locators_and_drops_lookup() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..5i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    let target = IndexKey::Int(1);
    let before_shadow = catalog.lookup_by_pk("users", "by_id", &target);
    assert!(before_shadow.is_some(), "frozen PK resolves before shadow");

    let removed = catalog.shadow_cold_row("users", "by_id", &target).unwrap();
    assert_eq!(removed, 1, "exactly one cold locator retired");

    let after_shadow = catalog.lookup_by_pk("users", "by_id", &target);
    assert!(after_shadow.is_none(), "shadowed key no longer resolves");

    // The other two frozen keys (0 and 2) are expected to be unaffected.
    let first = catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(0));
    assert_eq!(first.unwrap(), make_user_row(0, "u-0"));
    let third = catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(2));
    assert_eq!(third.unwrap(), make_user_row(2, "u-2"));
}
9081
#[test]
/// `shadow_cold_row` must report zero removed locators for a hot-only key and
/// for an absent key, and must not touch the hot rows.
fn shadow_cold_row_returns_zero_when_key_is_not_cold() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    // Key 1 exists but was never frozen.
    let hot_only = catalog
        .shadow_cold_row("users", "by_id", &IndexKey::Int(1))
        .unwrap();
    assert_eq!(hot_only, 0, "hot-only key drops no cold locators");

    // Key 999 was never inserted.
    let absent = catalog
        .shadow_cold_row("users", "by_id", &IndexKey::Int(999))
        .unwrap();
    assert_eq!(absent, 0, "absent key drops no cold locators");

    assert_eq!(catalog.get("users").unwrap().row_count(), 1);
}
9107
#[test]
/// Both `promote_cold_row` and `shadow_cold_row` must fail with
/// `StorageError::Corrupt` when given an unknown table or an unknown index.
fn promote_and_shadow_reject_invalid_inputs() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    let bad_targets = [
        // Table that was never created.
        ("missing", "by_id"),
        // Existing table, index that was never added.
        ("users", "no_such_index"),
    ];
    for (table, index) in bad_targets {
        let promoted = catalog.promote_cold_row(table, index, &IndexKey::Int(1));
        assert!(matches!(promoted, Err(StorageError::Corrupt(_))));
        let shadowed = catalog.shadow_cold_row(table, index, &IndexKey::Int(1));
        assert!(matches!(shadowed, Err(StorageError::Corrupt(_))));
    }
}
9136
#[test]
/// Freezing rows `0..6` through one prepared slice must produce the same
/// report fields and the same lookup results as `freeze_oldest_to_cold` with a
/// count of 6 on an identically populated catalog.
fn commit_freeze_slices_single_slice_matches_freeze_oldest() {
    let mut via_freeze = Catalog::new();
    let mut via_slices = Catalog::new();
    // Populate both catalogs identically.
    for catalog in [&mut via_freeze, &mut via_slices] {
        catalog.create_table(bigint_pk_users_schema()).unwrap();
        let users = catalog.get_mut("users").unwrap();
        for id in 0..10i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }

    let single = via_freeze
        .freeze_oldest_to_cold("users", "by_id", 6)
        .unwrap();
    let prepared = via_slices
        .prepare_freeze_slice("users", "by_id", 0..6)
        .expect("prepare");
    let parallel = via_slices
        .commit_freeze_slices("users", "by_id", alloc::vec![prepared])
        .expect("commit");

    assert_eq!(single.segment_id, parallel.segment_id);
    assert_eq!(single.frozen_rows, parallel.frozen_rows);
    assert_eq!(single.bytes_freed, parallel.bytes_freed);
    assert_eq!(single.segment_bytes, parallel.segment_bytes);

    for id in 0..10i64 {
        let key = IndexKey::Int(id);
        assert_eq!(
            via_freeze.lookup_by_pk("users", "by_id", &key),
            via_slices.lookup_by_pk("users", "by_id", &key),
            "PK {id} differs after single vs slice freeze"
        );
    }
}
9176
#[test]
/// Committing rows `0..8` as two prepared slices (`0..4` and `4..8`) must
/// produce the same segment bytes, frozen-row count, and lookup results as
/// committing them as one `0..8` slice on an identically populated catalog.
fn commit_freeze_slices_two_slices_match_single_slice() {
    let mut a = Catalog::new();
    let mut b = Catalog::new();
    for cat in [&mut a, &mut b] {
        cat.create_table(bigint_pk_users_schema()).unwrap();
        let t = cat.get_mut("users").unwrap();
        // Keys are inserted out of order. The array is typed `i64` and iterated
        // by value, so no `as` cast is needed at the call site.
        for id in [3i64, 7, 1, 9, 5, 0, 8, 4, 2, 6] {
            t.insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        }
        t.add_index("by_id".into(), "id").unwrap();
    }

    // Catalog `a`: one slice covering the whole range.
    let single = a
        .prepare_freeze_slice("users", "by_id", 0..8)
        .expect("prepare");
    let one = a
        .commit_freeze_slices("users", "by_id", alloc::vec![single])
        .expect("commit one");

    // Catalog `b`: the same range split into two adjacent slices.
    let s1 = b
        .prepare_freeze_slice("users", "by_id", 0..4)
        .expect("prepare s1");
    let s2 = b
        .prepare_freeze_slice("users", "by_id", 4..8)
        .expect("prepare s2");
    let two = b
        .commit_freeze_slices("users", "by_id", alloc::vec![s1, s2])
        .expect("commit two");

    assert_eq!(one.segment_bytes, two.segment_bytes);
    assert_eq!(one.frozen_rows, two.frozen_rows);
    for id in 0..10i64 {
        assert_eq!(
            a.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
            b.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
            "PK {id} differs after one-slice vs two-slice freeze"
        );
    }
}
9223
#[test]
/// Committing slices that skip a position (`0..2` followed by `3..5`) must
/// fail with `StorageError::Corrupt` and leave the catalog unchanged.
fn commit_freeze_slices_rejects_gap() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    // Position 2 is covered by neither slice.
    let head = catalog.prepare_freeze_slice("users", "by_id", 0..2).unwrap();
    let tail = catalog.prepare_freeze_slice("users", "by_id", 3..5).unwrap();
    let outcome = catalog.commit_freeze_slices("users", "by_id", alloc::vec![head, tail]);
    assert!(matches!(outcome, Err(StorageError::Corrupt(_))));

    assert_eq!(catalog.cold_segment_count(), 0);
    assert_eq!(catalog.get("users").unwrap().row_count(), 6);
}
9245
#[test]
/// Committing an empty slice list must succeed, report zero frozen rows, and
/// leave both tiers as they were.
fn commit_freeze_slices_empty_is_noop() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..3i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let no_slices = Vec::new();
    let outcome = catalog
        .commit_freeze_slices("users", "by_id", no_slices)
        .unwrap();

    assert_eq!(outcome.frozen_rows, 0);
    assert_eq!(catalog.cold_segment_count(), 0);
    assert_eq!(catalog.get("users").unwrap().row_count(), 3);
}
9264
#[test]
/// Compacting two small cold segments with a target just above the larger
/// one's size must merge them into one new segment, prune nothing, and keep
/// every key resolvable.
fn compact_merges_small_segments_storage_unit() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..8i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    // Two freezes of three rows each produce two cold segments.
    for _ in 0..2 {
        catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    }
    assert_eq!(catalog.cold_segment_count(), 2);
    assert_eq!(catalog.cold_segment_slot_count(), 2);

    // Choose a target one byte above the largest existing segment.
    let segment_ids = catalog.cold_segment_ids_global();
    let largest_segment = segment_ids
        .iter()
        .map(|id| catalog.cold_segment(*id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let target = largest_segment + 1;

    let outcome = catalog
        .compact_cold_segments("users", "by_id", target)
        .expect("compact succeeds");
    assert_eq!(outcome.sources.len(), 2);
    let merged_id = outcome.merged_segment_id.expect("merge happened");
    assert_eq!(outcome.merged_rows, 6);
    assert_eq!(outcome.deleted_rows_pruned, 0);
    assert!(!outcome.merged_segment_bytes.is_empty());

    // One live segment remains while the slot count rises to 3 — presumably
    // the two source slots stay allocated next to the merged segment's slot.
    assert_eq!(catalog.cold_segment_count(), 1);
    assert_eq!(catalog.cold_segment_slot_count(), 3);
    assert_eq!(catalog.cold_segment_ids_global(), alloc::vec![merged_id]);

    for id in 0..8i64 {
        match catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(id)) {
            Some(found) => assert_eq!(found, make_user_row(id, &alloc::format!("u-{id}"))),
            None => panic!("PK {id} lost after compaction"),
        }
    }
}
9321
#[test]
/// Compaction must drop rows that were shadowed beforehand: with 6 frozen rows
/// and 2 of them shadowed, the merged segment holds 4 rows, 2 are reported as
/// pruned, shadowed keys stay invisible, and live keys resolve to the rows
/// that were originally inserted.
fn compact_drops_shadowed_cold_rows() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let t = cat.get_mut("users").unwrap();
    for id in 0..6i64 {
        t.insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    t.add_index("by_id".into(), "id").unwrap();
    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    // Shadow one key from each of the two freezes (1 and 4).
    assert_eq!(
        cat.shadow_cold_row("users", "by_id", &IndexKey::Int(1))
            .unwrap(),
        1
    );
    assert_eq!(
        cat.shadow_cold_row("users", "by_id", &IndexKey::Int(4))
            .unwrap(),
        1
    );

    // Target one byte above the largest existing segment.
    let max_seg_bytes = cat
        .cold_segment_ids_global()
        .iter()
        .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let report = cat
        .compact_cold_segments("users", "by_id", max_seg_bytes + 1)
        .expect("compact succeeds");
    assert_eq!(report.sources.len(), 2);
    assert_eq!(report.merged_rows, 4, "6 frozen − 2 shadowed = 4 live");
    assert_eq!(report.deleted_rows_pruned, 2);

    for shadowed in [1i64, 4i64] {
        assert!(
            cat.lookup_by_pk("users", "by_id", &IndexKey::Int(shadowed))
                .is_none(),
            "shadowed PK {shadowed} must remain invisible after compact"
        );
    }
    for live in [0i64, 2, 3, 5] {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(live))
            .unwrap_or_else(|| panic!("live PK {live} lost after compact"));
        // Check the row contents too, not just that the key resolves, so a
        // merge that mixes up row payloads is caught.
        assert_eq!(got, make_user_row(live, &alloc::format!("u-{live}")));
    }
}
9376
#[test]
/// Compaction must do nothing when there is nothing to merge: with no cold
/// segments, with a single cold segment, and with a one-byte target.
fn compact_is_noop_below_two_candidates() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    // No cold segment exists yet.
    let without_segments = catalog
        .compact_cold_segments("users", "by_id", 1 << 30)
        .expect("noop ok");
    assert!(without_segments.merged_segment_id.is_none());
    assert!(without_segments.sources.is_empty());

    // After one freeze there is exactly one cold segment.
    catalog.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
    let with_one_segment = catalog
        .compact_cold_segments("users", "by_id", 1 << 30)
        .expect("noop ok");
    assert!(with_one_segment.merged_segment_id.is_none());
    assert_eq!(catalog.cold_segment_count(), 1);

    // A one-byte target must not trigger a merge either.
    let with_tiny_target = catalog
        .compact_cold_segments("users", "by_id", 1)
        .expect("noop ok");
    assert!(with_tiny_target.merged_segment_id.is_none());
    assert_eq!(catalog.cold_segment_count(), 1);
}
9412
#[test]
/// After compaction, a catalog rebuilt from its serialized form and given the
/// merged segment's bytes through `load_segment_bytes_at` must resolve every
/// key and report exactly one cold segment.
fn compact_swap_survives_catalog_roundtrip_via_load_at() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let t = cat.get_mut("users").unwrap();
    for id in 0..6i64 {
        t.insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    t.add_index("by_id".into(), "id").unwrap();
    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    // Target one byte above the largest existing segment.
    let max_seg_bytes = cat
        .cold_segment_ids_global()
        .iter()
        .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let report = cat
        .compact_cold_segments("users", "by_id", max_seg_bytes + 1)
        .expect("compact ok");
    let merged_id = report.merged_segment_id.unwrap();

    let cat_bytes = cat.serialize();
    // The report is not used again, so its bytes are moved out rather than
    // cloned. The test supplies them to the restored catalog separately —
    // presumably because `serialize` does not embed segment payloads (confirm).
    let merged_bytes = report.merged_segment_bytes;

    let mut restored = Catalog::deserialize(&cat_bytes).expect("deserialize ok");
    restored
        .load_segment_bytes_at(merged_id, merged_bytes)
        .expect("reload merged ok");

    for id in 0..6i64 {
        let got = restored
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("PK {id} lost across roundtrip"));
        assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
    }
    assert_eq!(restored.cold_segment_count(), 1);
}
9466
#[test]
/// Loading segment bytes at an id beyond the current slots must extend the
/// slot table up to that id, while loading at an id that already holds a
/// segment must fail with `StorageError::Corrupt`.
fn load_segment_bytes_at_pads_and_rejects_collision() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let t = cat.get_mut("users").unwrap();
    for id in 0..4i64 {
        t.insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    t.add_index("by_id".into(), "id").unwrap();
    let report = cat.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
    // The report is not used again, so take ownership of its bytes instead of
    // cloning them.
    let bytes_seg0 = report.segment_bytes;

    // Id 5 is past the single existing slot: slots grow to 6, two are live.
    cat.load_segment_bytes_at(5, bytes_seg0.clone())
        .expect("pad + load ok");
    assert_eq!(cat.cold_segment_slot_count(), 6);
    assert_eq!(cat.cold_segment_count(), 2);

    // Id 5 is now occupied.
    assert!(matches!(
        cat.load_segment_bytes_at(5, bytes_seg0.clone()),
        Err(StorageError::Corrupt(_))
    ));
    // Id 0 has been occupied since the freeze above.
    assert!(matches!(
        cat.load_segment_bytes_at(0, bytes_seg0),
        Err(StorageError::Corrupt(_))
    ));
}
9501
#[test]
/// After freezing two rows and promoting one of them back, the index must hold
/// a single hot locator for the promoted key and the rows that were never
/// frozen must still resolve.
///
/// NOTE(review): despite the name, this body never performs a second freeze —
/// confirm whether the re-freeze step is meant to be covered here.
fn promote_then_refreeze_does_not_leave_orphan_locators() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..4i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    catalog.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
    let promoted = catalog
        .promote_cold_row("users", "by_id", &IndexKey::Int(0))
        .unwrap();
    assert!(promoted.is_some());

    // Copy the locators out so the borrow of the catalog ends here.
    let id_index = catalog.get("users").unwrap().index_on(0).unwrap();
    let entries_after_promote = id_index.lookup_eq(&IndexKey::Int(0)).to_vec();
    assert_eq!(entries_after_promote.len(), 1);
    assert!(entries_after_promote[0].is_hot());

    // Keys 2 and 3 were not part of the freeze.
    for id in [2i64, 3] {
        let found = catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(id));
        assert_eq!(found.unwrap(), make_user_row(id, &alloc::format!("u-{id}")));
    }
}
9546}