1#![no_std]
7#![cfg_attr(target_arch = "aarch64", allow(unsafe_code))]
11
12extern crate alloc;
13
14pub mod bloom;
15pub mod halfvec;
16pub mod persistent;
17pub mod persistent_btree;
18pub mod quantize;
19pub mod row_locator;
20pub mod segment;
21pub mod trgm;
22
23pub use self::bloom::{BloomError, BloomFilter};
24pub use self::row_locator::{RowLocator, RowLocatorError};
25pub use self::segment::{
26 BRIN_SIDECAR_MAGIC, BrinSummary, OwnedSegment, SEGMENT_COMPRESS_ALGO_LZSS,
27 SEGMENT_COMPRESS_ALGO_NONE, SEGMENT_MAGIC, SEGMENT_MAGIC_V2, SEGMENT_PAGE_BYTES, SegmentError,
28 SegmentMeta, SegmentReader, derive_brin_summaries, encode_segment, wrap_v2_envelope,
29 wrap_v2_envelope_with_brin,
30};
31
32use alloc::boxed::Box;
33use alloc::collections::{BTreeMap, BTreeSet};
34use alloc::format;
35use alloc::string::{String, ToString};
36use alloc::sync::Arc;
37use alloc::vec::Vec;
38use core::fmt;
39
40use self::persistent::PersistentVec;
41use self::persistent_btree::PersistentBTreeMap;
42
/// Element encoding of a vector column.
///
/// `F32` is the `Default`. Each variant pairs with one `Value` payload in this
/// file: `F32` with `Value::Vector`, `Sq8` with `Value::Sq8Vector`, and `F16`
/// with `Value::HalfVector`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum VecEncoding {
    /// Plain `f32` elements (the default).
    #[default]
    F32,
    /// Rendered as `SQ8`.
    Sq8,
    /// Rendered as `HALF`.
    F16,
}

impl fmt::Display for VecEncoding {
    /// Writes the fixed keyword for the encoding: `F32`, `SQ8` or `HALF`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match *self {
            Self::F32 => "F32",
            Self::Sq8 => "SQ8",
            Self::F16 => "HALF",
        };
        f.write_str(keyword)
    }
}
71
/// Declared type of a column.
///
/// The `Display` impl in this file renders each variant as a SQL-style type
/// name (for example `VARCHAR(n)`, `VECTOR(dim) USING SQ8`, `BYTEA`, `INT[]`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataType {
    /// Rendered as `SMALLINT`; matches `Value::SmallInt`.
    SmallInt,
    // Rendered as `INT`, `BIGINT`, `FLOAT` and `TEXT` respectively.
    Int, BigInt, Float, Text,
    /// Rendered as `VARCHAR(n)`. Presumably `n` is a length limit — TODO confirm.
    Varchar(u32),
    /// Rendered as `CHAR(n)`. Presumably `n` is a fixed length — TODO confirm.
    Char(u32),
    /// Rendered as `BOOL`.
    Bool,
    /// Vector column with `dim` elements stored using `encoding`.
    Vector {
        /// Number of elements per vector.
        dim: u32,
        /// Element encoding; `F32` prints without a `USING` suffix.
        encoding: VecEncoding,
    },
    /// Rendered as `NUMERIC(precision)` when `scale == 0`, otherwise
    /// `NUMERIC(precision, scale)`.
    Numeric {
        /// `Value::data_type` reports `0` here for every numeric value.
        precision: u8,
        /// `Table::insert` requires the value's scale to equal the column's.
        scale: u8,
    },
    /// Rendered as `DATE`.
    Date,
    /// Rendered as `TIMESTAMP`.
    Timestamp,
    /// Rendered as `TIMESTAMPTZ`; `Table::insert` accepts `Timestamp` values here.
    Timestamptz,
    /// Rendered as `INTERVAL`.
    Interval,
    /// Rendered as `JSON`.
    Json,
    /// Rendered as `JSONB`.
    Jsonb,
    /// Rendered as `BYTEA`.
    Bytes,
    /// Rendered as `TEXT[]`.
    TextArray,
    /// Rendered as `INT[]`.
    IntArray,
    /// Rendered as `BIGINT[]`.
    BigIntArray,
    /// Rendered as `TSVECTOR`; the only type `Table::add_gin_index` accepts.
    TsVector,
    /// Rendered as `TSQUERY`.
    TsQuery,
}
177
178impl fmt::Display for DataType {
179 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
180 match self {
181 Self::SmallInt => f.write_str("SMALLINT"),
182 Self::Int => f.write_str("INT"),
183 Self::BigInt => f.write_str("BIGINT"),
184 Self::Float => f.write_str("FLOAT"),
185 Self::Text => f.write_str("TEXT"),
186 Self::Varchar(n) => write!(f, "VARCHAR({n})"),
187 Self::Char(n) => write!(f, "CHAR({n})"),
188 Self::Bool => f.write_str("BOOL"),
189 Self::Vector { dim, encoding } => match encoding {
190 VecEncoding::F32 => write!(f, "VECTOR({dim})"),
191 VecEncoding::Sq8 => write!(f, "VECTOR({dim}) USING SQ8"),
192 VecEncoding::F16 => write!(f, "VECTOR({dim}) USING HALF"),
193 },
194 Self::Numeric { precision, scale } => {
195 if *scale == 0 {
196 write!(f, "NUMERIC({precision})")
197 } else {
198 write!(f, "NUMERIC({precision}, {scale})")
199 }
200 }
201 Self::Date => f.write_str("DATE"),
202 Self::Timestamp => f.write_str("TIMESTAMP"),
203 Self::Timestamptz => f.write_str("TIMESTAMPTZ"),
204 Self::Interval => f.write_str("INTERVAL"),
205 Self::Json => f.write_str("JSON"),
206 Self::Jsonb => f.write_str("JSONB"),
207 Self::Bytes => f.write_str("BYTEA"),
208 Self::TextArray => f.write_str("TEXT[]"),
209 Self::IntArray => f.write_str("INT[]"),
210 Self::BigIntArray => f.write_str("BIGINT[]"),
211 Self::TsVector => f.write_str("TSVECTOR"),
212 Self::TsQuery => f.write_str("TSQUERY"),
213 }
214 }
215}
216
/// One entry of a `Value::TsVector`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TsLexeme {
    /// The lexeme text; used as the key of GIN index maps in `Table::insert`.
    pub word: String,
    /// Presumably positions of the word in the source document — TODO confirm.
    pub positions: Vec<u16>,
    /// Presumably a weight label for the lexeme; encoding not visible here — TODO confirm.
    pub weight: u8,
}
228
/// Recursive tree carried by `Value::TsQuery`.
///
/// NOTE(review): evaluation is not part of the visible code; the operator
/// meanings below are inferred from the variant names.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TsQueryAst {
    /// Leaf node naming a single word.
    Term {
        /// The word to look for.
        word: String,
        /// Presumably a bit mask of acceptable lexeme weights — TODO confirm.
        weight_mask: u8,
    },
    /// Binary node with two boxed operands (presumably conjunction).
    And(Box<TsQueryAst>, Box<TsQueryAst>),
    /// Binary node with two boxed operands (presumably disjunction).
    Or(Box<TsQueryAst>, Box<TsQueryAst>),
    /// Unary node with one boxed operand (presumably negation).
    Not(Box<TsQueryAst>),
    /// Binary node with an extra `distance` (presumably a phrase/proximity match).
    Phrase {
        /// Left operand.
        left: Box<TsQueryAst>,
        /// Right operand.
        right: Box<TsQueryAst>,
        /// Presumably the required position gap between operands — TODO confirm.
        distance: u16,
    },
}
252
/// A single cell value.
///
/// `Value::data_type` maps every variant except `Null` to a `DataType`.
/// The enum is `#[non_exhaustive]`, so code outside this crate must keep a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum Value {
    /// 16-bit integer.
    SmallInt(i16),
    /// 32-bit integer.
    Int(i32),
    /// 64-bit integer.
    BigInt(i64),
    /// 64-bit float; not usable as an `IndexKey`.
    Float(f64),
    /// Text; `Table::insert` also accepts it for VARCHAR/CHAR/JSON/JSONB columns.
    Text(String),
    /// Boolean.
    Bool(bool),
    /// `f32` vector; its length is reported as the vector dimension.
    Vector(Vec<f32>),
    /// SQ8-encoded vector; the dimension is taken from `bytes.len()`.
    Sq8Vector(crate::quantize::Sq8Vector),
    /// Half-precision vector; the dimension is taken from `dim()`.
    HalfVector(crate::halfvec::HalfVector),
    /// Numeric value. Presumably it represents `scaled / 10^scale` — TODO confirm.
    Numeric {
        /// Integer payload.
        scaled: i128,
        /// Scale, copied into `DataType::Numeric::scale` by `data_type`.
        scale: u8,
    },
    /// Date as an `i32`; epoch and unit are not visible here.
    Date(i32),
    /// Timestamp as an `i64`; epoch and unit are not visible here.
    Timestamp(i64),
    /// Interval split into a month count and a microsecond count.
    Interval {
        /// Month component.
        months: i32,
        /// Microsecond component.
        micros: i64,
    },
    /// JSON text; reported as `DataType::Json`.
    Json(String),
    /// Raw bytes; reported as `DataType::Bytes`.
    Bytes(Vec<u8>),
    /// Text array whose elements may be absent (`None`).
    TextArray(Vec<Option<String>>),
    /// `i32` array whose elements may be absent (`None`).
    IntArray(Vec<Option<i32>>),
    /// `i64` array whose elements may be absent (`None`).
    BigIntArray(Vec<Option<i64>>),
    /// List of lexemes; indexed by GIN indexes in `Table::insert`.
    TsVector(Vec<TsLexeme>),
    /// Text-search query tree.
    TsQuery(TsQueryAst),
    /// Absent value; has no `DataType`.
    Null,
}
329
330impl Value {
331 pub fn data_type(&self) -> Option<DataType> {
333 match self {
334 Self::SmallInt(_) => Some(DataType::SmallInt),
335 Self::Int(_) => Some(DataType::Int),
336 Self::BigInt(_) => Some(DataType::BigInt),
337 Self::Float(_) => Some(DataType::Float),
338 Self::Text(_) => Some(DataType::Text),
341 Self::Bool(_) => Some(DataType::Bool),
342 Self::Vector(v) => Some(DataType::Vector {
343 dim: u32::try_from(v.len()).expect("vector dim ≤ u32"),
344 encoding: VecEncoding::F32,
345 }),
346 Self::Sq8Vector(q) => Some(DataType::Vector {
347 dim: u32::try_from(q.bytes.len()).expect("vector dim ≤ u32"),
348 encoding: VecEncoding::Sq8,
349 }),
350 Self::HalfVector(h) => Some(DataType::Vector {
351 dim: u32::try_from(h.dim()).expect("vector dim ≤ u32"),
352 encoding: VecEncoding::F16,
353 }),
354 Self::Numeric { scale, .. } => Some(DataType::Numeric {
359 precision: 0,
360 scale: *scale,
361 }),
362 Self::Date(_) => Some(DataType::Date),
363 Self::Timestamp(_) => Some(DataType::Timestamp),
364 Self::Interval { .. } => Some(DataType::Interval),
365 Self::Json(_) => Some(DataType::Json),
366 Self::Bytes(_) => Some(DataType::Bytes),
367 Self::TextArray(_) => Some(DataType::TextArray),
368 Self::IntArray(_) => Some(DataType::IntArray),
369 Self::BigIntArray(_) => Some(DataType::BigIntArray),
370 Self::TsVector(_) => Some(DataType::TsVector),
371 Self::TsQuery(_) => Some(DataType::TsQuery),
372 Self::Null => None,
373 }
374 }
375
376 pub const fn is_null(&self) -> bool {
377 matches!(self, Self::Null)
378 }
379}
380
/// One table row: an ordered list of cell values.
///
/// `Table::insert` requires `values.len()` to equal the number of schema
/// columns, so cells line up with columns by position.
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    /// Cell values, one per column.
    pub values: Vec<Value>,
}

impl Row {
    /// Wraps `values` in a `Row` without validating them.
    pub const fn new(values: Vec<Value>) -> Self {
        Self { values }
    }

    /// Number of cells in the row.
    pub fn len(&self) -> usize {
        self.values.len()
    }

    /// Returns `true` when the row has no cells.
    pub fn is_empty(&self) -> bool {
        self.values.is_empty()
    }
}
401
/// Definition of a single column.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnSchema {
    /// Column name; matched exactly by `TableSchema::column_position`.
    pub name: String,
    /// Declared type, checked against inserted values by `Table::insert`.
    pub ty: DataType,
    /// When `false`, `Table::insert` rejects `Value::Null` for this column.
    pub nullable: bool,
    /// Presumably a constant default value — not consumed in the visible code.
    pub default: Option<Value>,
    /// Presumably a default expression evaluated at run time, kept as text — TODO confirm.
    pub runtime_default: Option<String>,
    /// Presumably marks an auto-increment column (see `Table::next_auto_value`) — TODO confirm.
    pub auto_increment: bool,
}
425
/// Definition of a table: its name, columns and constraints.
#[derive(Debug, Clone, PartialEq)]
pub struct TableSchema {
    /// Table name.
    pub name: String,
    /// Columns in positional order; row cells line up with this list.
    pub columns: Vec<ColumnSchema>,
    /// Presumably a byte budget for the hot tier — not consumed in the visible code.
    pub hot_tier_bytes: Option<u64>,
    /// Foreign-key constraints; their column lists hold column positions.
    pub foreign_keys: Vec<ForeignKeyConstraint>,
    /// Uniqueness constraints; their column lists hold column positions.
    pub uniqueness_constraints: Vec<UniquenessConstraint>,
    /// Presumably CHECK constraint expressions kept as text — TODO confirm.
    pub checks: Vec<String>,
}
460
/// A uniqueness constraint over one or more columns.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UniquenessConstraint {
    /// Presumably distinguishes PRIMARY KEY from plain UNIQUE — TODO confirm.
    pub is_primary_key: bool,
    /// Column positions covered by the constraint (`Table::drop_column`
    /// filters and shifts them as positions).
    pub columns: Vec<usize>,
    /// Presumably the SQL `NULLS NOT DISTINCT` flag — TODO confirm.
    pub nulls_not_distinct: bool,
}
484
/// A foreign-key constraint from this table to a parent table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForeignKeyConstraint {
    /// Optional constraint name.
    pub name: Option<String>,
    /// Column positions in this table (`Table::drop_column` shifts them as positions).
    pub local_columns: Vec<usize>,
    /// Name of the referenced table.
    pub parent_table: String,
    /// Presumably column positions in the parent table — TODO confirm.
    pub parent_columns: Vec<usize>,
    /// Action applied on delete.
    pub on_delete: FkAction,
    /// Action applied on update.
    pub on_update: FkAction,
}
511
/// Referential action attached to a foreign key.
///
/// Each variant has a fixed one-byte tag (see [`FkAction::tag`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FkAction {
    /// Tag `0`.
    Restrict,
    /// Tag `1`.
    Cascade,
    /// Tag `2`.
    SetNull,
    /// Tag `3`.
    SetDefault,
    /// Tag `4`.
    NoAction,
}

impl FkAction {
    /// Returns the one-byte tag of this action (`0..=4`).
    pub const fn tag(self) -> u8 {
        match self {
            Self::Restrict => 0,
            Self::Cascade => 1,
            Self::SetNull => 2,
            Self::SetDefault => 3,
            Self::NoAction => 4,
        }
    }

    /// Inverse of [`FkAction::tag`]; returns `None` for any byte above `4`.
    pub const fn from_tag(b: u8) -> Option<Self> {
        match b {
            0 => Some(Self::Restrict),
            1 => Some(Self::Cascade),
            2 => Some(Self::SetNull),
            3 => Some(Self::SetDefault),
            4 => Some(Self::NoAction),
            _ => None,
        }
    }
}
544
545impl TableSchema {
546 pub fn column_position(&self, name: &str) -> Option<usize> {
547 self.columns.iter().position(|c| c.name == name)
548 }
549}
550
/// Ordered key type of BTree indexes.
///
/// The derived `Ord` sorts by variant first (`Int` < `Text` < `Bool`), then
/// by payload.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum IndexKey {
    /// Key for SmallInt, Int, BigInt, Date and Timestamp values, widened to `i64`.
    Int(i64),
    /// Key for Text values.
    Text(String),
    /// Key for Bool values.
    Bool(bool),
}
561
562impl IndexKey {
563 pub fn from_value(v: &Value) -> Option<Self> {
564 match v {
565 Value::SmallInt(n) => Some(Self::Int(i64::from(*n))),
566 Value::Int(n) => Some(Self::Int(i64::from(*n))),
567 Value::BigInt(n) => Some(Self::Int(*n)),
568 Value::Text(s) => Some(Self::Text(s.clone())),
569 Value::Bool(b) => Some(Self::Bool(*b)),
570 Value::Date(d) => Some(Self::Int(i64::from(*d))),
573 Value::Timestamp(t) => Some(Self::Int(*t)),
574 Value::Null
579 | Value::Float(_)
580 | Value::Vector(_)
581 | Value::Sq8Vector(_)
582 | Value::HalfVector(_)
583 | Value::Numeric { .. }
584 | Value::Interval { .. }
585 | Value::Json(_)
586 | Value::Bytes(_)
587 | Value::TextArray(_)
588 | Value::IntArray(_)
589 | Value::BigIntArray(_)
590 | Value::TsVector(_)
591 | Value::TsQuery(_) => None,
592 }
593 }
594}
595
/// A secondary index on one table column.
///
/// The constructors in `impl Index` start every field except `name`,
/// `column_position` and `kind` at an empty or `false` value.
#[derive(Debug, Clone)]
pub struct Index {
    /// Index name; `Table` rejects duplicates when adding an index.
    pub name: String,
    /// Position of the indexed column within each row.
    pub column_position: usize,
    /// Index structure and payload.
    pub kind: IndexKind,
    /// Presumably positions of INCLUDE columns — TODO confirm.
    pub included_columns: Vec<usize>,
    /// Presumably the text of a partial-index predicate; set through
    /// `Table::set_partial_predicate`.
    pub partial_predicate: Option<String>,
    /// Presumably the text of an indexed expression — TODO confirm.
    pub expression: Option<String>,
    /// Presumably marks a unique index; enforcement is not in the visible code.
    pub is_unique: bool,
    /// Presumably positions of further key columns of a multi-column index — TODO confirm.
    pub extra_column_positions: Vec<usize>,
}
644
/// Default `m` for NSW graphs. Presumably passed to `Table::add_nsw_index`
/// when the caller does not choose a value — it is not referenced in the
/// visible code.
pub const NSW_DEFAULT_M: usize = 16;
648
/// Result record of a freeze operation.
///
/// NOTE(review): the producer is outside the visible code; the field
/// descriptions below are inferred from names and types only.
#[derive(Debug, Clone)]
pub struct FreezeReport {
    /// Presumably the id of the segment that was written.
    pub segment_id: u32,
    /// Presumably the number of rows that were frozen.
    pub frozen_rows: usize,
    /// Presumably the number of bytes released.
    pub bytes_freed: u64,
    /// Presumably the encoded segment bytes.
    pub segment_bytes: Vec<u8>,
}
674
/// A batch of rows selected for freezing.
///
/// NOTE(review): the producer and consumer are outside the visible code; the
/// field descriptions below are inferred from names and types only.
#[derive(Debug, Clone)]
pub struct FreezeSlice {
    /// Presumably the range of row indices covered by this slice.
    pub row_range: core::ops::Range<usize>,
    /// One `(u64, bytes, key)` tuple per row; presumably an id, the encoded
    /// row body and its index key — TODO confirm.
    pub rows: Vec<(u64, Vec<u8>, IndexKey)>,
}
697
/// Result record of a segment compaction.
///
/// NOTE(review): the producer is outside the visible code; the field
/// descriptions below are inferred from names and types only.
#[derive(Debug, Clone)]
pub struct CompactReport {
    /// Presumably the ids of the segments that were merged.
    pub sources: Vec<u32>,
    /// Presumably the id of the merged segment, if one was produced.
    pub merged_segment_id: Option<u32>,
    /// Presumably the encoded bytes of the merged segment.
    pub merged_segment_bytes: Vec<u8>,
    /// Presumably the number of rows in the merged segment.
    pub merged_rows: usize,
    /// Presumably the number of deleted rows dropped during the merge.
    pub deleted_rows_pruned: usize,
    /// Presumably an estimate of the bytes reclaimed.
    pub bytes_reclaimed_estimate: u64,
}
733
/// Structure and payload of an [`Index`].
#[derive(Debug, Clone)]
pub enum IndexKind {
    /// Ordered map from key to the locators of rows holding that key;
    /// maintained inline by `Table::insert`.
    BTree(PersistentBTreeMap<IndexKey, Vec<RowLocator>>),
    /// Graph index; `Table::insert` updates it after the row is pushed.
    Nsw(NswGraph),
    /// Carries only the column type; `Table::insert` stores nothing for it.
    Brin {
        /// Type of the indexed column when the index was created.
        column_type: DataType,
    },
    /// Map from lexeme word to row locators, fed from `Value::TsVector` cells.
    Gin(PersistentBTreeMap<alloc::string::String, Vec<RowLocator>>),
    /// Map from trigram to row locators, fed from `Value::Text` cells through
    /// `trgm::extract_trigrams`.
    GinTrgm(PersistentBTreeMap<alloc::string::String, Vec<RowLocator>>),
}
795
/// Layered neighbour graph behind an `IndexKind::Nsw` index.
///
/// NOTE(review): graph construction and search are outside the visible code;
/// descriptions marked "presumably" are inferred from names.
#[derive(Debug, Clone)]
pub struct NswGraph {
    /// Capacity returned by `cap_for_layer` for every layer above 0.
    pub m: usize,
    /// Capacity returned by `cap_for_layer` for layer 0; `new` sets it to `2 * m` (saturating).
    pub m_max_0: usize,
    /// Presumably the entry node (row index); `None` in a fresh graph.
    pub entry: Option<usize>,
    /// Presumably the level of the entry node; `0` in a fresh graph.
    pub entry_level: u8,
    /// Presumably the assigned level of each node — TODO confirm.
    pub levels: PersistentVec<u8>,
    /// One container per layer, each holding `Vec<u32>` lists (presumably
    /// neighbour ids per node); a fresh graph has a single empty layer.
    pub layers: Vec<PersistentVec<Vec<u32>>>,
}
840
841impl NswGraph {
842 fn new(m: usize) -> Self {
843 Self {
844 m,
845 m_max_0: m.saturating_mul(2),
846 entry: None,
847 entry_level: 0,
848 levels: PersistentVec::new(),
849 layers: alloc::vec![PersistentVec::new()],
850 }
851 }
852
853 pub const fn cap_for_layer(&self, layer: u8) -> usize {
855 if layer == 0 { self.m_max_0 } else { self.m }
856 }
857}
858
/// Deterministically derives a level in `0..=7` from a row index.
///
/// The index is scrambled with a fixed 64-bit multiply/xor-shift mixer; the
/// level is the number of consecutive all-zero low nibbles of the result,
/// capped at 7. The same `row_idx` always yields the same level.
#[allow(clippy::verbose_bit_mask)]
pub fn nsw_assign_level(row_idx: usize) -> u8 {
    const MAX_LEVEL: u8 = 7;

    let seeded = (row_idx as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
    let round1 = (seeded ^ (seeded >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    let round2 = (round1 ^ (round1 >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    let mixed = round2 ^ (round2 >> 31);

    // Each zero low nibble is four trailing zero bits. `trailing_zeros` is 64
    // for an all-zero word, which the cap turns into MAX_LEVEL.
    let zero_nibbles = (mixed.trailing_zeros() / 4).min(u32::from(MAX_LEVEL));
    u8::try_from(zero_nibbles).unwrap_or(MAX_LEVEL)
}
885
886impl Index {
887 fn new_btree(name: String, column_position: usize) -> Self {
888 Self {
889 name,
890 column_position,
891 kind: IndexKind::BTree(PersistentBTreeMap::new()),
892 included_columns: Vec::new(),
893 partial_predicate: None,
894 expression: None,
895 is_unique: false,
896 extra_column_positions: Vec::new(),
897 }
898 }
899
900 fn new_nsw(name: String, column_position: usize, m: usize) -> Self {
901 Self {
902 name,
903 column_position,
904 kind: IndexKind::Nsw(NswGraph::new(m)),
905 included_columns: Vec::new(),
906 partial_predicate: None,
907 expression: None,
908 is_unique: false,
909 extra_column_positions: Vec::new(),
910 }
911 }
912
913 fn new_brin(name: String, column_position: usize, column_type: DataType) -> Self {
917 Self {
918 name,
919 column_position,
920 kind: IndexKind::Brin { column_type },
921 included_columns: Vec::new(),
922 partial_predicate: None,
923 expression: None,
924 is_unique: false,
925 extra_column_positions: Vec::new(),
926 }
927 }
928
929 fn new_gin(name: String, column_position: usize) -> Self {
934 Self {
935 name,
936 column_position,
937 kind: IndexKind::Gin(PersistentBTreeMap::new()),
938 included_columns: Vec::new(),
939 partial_predicate: None,
940 expression: None,
941 is_unique: false,
942 extra_column_positions: Vec::new(),
943 }
944 }
945
946 fn new_gin_trgm(name: String, column_position: usize) -> Self {
951 Self {
952 name,
953 column_position,
954 kind: IndexKind::GinTrgm(PersistentBTreeMap::new()),
955 included_columns: Vec::new(),
956 partial_predicate: None,
957 expression: None,
958 is_unique: false,
959 extra_column_positions: Vec::new(),
960 }
961 }
962
963 pub fn lookup_eq(&self, key: &IndexKey) -> &[RowLocator] {
972 match &self.kind {
973 IndexKind::BTree(m) => m.get(key).map_or(&[][..], Vec::as_slice),
974 IndexKind::Nsw(_)
978 | IndexKind::Brin { .. }
979 | IndexKind::Gin(_)
980 | IndexKind::GinTrgm(_) => &[][..],
981 }
982 }
983
984 pub fn gin_lookup_word(&self, word: &str) -> &[RowLocator] {
988 match &self.kind {
989 IndexKind::Gin(m) => m.get(&String::from(word)).map_or(&[][..], Vec::as_slice),
990 IndexKind::BTree(_)
991 | IndexKind::Nsw(_)
992 | IndexKind::Brin { .. }
993 | IndexKind::GinTrgm(_) => &[][..],
994 }
995 }
996
997 pub fn gin_trgm_lookup(&self, tri: &str) -> &[RowLocator] {
1002 match &self.kind {
1003 IndexKind::GinTrgm(m) => m.get(&String::from(tri)).map_or(&[][..], Vec::as_slice),
1004 IndexKind::BTree(_)
1005 | IndexKind::Nsw(_)
1006 | IndexKind::Brin { .. }
1007 | IndexKind::Gin(_) => &[][..],
1008 }
1009 }
1010
1011 pub const fn nsw(&self) -> Option<&NswGraph> {
1014 match &self.kind {
1015 IndexKind::Nsw(g) => Some(g),
1016 IndexKind::BTree(_)
1017 | IndexKind::Brin { .. }
1018 | IndexKind::Gin(_)
1019 | IndexKind::GinTrgm(_) => None,
1020 }
1021 }
1022
1023 pub const fn is_brin(&self) -> bool {
1028 matches!(self.kind, IndexKind::Brin { .. })
1029 }
1030
1031 pub const fn is_gin_trgm(&self) -> bool {
1035 matches!(self.kind, IndexKind::GinTrgm(_))
1036 }
1037
1038 pub const fn is_gin(&self) -> bool {
1042 matches!(self.kind, IndexKind::Gin(_))
1043 }
1044}
1045
/// In-memory table: schema, hot rows, secondary indexes and tier counters.
#[derive(Debug, Clone)]
pub struct Table {
    /// Table definition; `insert` validates rows against it.
    schema: TableSchema,
    /// Rows held in memory; `RowLocator::Hot(i)` entries refer to positions in this vector.
    rows: PersistentVec<Row>,
    /// Secondary indexes; names are unique.
    indices: Vec<Index>,
    /// Running sum of `row_body_encoded_len` over inserted rows (saturating).
    hot_bytes: u64,
    /// Cold-row count as last recorded through `set_cold_row_count`.
    cold_row_count: u64,
    /// Set by `mark_cold_row_count_stale`, cleared by `set_cold_row_count`.
    cold_row_count_stale: bool,
}
1087
1088impl Table {
    /// Creates an empty table for `schema`: no rows, no indexes, all counters
    /// zero and the cold-row count marked fresh.
    pub fn new(schema: TableSchema) -> Self {
        Self {
            schema,
            rows: PersistentVec::new(),
            indices: Vec::new(),
            hot_bytes: 0,
            cold_row_count: 0,
            cold_row_count_stale: false,
        }
    }
1099
    /// Returns the running byte total that `insert` accumulates from
    /// `row_body_encoded_len`.
    #[must_use]
    pub const fn hot_bytes(&self) -> u64 {
        self.hot_bytes
    }
1107
    /// Returns the cold-row count last stored by `set_cold_row_count`.
    ///
    /// The value may be out of date; see `cold_row_count_stale`.
    #[must_use]
    pub const fn cold_row_count(&self) -> u64 {
        self.cold_row_count
    }
1114
    /// Stores `n` as the cold-row count and clears the stale flag.
    pub fn set_cold_row_count(&mut self, n: u64) {
        self.cold_row_count = n;
        self.cold_row_count_stale = false;
    }
1121
    /// Flags the stored cold-row count as stale; the count itself is unchanged.
    pub fn mark_cold_row_count_stale(&mut self) {
        self.cold_row_count_stale = true;
    }
1129
    /// Returns `true` when the cold-row count was marked stale and has not
    /// been reset since.
    #[must_use]
    pub const fn cold_row_count_stale(&self) -> bool {
        self.cold_row_count_stale
    }
1137
1138 #[must_use]
1149 pub fn count_cold_locators(&self) -> u64 {
1150 let mut best: u64 = 0;
1151 for idx in &self.indices {
1152 if let IndexKind::BTree(map) = &idx.kind {
1153 let n: u64 = map
1154 .iter()
1155 .map(|(_, locs)| locs.iter().filter(|l| l.is_cold()).count() as u64)
1156 .sum();
1157 if n > best {
1158 best = n;
1159 }
1160 }
1161 }
1162 best
1163 }
1164
    /// Borrows the table schema.
    pub const fn schema(&self) -> &TableSchema {
        &self.schema
    }
1168
    /// Mutably borrows the table schema.
    ///
    /// This accessor does not touch rows or indexes; keeping them consistent
    /// with any schema change is left to the caller.
    pub const fn schema_mut(&mut self) -> &mut TableSchema {
        &mut self.schema
    }
1175
    /// Borrows the in-memory rows.
    pub const fn rows(&self) -> &PersistentVec<Row> {
        &self.rows
    }
1182
    /// Number of in-memory rows; `cold_row_count` is not included.
    pub const fn row_count(&self) -> usize {
        self.rows.len()
    }
1186
    /// Mutably borrows the indexes as a slice, so entries can be edited but
    /// not added or removed.
    pub fn indices_mut(&mut self) -> &mut [Index] {
        &mut self.indices
    }
1194
    /// Borrows the indexes as a slice.
    pub fn indices(&self) -> &[Index] {
        &self.indices
    }
1198
1199 pub fn next_auto_value(&self, col_pos: usize) -> Option<i64> {
1205 let ty = self.schema.columns.get(col_pos)?.ty;
1206 if !matches!(ty, DataType::SmallInt | DataType::Int | DataType::BigInt) {
1207 return None;
1208 }
1209 let mut max: Option<i64> = None;
1210 for row in &self.rows {
1211 match row.values.get(col_pos) {
1212 Some(Value::SmallInt(n)) => {
1213 let v = i64::from(*n);
1214 max = Some(max.map_or(v, |m| m.max(v)));
1215 }
1216 Some(Value::Int(n)) => {
1217 let v = i64::from(*n);
1218 max = Some(max.map_or(v, |m| m.max(v)));
1219 }
1220 Some(Value::BigInt(n)) => {
1221 max = Some(max.map_or(*n, |m| m.max(*n)));
1222 }
1223 _ => {}
1224 }
1225 }
1226 Some(max.map_or(1, |m| m + 1))
1227 }
1228
1229 pub fn index_on(&self, column_position: usize) -> Option<&Index> {
1233 self.indices
1240 .iter()
1241 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::BTree(_)))
1242 .or_else(|| {
1243 self.indices.iter().find(|i| {
1244 i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_))
1245 })
1246 })
1247 }
1248
1249 pub fn insert(&mut self, row: Row) -> Result<(), StorageError> {
1253 if row.len() != self.schema.columns.len() {
1254 return Err(StorageError::ArityMismatch {
1255 expected: self.schema.columns.len(),
1256 actual: row.len(),
1257 });
1258 }
1259 for (i, (val, col)) in row.values.iter().zip(&self.schema.columns).enumerate() {
1260 if val.is_null() {
1261 if !col.nullable {
1262 return Err(StorageError::NullInNotNull {
1263 column: col.name.clone(),
1264 });
1265 }
1266 continue;
1267 }
1268 let actual = val.data_type().expect("non-null");
1269 let compatible = actual == col.ty
1283 || matches!(
1284 (actual, col.ty),
1285 (
1286 DataType::Text,
1287 DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
1288 ) | (DataType::Json | DataType::Jsonb, DataType::Text)
1289 | (DataType::Json, DataType::Jsonb)
1290 | (DataType::Jsonb, DataType::Json)
1291 | (DataType::Timestamp, DataType::Timestamptz)
1292 | (DataType::Timestamptz, DataType::Timestamp)
1293 )
1294 || matches!(
1295 (actual, col.ty),
1296 (
1297 DataType::Numeric { scale: a, .. },
1298 DataType::Numeric { scale: b, .. },
1299 ) if a == b
1300 );
1301 if !compatible {
1302 return Err(StorageError::TypeMismatch {
1303 column: col.name.clone(),
1304 expected: col.ty,
1305 actual,
1306 position: i,
1307 });
1308 }
1309 }
1310 let new_row_idx = self.rows.len();
1311 for idx in &mut self.indices {
1315 match &mut idx.kind {
1316 IndexKind::BTree(map) => {
1317 if let Some(key) = IndexKey::from_value(&row.values[idx.column_position]) {
1318 let mut entries = map.get(&key).cloned().unwrap_or_default();
1324 entries.push(RowLocator::Hot(new_row_idx));
1325 map.insert_mut(key, entries);
1326 }
1327 }
1328 IndexKind::Gin(map) => {
1329 if let Value::TsVector(lexemes) = &row.values[idx.column_position] {
1333 for lex in lexemes {
1334 let mut entries = map.get(&lex.word).cloned().unwrap_or_default();
1335 entries.push(RowLocator::Hot(new_row_idx));
1336 map.insert_mut(lex.word.clone(), entries);
1337 }
1338 }
1339 }
1340 IndexKind::GinTrgm(map) => {
1341 if let Value::Text(s) = &row.values[idx.column_position] {
1345 for tri in trgm::extract_trigrams(s) {
1346 let mut entries = map.get(&tri).cloned().unwrap_or_default();
1347 entries.push(RowLocator::Hot(new_row_idx));
1348 map.insert_mut(tri, entries);
1349 }
1350 }
1351 }
1352 IndexKind::Nsw(_) | IndexKind::Brin { .. } => {}
1356 }
1357 }
1358 self.hot_bytes = self
1361 .hot_bytes
1362 .saturating_add(row_body_encoded_len(&row, &self.schema) as u64);
1363 self.rows.push_mut(row);
1368 let new_row_idx = self.rows.len() - 1;
1371 let nsw_targets: Vec<usize> = self
1372 .indices
1373 .iter()
1374 .enumerate()
1375 .filter_map(|(i, idx)| {
1376 if matches!(idx.kind, IndexKind::Nsw(_)) {
1377 Some(i)
1378 } else {
1379 None
1380 }
1381 })
1382 .collect();
1383 for idx_pos in nsw_targets {
1384 nsw_insert_at(self, idx_pos, new_row_idx);
1385 }
1386 Ok(())
1387 }
1388
1389 pub fn add_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1393 if self.indices.iter().any(|i| i.name == name) {
1394 return Err(StorageError::DuplicateIndex { name });
1395 }
1396 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1397 StorageError::ColumnNotFound {
1398 column: column_name.into(),
1399 }
1400 })?;
1401 let mut idx = Index::new_btree(name, column_position);
1402 if let IndexKind::BTree(map) = &mut idx.kind {
1403 for (i, row) in self.rows.iter().enumerate() {
1404 if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
1405 let mut entries = map.get(&key).cloned().unwrap_or_default();
1406 entries.push(RowLocator::Hot(i));
1407 map.insert_mut(key, entries);
1408 }
1409 }
1410 }
1411 self.indices.push(idx);
1412 Ok(())
1413 }
1414
    /// Creates an NSW index named `name` on `column_name` with parameter `m`.
    ///
    /// Delegates to `add_nsw_index_inner` without a pre-built graph (`None`).
    ///
    /// # Errors
    /// Returns whatever `add_nsw_index_inner` returns.
    pub fn add_nsw_index(
        &mut self,
        name: String,
        column_name: &str,
        m: usize,
    ) -> Result<(), StorageError> {
        self.add_nsw_index_inner(name, column_name, m, None)
    }
1427
1428 pub fn rebuild_nsw_index(
1440 &mut self,
1441 name: &str,
1442 new_encoding: Option<VecEncoding>,
1443 ) -> Result<(), StorageError> {
1444 let idx_pos = self
1445 .indices
1446 .iter()
1447 .position(|i| i.name == name)
1448 .ok_or_else(|| StorageError::IndexNotFound {
1449 name: String::from(name),
1450 })?;
1451 let col_pos = self.indices[idx_pos].column_position;
1452 let m = match &self.indices[idx_pos].kind {
1453 IndexKind::Nsw(g) => g.m,
1454 IndexKind::BTree(_)
1455 | IndexKind::Brin { .. }
1456 | IndexKind::Gin(_)
1457 | IndexKind::GinTrgm(_) => {
1458 return Err(StorageError::Unsupported(format!(
1459 "ALTER INDEX REBUILD on non-NSW index {name:?} — only NSW indexes can rebuild"
1460 )));
1461 }
1462 };
1463 let col_name = self.schema.columns[col_pos].name.clone();
1464 if let Some(target) = new_encoding {
1467 let current = match self.schema.columns[col_pos].ty {
1468 DataType::Vector { encoding, .. } => encoding,
1469 ref other => {
1470 return Err(StorageError::Unsupported(format!(
1471 "ALTER INDEX REBUILD WITH (encoding=…) on non-vector column type {other:?}"
1472 )));
1473 }
1474 };
1475 if target != current {
1476 let DataType::Vector { dim, .. } = self.schema.columns[col_pos].ty else {
1477 unreachable!("checked above")
1478 };
1479 let n = self.rows.len();
1480 for i in 0..n {
1481 let row = self
1482 .rows
1483 .get_mut(i)
1484 .expect("row index in bounds (we iterated up to len())");
1485 let cell = core::mem::replace(&mut row.values[col_pos], Value::Null);
1486 let recoded = recode_vector_cell(cell, target)?;
1487 row.values[col_pos] = recoded;
1488 }
1489 self.schema.columns[col_pos].ty = DataType::Vector {
1490 dim,
1491 encoding: target,
1492 };
1493 }
1494 }
1495 self.indices.remove(idx_pos);
1497 self.add_nsw_index_inner(String::from(name), &col_name, m, None)?;
1498 Ok(())
1499 }
1500
    /// Installs an NSW index from a pre-built `graph`.
    ///
    /// Delegates to `add_nsw_index_inner`, passing `graph.m` as the parameter
    /// and the graph itself as the pre-built payload.
    ///
    /// # Errors
    /// Returns whatever `add_nsw_index_inner` returns.
    pub fn restore_nsw_index(
        &mut self,
        name: String,
        column_name: &str,
        graph: NswGraph,
    ) -> Result<(), StorageError> {
        self.add_nsw_index_inner(name, column_name, graph.m, Some(graph))
    }
1513
1514 pub fn restore_btree_index(
1521 &mut self,
1522 name: String,
1523 column_name: &str,
1524 map: PersistentBTreeMap<IndexKey, Vec<RowLocator>>,
1525 ) -> Result<(), StorageError> {
1526 if self.indices.iter().any(|i| i.name == name) {
1527 return Err(StorageError::DuplicateIndex { name });
1528 }
1529 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1530 StorageError::ColumnNotFound {
1531 column: column_name.into(),
1532 }
1533 })?;
1534 self.indices.push(Index {
1535 name,
1536 column_position,
1537 kind: IndexKind::BTree(map),
1538 included_columns: Vec::new(),
1539 partial_predicate: None,
1540 expression: None,
1541 is_unique: false,
1542 extra_column_positions: Vec::new(),
1543 });
1544 Ok(())
1545 }
1546
1547 pub fn restore_brin_index(
1552 &mut self,
1553 name: String,
1554 column_name: &str,
1555 column_type: DataType,
1556 ) -> Result<(), StorageError> {
1557 if self.indices.iter().any(|i| i.name == name) {
1558 return Err(StorageError::DuplicateIndex { name });
1559 }
1560 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1561 StorageError::ColumnNotFound {
1562 column: column_name.into(),
1563 }
1564 })?;
1565 self.indices
1566 .push(Index::new_brin(name, column_position, column_type));
1567 Ok(())
1568 }
1569
1570 pub fn add_brin_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1574 if self.indices.iter().any(|i| i.name == name) {
1575 return Err(StorageError::DuplicateIndex { name });
1576 }
1577 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1578 StorageError::ColumnNotFound {
1579 column: column_name.into(),
1580 }
1581 })?;
1582 let column_type = self.schema.columns[column_position].ty;
1583 self.indices
1584 .push(Index::new_brin(name, column_position, column_type));
1585 Ok(())
1586 }
1587
1588 pub fn add_gin_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1593 if self.indices.iter().any(|i| i.name == name) {
1594 return Err(StorageError::DuplicateIndex { name });
1595 }
1596 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1597 StorageError::ColumnNotFound {
1598 column: column_name.into(),
1599 }
1600 })?;
1601 if self.schema.columns[column_position].ty != DataType::TsVector {
1602 return Err(StorageError::Corrupt(format!(
1603 "GIN index {name:?} requires a tsvector column; \
1604 {column_name:?} is {:?}",
1605 self.schema.columns[column_position].ty
1606 )));
1607 }
1608 let mut idx = Index::new_gin(name, column_position);
1609 if let IndexKind::Gin(map) = &mut idx.kind {
1610 for (i, row) in self.rows.iter().enumerate() {
1611 if let Value::TsVector(lexemes) = &row.values[column_position] {
1612 for lex in lexemes {
1613 let mut entries = map.get(&lex.word).cloned().unwrap_or_default();
1614 entries.push(RowLocator::Hot(i));
1615 map.insert_mut(lex.word.clone(), entries);
1616 }
1617 }
1618 }
1619 }
1620 self.indices.push(idx);
1621 Ok(())
1622 }
1623
1624 pub fn restore_gin_index(
1629 &mut self,
1630 name: String,
1631 column_name: &str,
1632 map: PersistentBTreeMap<String, Vec<RowLocator>>,
1633 ) -> Result<(), StorageError> {
1634 if self.indices.iter().any(|i| i.name == name) {
1635 return Err(StorageError::DuplicateIndex { name });
1636 }
1637 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1638 StorageError::ColumnNotFound {
1639 column: column_name.into(),
1640 }
1641 })?;
1642 let mut idx = Index::new_gin(name, column_position);
1643 idx.kind = IndexKind::Gin(map);
1644 self.indices.push(idx);
1645 Ok(())
1646 }
1647
1648 pub fn add_gin_trgm_index(
1653 &mut self,
1654 name: String,
1655 column_name: &str,
1656 ) -> Result<(), StorageError> {
1657 if self.indices.iter().any(|i| i.name == name) {
1658 return Err(StorageError::DuplicateIndex { name });
1659 }
1660 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1661 StorageError::ColumnNotFound {
1662 column: column_name.into(),
1663 }
1664 })?;
1665 if !matches!(
1666 self.schema.columns[column_position].ty,
1667 DataType::Text | DataType::Varchar(_)
1668 ) {
1669 return Err(StorageError::Corrupt(format!(
1670 "trigram-GIN index {name:?} requires a TEXT/VARCHAR column; \
1671 {column_name:?} is {:?}",
1672 self.schema.columns[column_position].ty
1673 )));
1674 }
1675 let mut idx = Index::new_gin_trgm(name, column_position);
1676 if let IndexKind::GinTrgm(map) = &mut idx.kind {
1677 for (i, row) in self.rows.iter().enumerate() {
1678 if let Value::Text(s) = &row.values[column_position] {
1679 for tri in trgm::extract_trigrams(s) {
1680 let mut entries = map.get(&tri).cloned().unwrap_or_default();
1681 entries.push(RowLocator::Hot(i));
1682 map.insert_mut(tri, entries);
1683 }
1684 }
1685 }
1686 }
1687 self.indices.push(idx);
1688 Ok(())
1689 }
1690
1691 pub fn restore_gin_trgm_index(
1694 &mut self,
1695 name: String,
1696 column_name: &str,
1697 map: PersistentBTreeMap<String, Vec<RowLocator>>,
1698 ) -> Result<(), StorageError> {
1699 if self.indices.iter().any(|i| i.name == name) {
1700 return Err(StorageError::DuplicateIndex { name });
1701 }
1702 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1703 StorageError::ColumnNotFound {
1704 column: column_name.into(),
1705 }
1706 })?;
1707 let mut idx = Index::new_gin_trgm(name, column_position);
1708 idx.kind = IndexKind::GinTrgm(map);
1709 self.indices.push(idx);
1710 Ok(())
1711 }
1712
1713 pub fn register_cold_locators<I>(
1730 &mut self,
1731 index_name: &str,
1732 locators: I,
1733 ) -> Result<usize, StorageError>
1734 where
1735 I: IntoIterator<Item = (IndexKey, RowLocator)>,
1736 {
1737 let idx = self
1738 .indices
1739 .iter_mut()
1740 .find(|i| i.name == index_name)
1741 .ok_or_else(|| StorageError::Corrupt(format!("index {index_name:?} not found")))?;
1742 let map = match &mut idx.kind {
1743 IndexKind::BTree(map) => map,
1744 IndexKind::Nsw(_)
1745 | IndexKind::Brin { .. }
1746 | IndexKind::Gin(_)
1747 | IndexKind::GinTrgm(_) => {
1748 return Err(StorageError::Corrupt(format!(
1749 "index {index_name:?} is not BTree; cold locators apply only to BTree indices"
1750 )));
1751 }
1752 };
1753 let mut count = 0usize;
1754 for (key, locator) in locators {
1755 let mut entries = map.get(&key).cloned().unwrap_or_default();
1756 entries.push(locator);
1757 map.insert_mut(key, entries);
1758 count += 1;
1759 }
1760 Ok(count)
1761 }
1762
1763 pub fn register_gin_cold_locators<I>(
1770 &mut self,
1771 index_name: &str,
1772 locators: I,
1773 ) -> Result<usize, StorageError>
1774 where
1775 I: IntoIterator<Item = (String, RowLocator)>,
1776 {
1777 let idx = self
1778 .indices
1779 .iter_mut()
1780 .find(|i| i.name == index_name)
1781 .ok_or_else(|| StorageError::Corrupt(format!("index {index_name:?} not found")))?;
1782 let map = match &mut idx.kind {
1783 IndexKind::Gin(map) | IndexKind::GinTrgm(map) => map,
1784 IndexKind::BTree(_) | IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
1785 return Err(StorageError::Corrupt(format!(
1786 "register_gin_cold_locators: index {index_name:?} is not GIN"
1787 )));
1788 }
1789 };
1790 let mut count = 0usize;
1791 for (word, locator) in locators {
1792 let mut entries = map.get(&word).cloned().unwrap_or_default();
1793 entries.push(locator);
1794 map.insert_mut(word, entries);
1795 count += 1;
1796 }
1797 Ok(count)
1798 }
1799
1800 pub fn remove_cold_locators_for_key(
1810 &mut self,
1811 index_name: &str,
1812 key: &IndexKey,
1813 ) -> Result<usize, StorageError> {
1814 let idx = self
1815 .indices
1816 .iter_mut()
1817 .find(|i| i.name == index_name)
1818 .ok_or_else(|| {
1819 StorageError::Corrupt(format!(
1820 "remove_cold_locators_for_key: index {index_name:?} not found"
1821 ))
1822 })?;
1823 let map = match &mut idx.kind {
1824 IndexKind::BTree(map) => map,
1825 IndexKind::Nsw(_)
1826 | IndexKind::Brin { .. }
1827 | IndexKind::Gin(_)
1828 | IndexKind::GinTrgm(_) => {
1829 return Err(StorageError::Corrupt(format!(
1830 "remove_cold_locators_for_key: index {index_name:?} is not BTree; \
1831 cold locators apply only to BTree indices"
1832 )));
1833 }
1834 };
1835 let Some(entries) = map.get(key) else {
1836 return Ok(0);
1837 };
1838 let mut kept: Vec<RowLocator> =
1839 entries.iter().copied().filter(RowLocator::is_hot).collect();
1840 let removed = entries.len() - kept.len();
1841 if removed == 0 {
1842 return Ok(0);
1843 }
1844 kept.shrink_to_fit();
1845 map.insert_mut(key.clone(), kept);
1853 Ok(removed)
1854 }
1855
1856 pub fn add_column(&mut self, col: ColumnSchema, fill_value: Value) {
1863 self.schema.columns.push(col);
1864 let mut new_rows: PersistentVec<Row> = PersistentVec::new();
1865 for row in self.rows.iter() {
1866 let mut values = row.values.clone();
1867 values.push(fill_value.clone());
1868 new_rows.push_mut(Row::new(values));
1869 }
1870 self.rows = new_rows;
1871 }
1872
1873 pub fn set_partial_predicate(&mut self, idx: usize, pred: Option<String>) {
1880 debug_assert!(idx < self.indices.len());
1881 self.indices[idx].partial_predicate = pred;
1882 }
1883
1884 pub fn rename_column(&mut self, col_pos: usize, new_name: &str) {
1895 debug_assert!(col_pos < self.schema.columns.len());
1896 self.schema.columns[col_pos].name = new_name.to_string();
1897 }
1898
1899 pub fn drop_column(&mut self, col_pos: usize) {
1909 debug_assert!(col_pos < self.schema.columns.len());
1910 self.schema.columns.remove(col_pos);
1912 let mut new_rows: PersistentVec<Row> = PersistentVec::new();
1914 for row in self.rows.iter() {
1915 let mut values = row.values.clone();
1916 if col_pos < values.len() {
1917 values.remove(col_pos);
1918 }
1919 new_rows.push_mut(Row::new(values));
1920 }
1921 self.rows = new_rows;
1922 self.indices.retain(|idx| idx.column_position != col_pos);
1924 for idx in &mut self.indices {
1925 if idx.column_position > col_pos {
1926 idx.column_position -= 1;
1927 }
1928 for inc in &mut idx.included_columns {
1930 if *inc as usize > col_pos {
1931 *inc -= 1;
1932 }
1933 }
1934 }
1935 let mut surviving_ucs: Vec<UniquenessConstraint> = Vec::new();
1940 for mut uc in core::mem::take(&mut self.schema.uniqueness_constraints) {
1941 uc.columns.retain(|&c| c != col_pos);
1942 if uc.columns.is_empty() {
1943 continue;
1944 }
1945 for c in &mut uc.columns {
1946 if *c > col_pos {
1947 *c -= 1;
1948 }
1949 }
1950 surviving_ucs.push(uc);
1951 }
1952 self.schema.uniqueness_constraints = surviving_ucs;
1953 for fk in &mut self.schema.foreign_keys {
1956 for c in &mut fk.local_columns {
1957 if *c > col_pos {
1958 *c -= 1;
1959 }
1960 }
1961 }
1962 self.rebuild_indices();
1969 }
1970
1971 pub fn delete_rows(&mut self, positions: &[usize]) -> usize {
1977 if positions.is_empty() {
1978 return 0;
1979 }
1980 let mut to_remove = alloc::vec![false; self.rows.len()];
1984 let mut removed = 0;
1985 for &p in positions {
1986 if p < to_remove.len() && !to_remove[p] {
1987 to_remove[p] = true;
1988 removed += 1;
1989 }
1990 }
1991 let mut new_rows: PersistentVec<Row> = PersistentVec::new();
1992 let mut removed_bytes: u64 = 0;
1993 for (i, row) in self.rows.iter().enumerate() {
1994 if to_remove[i] {
1995 removed_bytes =
1996 removed_bytes.saturating_add(row_body_encoded_len(row, &self.schema) as u64);
1997 } else {
1998 new_rows.push_mut(row.clone());
1999 }
2000 }
2001 self.rows = new_rows;
2002 self.hot_bytes = self.hot_bytes.saturating_sub(removed_bytes);
2003 self.rebuild_indices();
2004 removed
2005 }
2006
2007 pub fn update_row(
2013 &mut self,
2014 position: usize,
2015 new_values: Vec<Value>,
2016 ) -> Result<(), StorageError> {
2017 if position >= self.rows.len() {
2018 return Err(StorageError::Corrupt(alloc::format!(
2019 "update_row: position {position} out of bounds (rows={})",
2020 self.rows.len()
2021 )));
2022 }
2023 if new_values.len() != self.schema.columns.len() {
2024 return Err(StorageError::ArityMismatch {
2025 expected: self.schema.columns.len(),
2026 actual: new_values.len(),
2027 });
2028 }
2029 for (i, (val, col)) in new_values.iter().zip(&self.schema.columns).enumerate() {
2033 if val.is_null() {
2034 if !col.nullable {
2035 return Err(StorageError::NullInNotNull {
2036 column: col.name.clone(),
2037 });
2038 }
2039 continue;
2040 }
2041 let actual = val.data_type().expect("non-null");
2042 let compatible = actual == col.ty
2043 || matches!(
2044 (actual, col.ty),
2045 (
2046 DataType::Text,
2047 DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
2048 ) | (DataType::Json | DataType::Jsonb, DataType::Text)
2049 | (DataType::Json, DataType::Jsonb)
2050 | (DataType::Jsonb, DataType::Json)
2051 | (DataType::Timestamp, DataType::Timestamptz)
2052 | (DataType::Timestamptz, DataType::Timestamp)
2053 )
2054 || matches!(
2055 (actual, col.ty),
2056 (
2057 DataType::Numeric { scale: a, .. },
2058 DataType::Numeric { scale: b, .. },
2059 ) if a == b
2060 );
2061 if !compatible {
2062 return Err(StorageError::TypeMismatch {
2063 column: col.name.clone(),
2064 expected: col.ty,
2065 actual,
2066 position: i,
2067 });
2068 }
2069 }
2070 let old_row = self
2071 .rows
2072 .get(position)
2073 .expect("position bounds-checked above");
2074 let old_bytes = row_body_encoded_len(old_row, &self.schema) as u64;
2075 let new_row = Row::new(new_values);
2076 let new_bytes = row_body_encoded_len(&new_row, &self.schema) as u64;
2077 self.rows = self
2078 .rows
2079 .set(position, new_row)
2080 .expect("position bounds-checked above");
2081 self.hot_bytes = self
2082 .hot_bytes
2083 .saturating_sub(old_bytes)
2084 .saturating_add(new_bytes);
2085 self.rebuild_indices();
2086 Ok(())
2087 }
2088
2089 fn rebuild_indices(&mut self) {
2096 let preserved_cold: Vec<(String, Vec<(IndexKey, RowLocator)>)> = self
2105 .indices
2106 .iter()
2107 .filter_map(|idx| match &idx.kind {
2108 IndexKind::BTree(map) => {
2109 let cold: Vec<(IndexKey, RowLocator)> = map
2110 .iter()
2111 .flat_map(|(k, locs)| {
2112 locs.iter()
2113 .filter(|l| l.is_cold())
2114 .copied()
2115 .map(move |l| (k.clone(), l))
2116 })
2117 .collect();
2118 if cold.is_empty() {
2119 None
2120 } else {
2121 Some((idx.name.clone(), cold))
2122 }
2123 }
2124 IndexKind::Nsw(_)
2127 | IndexKind::Brin { .. }
2128 | IndexKind::Gin(_)
2129 | IndexKind::GinTrgm(_) => None,
2130 })
2131 .collect();
2132
2133 let preserved_gin_cold: Vec<(String, Vec<(String, RowLocator)>)> = self
2141 .indices
2142 .iter()
2143 .filter_map(|idx| match &idx.kind {
2144 IndexKind::Gin(map) | IndexKind::GinTrgm(map) => {
2145 let cold: Vec<(String, RowLocator)> = map
2146 .iter()
2147 .flat_map(|(w, locs)| {
2148 locs.iter()
2149 .filter(|l| l.is_cold())
2150 .copied()
2151 .map(move |l| (w.clone(), l))
2152 })
2153 .collect();
2154 if cold.is_empty() {
2155 None
2156 } else {
2157 Some((idx.name.clone(), cold))
2158 }
2159 }
2160 IndexKind::BTree(_) | IndexKind::Nsw(_) | IndexKind::Brin { .. } => None,
2161 })
2162 .collect();
2163
2164 #[derive(Clone)]
2169 enum RebuildKind {
2170 BTree,
2171 Nsw(usize),
2172 Brin(DataType),
2173 Gin,
2174 GinTrgm,
2175 }
2176 let descriptors: Vec<(String, usize, RebuildKind)> = self
2177 .indices
2178 .iter()
2179 .map(|idx| {
2180 let kind = match &idx.kind {
2181 IndexKind::Nsw(g) => RebuildKind::Nsw(g.m),
2182 IndexKind::Brin { column_type } => RebuildKind::Brin(*column_type),
2183 IndexKind::BTree(_) => RebuildKind::BTree,
2184 IndexKind::Gin(_) => RebuildKind::Gin,
2185 IndexKind::GinTrgm(_) => RebuildKind::GinTrgm,
2186 };
2187 (idx.name.clone(), idx.column_position, kind)
2188 })
2189 .collect();
2190 self.indices.clear();
2191 for (name, column_position, rebuild_kind) in descriptors {
2192 match rebuild_kind {
2193 RebuildKind::Nsw(m) => {
2194 let idx = Index::new_nsw(name, column_position, m);
2195 self.indices.push(idx);
2196 let idx_pos = self.indices.len() - 1;
2197 let row_indices: Vec<usize> = (0..self.rows.len()).collect();
2198 for row_idx in row_indices {
2199 nsw_insert_at(self, idx_pos, row_idx);
2200 }
2201 }
2202 RebuildKind::Brin(column_type) => {
2203 self.indices
2206 .push(Index::new_brin(name, column_position, column_type));
2207 }
2208 RebuildKind::BTree => {
2209 let mut idx = Index::new_btree(name, column_position);
2210 if let IndexKind::BTree(map) = &mut idx.kind {
2211 for (i, row) in self.rows.iter().enumerate() {
2212 if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
2213 let mut entries = map.get(&key).cloned().unwrap_or_default();
2214 entries.push(RowLocator::Hot(i));
2215 map.insert_mut(key, entries);
2216 }
2217 }
2218 }
2219 self.indices.push(idx);
2220 }
2221 RebuildKind::Gin => {
2222 let mut idx = Index::new_gin(name, column_position);
2223 if let IndexKind::Gin(map) = &mut idx.kind {
2224 for (i, row) in self.rows.iter().enumerate() {
2225 if let Value::TsVector(lexemes) = &row.values[column_position] {
2226 for lex in lexemes {
2227 let mut entries =
2228 map.get(&lex.word).cloned().unwrap_or_default();
2229 entries.push(RowLocator::Hot(i));
2230 map.insert_mut(lex.word.clone(), entries);
2231 }
2232 }
2233 }
2234 }
2235 self.indices.push(idx);
2236 }
2237 RebuildKind::GinTrgm => {
2238 let mut idx = Index::new_gin_trgm(name, column_position);
2239 if let IndexKind::GinTrgm(map) = &mut idx.kind {
2240 for (i, row) in self.rows.iter().enumerate() {
2241 if let Value::Text(s) = &row.values[column_position] {
2242 for tri in trgm::extract_trigrams(s) {
2243 let mut entries =
2244 map.get(&tri).cloned().unwrap_or_default();
2245 entries.push(RowLocator::Hot(i));
2246 map.insert_mut(tri, entries);
2247 }
2248 }
2249 }
2250 }
2251 self.indices.push(idx);
2252 }
2253 }
2254 }
2255
2256 for (idx_name, locators) in preserved_cold {
2261 let _ = self.register_cold_locators(&idx_name, locators);
2265 }
2266 for (idx_name, locators) in preserved_gin_cold {
2268 let _ = self.register_gin_cold_locators(&idx_name, locators);
2269 }
2270 }
2271
2272 fn add_nsw_index_inner(
2273 &mut self,
2274 name: String,
2275 column_name: &str,
2276 m: usize,
2277 restore: Option<NswGraph>,
2278 ) -> Result<(), StorageError> {
2279 if self.indices.iter().any(|i| i.name == name) {
2280 return Err(StorageError::DuplicateIndex { name });
2281 }
2282 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
2283 StorageError::ColumnNotFound {
2284 column: column_name.into(),
2285 }
2286 })?;
2287 if !matches!(
2288 self.schema.columns[column_position].ty,
2289 DataType::Vector { .. }
2290 ) {
2291 return Err(StorageError::TypeMismatch {
2292 column: column_name.into(),
2293 expected: DataType::Vector {
2294 dim: 0,
2295 encoding: VecEncoding::F32,
2296 },
2297 actual: self.schema.columns[column_position].ty,
2298 position: column_position,
2299 });
2300 }
2301 if let Some(graph) = restore {
2302 self.indices.push(Index {
2303 name,
2304 column_position,
2305 kind: IndexKind::Nsw(graph),
2306 included_columns: Vec::new(),
2307 partial_predicate: None,
2308 expression: None,
2309 is_unique: false,
2310 extra_column_positions: Vec::new(),
2311 });
2312 return Ok(());
2313 }
2314 let idx = Index::new_nsw(name, column_position, m);
2315 self.indices.push(idx);
2316 let idx_pos = self.indices.len() - 1;
2317 let row_indices: Vec<usize> = (0..self.rows.len()).collect();
2320 for row_idx in row_indices {
2321 nsw_insert_at(self, idx_pos, row_idx);
2322 }
2323 Ok(())
2324 }
2325}
2326
2327fn recode_vector_cell(cell: Value, target: VecEncoding) -> Result<Value, StorageError> {
2334 if matches!(cell, Value::Null) {
2335 return Ok(cell);
2336 }
2337 let as_f32: Vec<f32> = match &cell {
2339 Value::Vector(v) => v.clone(),
2340 Value::Sq8Vector(q) => quantize::dequantize(q),
2341 Value::HalfVector(h) => h.to_f32_vec(),
2342 other => {
2343 return Err(StorageError::Unsupported(format!(
2344 "ALTER INDEX REBUILD: cannot recode non-vector cell {:?}",
2345 other.data_type()
2346 )));
2347 }
2348 };
2349 Ok(match target {
2354 VecEncoding::F32 => Value::Vector(as_f32),
2355 VecEncoding::Sq8 => Value::Sq8Vector(quantize::quantize(&as_f32)),
2356 VecEncoding::F16 => Value::HalfVector(halfvec::HalfVector::from_f32_slice(&as_f32)),
2357 })
2358}
2359
/// Inserts row `new_row_idx` into the NSW graph of the index at `idx_pos`.
///
/// Rows whose indexed cell is not a vector, or is a zero-length vector, only
/// get an empty node slot so that graph slots stay aligned with row
/// positions. For a real vector the node is given a level, linked into every
/// layer from `min(level, entry_level)` down to 0, and promoted to graph
/// entry point when its level exceeds the current entry level.
///
/// Graph construction always uses squared L2 distance.
///
/// # Panics
///
/// Panics if `idx_pos`, `new_row_idx` or the index's column position are out
/// of range, or if the index at `idx_pos` is not an NSW index.
fn nsw_insert_at(table: &mut Table, idx_pos: usize, new_row_idx: usize) {
    let col_pos = table.indices[idx_pos].column_position;
    // Dimension of the cell, or `None` for anything that is not a vector.
    let cell_dim: Option<usize> = match &table.rows[new_row_idx].values[col_pos] {
        Value::Vector(v) => Some(v.len()),
        Value::Sq8Vector(q) => Some(q.bytes.len()),
        Value::HalfVector(h) => Some(h.dim()),
        _ => None,
    };
    let Some(dim) = cell_dim else {
        // Non-vector cell: reserve the slot but leave the node unlinked.
        ensure_node_slot(table, idx_pos, new_row_idx, 0);
        return;
    };
    if dim == 0 {
        // Empty vector: treated like a non-vector cell.
        ensure_node_slot(table, idx_pos, new_row_idx, 0);
        return;
    }
    // The level is derived from the row index alone.
    let level = nsw_assign_level(new_row_idx);
    ensure_node_slot(table, idx_pos, new_row_idx, level);
    let (entry, entry_level, m) = match &table.indices[idx_pos].kind {
        IndexKind::Nsw(g) => (g.entry, g.entry_level, g.m),
        IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) | IndexKind::GinTrgm(_) => {
            unreachable!("nsw_insert_at on a non-NSW index")
        }
    };
    if entry.is_none() {
        // First real node: it becomes the entry point and has no peers yet.
        if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
            g.entry = Some(new_row_idx);
            g.entry_level = level;
            *g.levels
                .get_mut(new_row_idx)
                .expect("levels slot padded by ensure_node_slot") = level;
        }
        return;
    }
    if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
        // Record the node's level before linking it.
        *g.levels
            .get_mut(new_row_idx)
            .expect("levels slot padded by ensure_node_slot") = level;
    }
    // Decode the new cell to f32 once; it is the query for every search below.
    let query = match &table.rows[new_row_idx].values[col_pos] {
        Value::Vector(v) => v.clone(),
        Value::Sq8Vector(q) => quantize::dequantize(q),
        Value::HalfVector(h) => h.to_f32_vec(),
        _ => return,
    };
    let mut current = entry.expect("entry was Some above");
    let mut current_d = vec_l2_sq(table, col_pos, current, &query);
    if entry_level > level {
        // Layers above the new node's level: greedy descent only, no linking.
        for layer in (level + 1..=entry_level).rev() {
            (current, current_d) =
                greedy_layer_walk(table, idx_pos, layer, current, current_d, &query);
        }
    }
    let top = level.min(entry_level);
    // Beam width used while building: at least 8, otherwise 2 * m.
    let ef = (m * 2).max(8);
    for layer in (0..=top).rev() {
        // Layer 0 allows twice as many links as the upper layers.
        let cap = if layer == 0 { m * 2 } else { m };
        let mut candidates = layer_beam_search(
            table,
            idx_pos,
            layer,
            current,
            current_d,
            &query,
            ef,
            NswMetric::L2,
        );
        // Never link a node to itself.
        candidates.retain(|&(_, n)| n != new_row_idx);
        if let Some(&(d, n)) = candidates.first() {
            // The closest hit on this layer seeds the search one layer down.
            current = n;
            current_d = d;
        }
        let peers = select_neighbours_heuristic(&candidates, cap, table, col_pos);
        connect_at_layer(table, idx_pos, layer, new_row_idx, &peers);
    }
    if level > entry_level
        && let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
    {
        // The new node reaches a higher layer than the old entry point.
        g.entry = Some(new_row_idx);
        g.entry_level = level;
    }
}
2468
2469fn ensure_node_slot(table: &mut Table, idx_pos: usize, new_row_idx: usize, level: u8) {
2473 let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind else {
2474 unreachable!("ensure_node_slot on a BTree index");
2475 };
2476 while g.layers.len() <= level as usize {
2477 g.layers.push(PersistentVec::new());
2478 }
2479 while g.levels.len() <= new_row_idx {
2480 g.levels.push_mut(0);
2481 }
2482 for layer_vec in &mut g.layers {
2483 while layer_vec.len() <= new_row_idx {
2484 layer_vec.push_mut(Vec::new());
2485 }
2486 }
2487}
2488
2489fn greedy_layer_walk(
2495 table: &Table,
2496 idx_pos: usize,
2497 layer: u8,
2498 mut current: usize,
2499 mut current_d: f32,
2500 query: &[f32],
2501) -> (usize, f32) {
2502 let g = match &table.indices[idx_pos].kind {
2503 IndexKind::Nsw(g) => g,
2504 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) | IndexKind::GinTrgm(_) => {
2505 return (current, current_d);
2506 }
2507 };
2508 let col_pos = table.indices[idx_pos].column_position;
2509 loop {
2510 let neighbours: &[u32] = g
2511 .layers
2512 .get(layer as usize)
2513 .and_then(|layer_v| layer_v.get(current))
2514 .map_or(&[][..], Vec::as_slice);
2515 let mut best = current;
2516 let mut best_d = current_d;
2517 for &n in neighbours {
2518 let n = n as usize;
2519 let d = vec_l2_sq(table, col_pos, n, query);
2520 if d < best_d {
2521 best = n;
2522 best_d = d;
2523 }
2524 }
2525 if best == current {
2526 return (current, current_d);
2527 }
2528 current = best;
2529 current_d = best_d;
2530 }
2531}
2532
2533#[allow(clippy::too_many_arguments)] fn layer_beam_search(
2546 table: &Table,
2547 idx_pos: usize,
2548 layer: u8,
2549 entry_node: usize,
2550 entry_d: f32,
2551 query: &[f32],
2552 ef: usize,
2553 metric: NswMetric,
2554) -> Vec<(f32, usize)> {
2555 let g = match &table.indices[idx_pos].kind {
2556 IndexKind::Nsw(g) => g,
2557 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) | IndexKind::GinTrgm(_) => return Vec::new(),
2558 };
2559 let col_pos = table.indices[idx_pos].column_position;
2560 let d0 = if matches!(metric, NswMetric::L2) {
2561 entry_d
2562 } else {
2563 cell_to_query_metric_distance(table, col_pos, entry_node, query, metric)
2564 };
2565 let row_count = table.rows.len();
2566 let mut visited: Vec<bool> = alloc::vec![false; row_count];
2567 if entry_node < row_count {
2568 visited[entry_node] = true;
2569 }
2570 let mut candidates: alloc::collections::BinaryHeap<NodeClosest> =
2573 alloc::collections::BinaryHeap::with_capacity(ef);
2574 let mut results: alloc::collections::BinaryHeap<NodeFurthest> =
2575 alloc::collections::BinaryHeap::with_capacity(ef);
2576 candidates.push(NodeClosest {
2577 dist: d0,
2578 node: entry_node,
2579 });
2580 results.push(NodeFurthest {
2581 dist: d0,
2582 node: entry_node,
2583 });
2584 while let Some(cur) = candidates.pop() {
2585 let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
2586 if cur.dist > worst && results.len() >= ef {
2587 break;
2588 }
2589 let neighbours: &[u32] = g
2590 .layers
2591 .get(layer as usize)
2592 .and_then(|layer_v| layer_v.get(cur.node))
2593 .map_or(&[][..], Vec::as_slice);
2594 for &n in neighbours {
2595 let n = n as usize;
2596 if n >= row_count || visited[n] {
2597 continue;
2598 }
2599 visited[n] = true;
2600 let dn = cell_to_query_metric_distance(table, col_pos, n, query, metric);
2604 if !dn.is_finite() {
2605 continue;
2606 }
2607 let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
2608 if results.len() < ef || dn < worst {
2609 results.push(NodeFurthest { dist: dn, node: n });
2610 if results.len() > ef {
2611 results.pop();
2612 }
2613 candidates.push(NodeClosest { dist: dn, node: n });
2614 }
2615 }
2616 }
2617 let mut out: Vec<(f32, usize)> = results.into_iter().map(|c| (c.dist, c.node)).collect();
2620 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2621 out
2622}
2623
/// Heap entry ordered so that a max-`BinaryHeap` pops the node with the
/// *smallest* distance first.
///
/// The ordering is total and consistent with equality: distances are
/// compared with [`f32::total_cmp`] (so NaN has a fixed place), and equal
/// distances are broken by `node`, the smaller row index popping first.
#[derive(Debug, Clone, Copy)]
struct NodeClosest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeClosest {
    fn eq(&self, other: &Self) -> bool {
        // Defined through `cmp` so `Eq` and `Ord` can never disagree.
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeClosest {}
impl PartialOrd for NodeClosest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeClosest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        // Operands are swapped on purpose: "greater" means "closer".
        other
            .dist
            .total_cmp(&self.dist)
            .then_with(|| other.node.cmp(&self.node))
    }
}
2652
/// Heap entry ordered so that a max-`BinaryHeap` keeps the node with the
/// *largest* distance on top, ready to be evicted.
///
/// The ordering is total and consistent with equality: distances are
/// compared with [`f32::total_cmp`] (so NaN has a fixed place), and equal
/// distances are broken by `node`, the larger row index counting as further.
#[derive(Debug, Clone, Copy)]
struct NodeFurthest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeFurthest {
    fn eq(&self, other: &Self) -> bool {
        // Defined through `cmp` so `Eq` and `Ord` can never disagree.
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeFurthest {}
impl PartialOrd for NodeFurthest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeFurthest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        self.dist
            .total_cmp(&other.dist)
            .then_with(|| self.node.cmp(&other.node))
    }
}
2678
2679fn select_neighbours_heuristic(
2688 candidates: &[(f32, usize)],
2689 m: usize,
2690 table: &Table,
2691 col_pos: usize,
2692) -> Vec<usize> {
2693 let mut chosen: Vec<usize> = Vec::with_capacity(m);
2694 for &(d_q, e) in candidates {
2695 if chosen.len() >= m {
2696 break;
2697 }
2698 if !matches!(
2703 table.rows.get(e).and_then(|r| r.values.get(col_pos)),
2704 Some(Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_))
2705 ) {
2706 continue;
2707 }
2708 let mut covered = false;
2709 for &r in &chosen {
2710 if cell_l2_sq(table, col_pos, e, r) < d_q {
2714 covered = true;
2715 break;
2716 }
2717 }
2718 if !covered {
2719 chosen.push(e);
2720 }
2721 }
2722 chosen
2723}
2724
/// Links `new_row_idx` to `peers` on `layer`, in both directions.
///
/// The new node's neighbour list on this layer is overwritten with `peers`.
/// Each peer that holds a vector gets a back-link to the new node (unless it
/// already has one); when that pushes the peer over the layer's link cap, the
/// peer's list is re-selected with `select_neighbours_heuristic`.
///
/// Does nothing when the index at `idx_pos` is not an NSW index.
///
/// # Panics
///
/// Panics if a row index does not fit in `u32`, or if `idx_pos`, `layer` or a
/// peer index is out of range for the table or graph.
fn connect_at_layer(
    table: &mut Table,
    idx_pos: usize,
    layer: u8,
    new_row_idx: usize,
    peers: &[usize],
) {
    let col_pos = table.indices[idx_pos].column_position;
    // Maximum number of links a node may keep on this layer.
    let cap = match &table.indices[idx_pos].kind {
        IndexKind::Nsw(g) => g.cap_for_layer(layer),
        IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) | IndexKind::GinTrgm(_) => return,
    };
    // Adjacency lists store row indices as u32.
    let new_row_u32 = u32::try_from(new_row_idx).expect("row index fits in u32");
    // Forward links: new node -> peers.
    if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
        let layer_v = &mut g.layers[layer as usize];
        if let Some(slot) = layer_v.get_mut(new_row_idx) {
            *slot = peers
                .iter()
                .map(|&p| u32::try_from(p).expect("row index fits in u32"))
                .collect();
        }
    }
    // Back links: peer -> new node, trimming peers that exceed the cap.
    for &peer in peers {
        if !matches!(
            &table.rows[peer].values[col_pos],
            Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_)
        ) {
            // A peer without a vector cannot be scored; leave it unlinked.
            continue;
        }
        if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
            let layer_v = &mut g.layers[layer as usize];
            if let Some(slot) = layer_v.get_mut(peer)
                && !slot.contains(&new_row_u32)
            {
                slot.push(new_row_u32);
            }
        }
        // Re-borrow immutably: the distance helpers below need `&Table`.
        let needs_trim = match &table.indices[idx_pos].kind {
            IndexKind::Nsw(g) => g.layers[layer as usize][peer].len() > cap,
            IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) | IndexKind::GinTrgm(_) => false,
        };
        if needs_trim {
            // Copy the list out so the graph is not borrowed while scoring.
            let current_peers: Vec<usize> = match &table.indices[idx_pos].kind {
                IndexKind::Nsw(g) => g.layers[layer as usize][peer]
                    .iter()
                    .map(|&n| n as usize)
                    .collect(),
                IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) | IndexKind::GinTrgm(_) => continue,
            };
            // Score each existing link by its distance to the peer itself.
            let mut tagged: Vec<(f32, usize)> = current_peers
                .iter()
                .map(|&p| (cell_l2_sq(table, col_pos, peer, p), p))
                .collect();
            tagged.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
            let kept = select_neighbours_heuristic(&tagged, cap, table, col_pos);
            if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
                && let Some(slot) = g.layers[layer as usize].get_mut(peer)
            {
                *slot = kept
                    .into_iter()
                    .map(|p| u32::try_from(p).expect("row index fits in u32"))
                    .collect();
            }
        }
    }
}
2809
2810fn vec_l2_sq(table: &Table, col_pos: usize, row: usize, query: &[f32]) -> f32 {
2817 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2818 Some(Value::Vector(v)) if v.len() == query.len() => l2_distance_sq(v, query),
2819 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => {
2820 quantize::sq8_l2_distance_sq_asymmetric(q, query)
2821 }
2822 Some(Value::HalfVector(h)) if h.dim() == query.len() => {
2826 halfvec::half_l2_distance_sq_asymmetric(h, query)
2827 }
2828 _ => f32::INFINITY,
2829 }
2830}
2831
2832fn cell_l2_sq(table: &Table, col_pos: usize, row_a: usize, row_b: usize) -> f32 {
2839 let Some(cell_a) = table.rows.get(row_a).and_then(|r| r.values.get(col_pos)) else {
2840 return f32::INFINITY;
2841 };
2842 let Some(cell_b) = table.rows.get(row_b).and_then(|r| r.values.get(col_pos)) else {
2843 return f32::INFINITY;
2844 };
2845 match (cell_a, cell_b) {
2846 (Value::Vector(a), Value::Vector(b)) if a.len() == b.len() => l2_distance_sq(a, b),
2847 (Value::Sq8Vector(a), Value::Sq8Vector(b)) if a.bytes.len() == b.bytes.len() => {
2848 quantize::sq8_l2_distance_sq(a, b)
2849 }
2850 (Value::HalfVector(a), Value::HalfVector(b)) if a.dim() == b.dim() => {
2855 halfvec::half_l2_distance_sq(a, b)
2856 }
2857 _ => f32::INFINITY,
2858 }
2859}
2860
2861fn cell_to_query_metric_distance(
2866 table: &Table,
2867 col_pos: usize,
2868 row: usize,
2869 query: &[f32],
2870 metric: NswMetric,
2871) -> f32 {
2872 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2873 Some(Value::Vector(v)) if v.len() == query.len() => metric_distance(metric, v, query),
2874 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => match metric {
2875 NswMetric::L2 => quantize::sq8_l2_distance_sq_asymmetric(q, query),
2876 NswMetric::InnerProduct => quantize::sq8_inner_product_asymmetric(q, query),
2877 NswMetric::Cosine => quantize::sq8_cosine_distance_asymmetric(q, query),
2878 },
2879 Some(Value::HalfVector(h)) if h.dim() == query.len() => match metric {
2882 NswMetric::L2 => halfvec::half_l2_distance_sq_asymmetric(h, query),
2883 NswMetric::InnerProduct => halfvec::half_inner_product_asymmetric(h, query),
2884 NswMetric::Cosine => halfvec::half_cosine_distance_asymmetric(h, query),
2885 },
2886 _ => f32::INFINITY,
2887 }
2888}
2889
/// Distance function used to rank candidates in an NSW search.
///
/// Every metric is expressed as a distance: smaller means closer. The
/// descriptions below are what `metric_distance` computes for `f32` cells.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NswMetric {
    /// Squared Euclidean distance.
    L2,
    /// Negated inner product, so that a larger dot product ranks closer.
    InnerProduct,
    /// One minus the cosine similarity; infinitely far when either vector has
    /// zero norm.
    Cosine,
}
2907
2908fn nsw_search(
2914 table: &Table,
2915 idx_pos: usize,
2916 query: &[f32],
2917 k: usize,
2918 ef: usize,
2919 metric: NswMetric,
2920) -> Vec<(f32, usize)> {
2921 let (entry, entry_level) = match &table.indices[idx_pos].kind {
2922 IndexKind::Nsw(g) => (g.entry, g.entry_level),
2923 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) | IndexKind::GinTrgm(_) => return Vec::new(),
2924 };
2925 let Some(entry) = entry else {
2926 return Vec::new();
2927 };
2928 let col_pos = table.indices[idx_pos].column_position;
2929 let sq8 = matches!(
2936 table.schema.columns.get(col_pos).map(|c| c.ty),
2937 Some(DataType::Vector {
2938 encoding: VecEncoding::Sq8,
2939 ..
2940 })
2941 );
2942 let ef = if sq8 {
2943 ef.max(k).max(k * SQ8_RERANK_OVER_FETCH)
2944 } else {
2945 ef.max(k)
2946 };
2947 let entry_d = vec_l2_sq(table, col_pos, entry, query);
2949 let mut current = entry;
2950 let mut current_d = entry_d;
2951 for layer in (1..=entry_level).rev() {
2952 (current, current_d) = greedy_layer_walk(table, idx_pos, layer, current, current_d, query);
2953 }
2954 let mut results = layer_beam_search(table, idx_pos, 0, current, current_d, query, ef, metric);
2956 if sq8 {
2957 results = sq8_rerank(table, col_pos, &results, query, metric);
2958 }
2959 results.truncate(k);
2960 results
2961}
2962
2963fn sq8_rerank(
2970 table: &Table,
2971 col_pos: usize,
2972 candidates: &[(f32, usize)],
2973 query: &[f32],
2974 metric: NswMetric,
2975) -> Vec<(f32, usize)> {
2976 let mut out: Vec<(f32, usize)> = candidates
2977 .iter()
2978 .filter_map(|&(adc_d, row)| {
2979 let cell = table.rows.get(row).and_then(|r| r.values.get(col_pos))?;
2980 let Value::Sq8Vector(q) = cell else {
2981 return Some((adc_d, row));
2985 };
2986 let deq = quantize::dequantize(q);
2987 if deq.len() != query.len() {
2988 return None;
2989 }
2990 Some((metric_distance(metric, &deq, query), row))
2991 })
2992 .collect();
2993 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2994 out
2995}
2996
/// Over-fetch factor for SQ8 columns: `nsw_search` widens its beam to at
/// least `k * SQ8_RERANK_OVER_FETCH` candidates before rescoring them and
/// cutting the list to `k`.
const SQ8_RERANK_OVER_FETCH: usize = 3;
3001
3002fn metric_distance(metric: NswMetric, a: &[f32], b: &[f32]) -> f32 {
3003 match metric {
3004 NswMetric::L2 => l2_distance_sq(a, b),
3005 NswMetric::InnerProduct => -inner_product_f32(a, b),
3006 NswMetric::Cosine => {
3007 let (dot, na, nb) = cosine_dot_norms_f32(a, b);
3008 if na == 0.0 || nb == 0.0 {
3009 return f32::INFINITY;
3010 }
3011 let denom = sqrt_newton_f32(na) * sqrt_newton_f32(nb);
3014 1.0 - dot / denom
3015 }
3016 }
3017}
3018
3019#[doc(hidden)]
3028#[inline]
3029pub fn inner_product_f32(a: &[f32], b: &[f32]) -> f32 {
3030 #[cfg(target_arch = "aarch64")]
3031 {
3032 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
3033 return unsafe { inner_product_neon(a, b) };
3036 }
3037 }
3038 inner_product_scalar(a, b)
3039}
3040
/// Portable dot product: sums `a[i] * b[i]` left to right, stopping at the
/// end of the shorter slice. Returns `0.0` for empty input.
fn inner_product_scalar(a: &[f32], b: &[f32]) -> f32 {
    // An explicit `0.0` seed keeps the empty-input result at positive zero.
    a.iter()
        .zip(b)
        .fold(0.0_f32, |running, (lhs, rhs)| running + lhs * rhs)
}
3048
/// NEON dot product of two `f32` slices.
///
/// Processes eight lanes per iteration into two independent accumulators,
/// then at most one trailing group of four, and finally adds the two
/// accumulators and reduces them horizontally. Elements beyond the last
/// full group of four are not read.
///
/// # Safety
///
/// * The CPU must support NEON.
/// * `b` must be at least as long as `a`; loads from `b` use offsets derived
///   from `a.len()` and are not bounds-checked.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names)]
unsafe fn inner_product_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32,
    };
    // SAFETY: every load reads four floats at offset `i` with `i + 4 <= n`,
    // where `n == a.len()`; the caller guarantees `b` is at least that long.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut acc0 = zero;
        let mut acc1 = zero;
        let n = a.len();
        let mut i = 0usize;
        // Main loop: two fused multiply-adds per iteration.
        while i + 8 <= n {
            let av0 = vld1q_f32(a.as_ptr().add(i));
            let bv0 = vld1q_f32(b.as_ptr().add(i));
            acc0 = vfmaq_f32(acc0, av0, bv0);
            let av1 = vld1q_f32(a.as_ptr().add(i + 4));
            let bv1 = vld1q_f32(b.as_ptr().add(i + 4));
            acc1 = vfmaq_f32(acc1, av1, bv1);
            i += 8;
        }
        // Tail: at most one remaining group of four.
        while i + 4 <= n {
            let av = vld1q_f32(a.as_ptr().add(i));
            let bv = vld1q_f32(b.as_ptr().add(i));
            acc0 = vfmaq_f32(acc0, av, bv);
            i += 4;
        }
        vaddvq_f32(vaddq_f32(acc0, acc1))
    }
}
3082
3083#[doc(hidden)]
3090#[inline]
3091pub fn cosine_dot_norms_f32(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
3092 #[cfg(target_arch = "aarch64")]
3093 {
3094 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
3095 return unsafe { cosine_dot_norms_neon(a, b) };
3097 }
3098 }
3099 cosine_dot_norms_scalar(a, b)
3100}
3101
/// Portable single-pass computation of `(a . b, |a|^2, |b|^2)`.
///
/// Pairs are consumed left to right up to the end of the shorter slice;
/// empty input yields `(0.0, 0.0, 0.0)`.
fn cosine_dot_norms_scalar(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    a.iter().zip(b).fold(
        (0.0_f32, 0.0_f32, 0.0_f32),
        |(dot, norm_a_sq, norm_b_sq), (lhs, rhs)| {
            (
                dot + lhs * rhs,
                norm_a_sq + lhs * lhs,
                norm_b_sq + rhs * rhs,
            )
        },
    )
}
3113
/// NEON single-pass computation of `(a . b, |a|^2, |b|^2)`.
///
/// Processes four lanes per iteration into three accumulators and reduces
/// each horizontally at the end. Elements beyond the last full group of four
/// are not read.
///
/// # Safety
///
/// * The CPU must support NEON.
/// * `b` must be at least as long as `a`; loads from `b` use offsets derived
///   from `a.len()` and are not bounds-checked.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names, clippy::similar_names)]
unsafe fn cosine_dot_norms_neon(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    use core::arch::aarch64::{float32x4_t, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32};
    // SAFETY: every load reads four floats at offset `i` with `i + 4 <= n`,
    // where `n == a.len()`; the caller guarantees `b` is at least that long.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut acc_dot = zero;
        let mut acc_na = zero;
        let mut acc_nb = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 4 <= n {
            let av = vld1q_f32(a.as_ptr().add(i));
            let bv = vld1q_f32(b.as_ptr().add(i));
            // One pair of loads feeds all three running sums.
            acc_dot = vfmaq_f32(acc_dot, av, bv);
            acc_na = vfmaq_f32(acc_na, av, av);
            acc_nb = vfmaq_f32(acc_nb, bv, bv);
            i += 4;
        }
        (vaddvq_f32(acc_dot), vaddvq_f32(acc_na), vaddvq_f32(acc_nb))
    }
}
3137
/// Square root for `no_std` builds, computed with Newton's iteration.
///
/// * Zero and negative inputs return `0.0`.
/// * NaN returns NaN and `+inf` returns `+inf`.
/// * Every other input is accurate to within a few ULPs across the whole
///   `f32` range, subnormals included.
///
/// The iteration is seeded with an estimate that halves the exponent (a
/// shift of the bit pattern), which is already within a few percent of the
/// root for normal inputs. Seeding with `x` itself needs roughly
/// `log2(x) / 2` halving steps before Newton's quadratic convergence starts,
/// so a small fixed step count is badly wrong for large or tiny inputs.
fn sqrt_newton_f32(x: f32) -> f32 {
    // Upper bound on refinement steps; subnormal inputs need the most
    // (about 15), normal inputs converge in 3-4.
    const MAX_REFINEMENTS: usize = 32;

    if x <= 0.0 {
        return 0.0;
    }
    if !x.is_finite() {
        // NaN stays NaN; +inf is its own square root.
        return x;
    }
    // Shifting the bit pattern right halves the biased exponent; the constant
    // restores half of the bias. The sum cannot overflow for finite inputs.
    let mut g = f32::from_bits((x.to_bits() >> 1) + 0x1fc0_0000);
    // One unconditional step puts the estimate at or above the true root
    // (AM-GM), after which the sequence decreases monotonically.
    g = 0.5 * (g + x / g);
    for _ in 0..MAX_REFINEMENTS {
        let next = 0.5 * (g + x / g);
        if next >= g {
            // No further progress: converged to working precision.
            break;
        }
        g = next;
    }
    g
}
3148
3149#[inline]
3157fn l2_distance_sq(a: &[f32], b: &[f32]) -> f32 {
3158 #[cfg(target_arch = "aarch64")]
3159 {
3160 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
3161 return unsafe { l2_distance_sq_neon(a, b) };
3165 }
3166 }
3167 l2_distance_sq_scalar(a, b)
3168}
3169
/// Portable squared Euclidean distance: sums `(a[i] - b[i])^2` left to
/// right, stopping at the end of the shorter slice. Returns `0.0` for empty
/// input.
fn l2_distance_sq_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).fold(0.0_f32, |running, (lhs, rhs)| {
        let delta = *lhs - *rhs;
        running + delta * delta
    })
}
3178
/// NEON squared Euclidean distance between two `f32` slices.
///
/// Processes eight lanes per iteration into two independent accumulators,
/// then at most one trailing group of four, and finally adds the two
/// accumulators and reduces them horizontally. Elements beyond the last
/// full group of four are not read.
///
/// # Safety
///
/// * The CPU must support NEON.
/// * `b` must be at least as long as `a`; loads from `b` use offsets derived
///   from `a.len()` and are not bounds-checked.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names)]
unsafe fn l2_distance_sq_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32, vsubq_f32,
    };
    // SAFETY: every load reads four floats at offset `i` with `i + 4 <= n`,
    // where `n == a.len()`; the caller guarantees `b` is at least that long.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut acc0 = zero;
        let mut acc1 = zero;
        let n = a.len();
        let mut i = 0usize;
        // Main loop: two difference vectors, each squared and accumulated.
        while i + 8 <= n {
            let d0 = vsubq_f32(vld1q_f32(a.as_ptr().add(i)), vld1q_f32(b.as_ptr().add(i)));
            acc0 = vfmaq_f32(acc0, d0, d0);
            let d1 = vsubq_f32(
                vld1q_f32(a.as_ptr().add(i + 4)),
                vld1q_f32(b.as_ptr().add(i + 4)),
            );
            acc1 = vfmaq_f32(acc1, d1, d1);
            i += 8;
        }
        // Tail: at most one remaining group of four.
        while i + 4 <= n {
            let d = vsubq_f32(vld1q_f32(a.as_ptr().add(i)), vld1q_f32(b.as_ptr().add(i)));
            acc0 = vfmaq_f32(acc0, d, d);
            i += 4;
        }
        vaddvq_f32(vaddq_f32(acc0, acc1))
    }
}
3216
3217pub fn nsw_query(
3220 table: &Table,
3221 idx_name: &str,
3222 query: &[f32],
3223 k: usize,
3224 metric: NswMetric,
3225) -> Vec<usize> {
3226 let Some(idx_pos) = table.indices.iter().position(|i| i.name == idx_name) else {
3227 return Vec::new();
3228 };
3229 let ef = (k * 2).max(NSW_DEFAULT_M);
3230 let mut hits = nsw_search(table, idx_pos, query, k, ef, metric);
3231 hits.truncate(k);
3232 hits.into_iter().map(|(_, idx)| idx).collect()
3233}
3234
3235pub fn nsw_index_on(table: &Table, column_position: usize) -> Option<&Index> {
3239 table
3240 .indices
3241 .iter()
3242 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_)))
3243}
3244
/// In-memory catalog: the tables, their name lookup, the cold-tier segments
/// and the registered functions and triggers.
#[derive(Debug, Clone, Default)]
pub struct Catalog {
    /// All tables. `by_name` stores positions into this vector.
    tables: Vec<Table>,
    /// Table name → position in `tables`.
    by_name: BTreeMap<String, usize>,
    /// Cold-tier segments addressed by segment id (the slot position).
    /// `None` marks a slot that was tombstoned or skipped while loading a
    /// segment at an explicit id.
    cold_segments: Vec<Option<Arc<OwnedSegment>>>,
    /// Registered functions, keyed by `FunctionDef::name`.
    functions: BTreeMap<String, FunctionDef>,
    /// Registered triggers; `create_trigger` keeps `(name, table)` unique.
    triggers: Vec<TriggerDef>,
}
3297
/// A function registered on the catalog. Every field is stored as text;
/// nothing in this type parses or validates it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FunctionDef {
    /// Function name; also the key under which the catalog stores it.
    pub name: String,
    /// Textual form of the argument list.
    // NOTE(review): exact format is not visible here — confirm with the parser.
    pub args_repr: String,
    /// Textual form of the return type.
    pub returns: String,
    /// Implementation language, as text.
    pub language: String,
    /// Function body source text.
    pub body: String,
}
3323
/// A trigger registered on the catalog. A trigger is identified by the pair
/// `(name, table)`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TriggerDef {
    /// Trigger name.
    pub name: String,
    /// Name of the table the trigger is attached to; must exist when the
    /// trigger is created.
    pub table: String,
    /// Firing time, as text.
    // NOTE(review): presumably BEFORE / AFTER / INSTEAD OF — confirm.
    pub timing: String,
    /// Events that fire the trigger, as text.
    // NOTE(review): presumably INSERT / UPDATE / DELETE — confirm.
    pub events: Vec<String>,
    /// Row- or statement-level granularity, as text.
    // NOTE(review): presumably ROW / STATEMENT — confirm.
    pub for_each: String,
    /// Name of the function to invoke; must be registered on the catalog
    /// when the trigger is created.
    pub function: String,
    /// Column names associated with UPDATE events.
    // NOTE(review): presumably the `UPDATE OF col, ...` list — confirm.
    pub update_columns: Vec<String>,
}
3353
3354impl Catalog {
3355 pub const fn new() -> Self {
3356 Self {
3357 tables: Vec::new(),
3358 by_name: BTreeMap::new(),
3359 cold_segments: Vec::new(),
3360 functions: BTreeMap::new(),
3361 triggers: Vec::new(),
3362 }
3363 }
3364
3365 pub const fn functions(&self) -> &BTreeMap<String, FunctionDef> {
3369 &self.functions
3370 }
3371
3372 pub fn create_function(
3376 &mut self,
3377 def: FunctionDef,
3378 or_replace: bool,
3379 ) -> Result<(), StorageError> {
3380 if !or_replace && self.functions.contains_key(&def.name) {
3381 return Err(StorageError::Corrupt(format!(
3382 "function {:?} already exists (drop or use CREATE OR REPLACE)",
3383 def.name
3384 )));
3385 }
3386 self.functions.insert(def.name.clone(), def);
3387 Ok(())
3388 }
3389
3390 pub fn drop_function(&mut self, name: &str) -> bool {
3394 self.functions.remove(name).is_some()
3395 }
3396
3397 pub fn triggers(&self) -> &[TriggerDef] {
3401 &self.triggers
3402 }
3403
3404 pub fn triggers_mut(&mut self) -> &mut Vec<TriggerDef> {
3409 &mut self.triggers
3410 }
3411
3412 pub fn create_trigger(
3418 &mut self,
3419 def: TriggerDef,
3420 or_replace: bool,
3421 ) -> Result<(), StorageError> {
3422 if !self.by_name.contains_key(&def.table) {
3423 return Err(StorageError::TableNotFound {
3424 name: def.table.clone(),
3425 });
3426 }
3427 if !self.functions.contains_key(&def.function) {
3428 return Err(StorageError::Corrupt(format!(
3429 "trigger {:?} references unknown function {:?}",
3430 def.name, def.function
3431 )));
3432 }
3433 let dup = self
3434 .triggers
3435 .iter()
3436 .position(|t| t.name == def.name && t.table == def.table);
3437 match (dup, or_replace) {
3438 (Some(_), false) => Err(StorageError::Corrupt(format!(
3439 "trigger {:?} already exists on table {:?}",
3440 def.name, def.table
3441 ))),
3442 (Some(i), true) => {
3443 self.triggers[i] = def;
3444 Ok(())
3445 }
3446 (None, _) => {
3447 self.triggers.push(def);
3448 Ok(())
3449 }
3450 }
3451 }
3452
3453 pub fn drop_trigger(&mut self, name: &str, table: &str) -> bool {
3456 let before = self.triggers.len();
3457 self.triggers
3458 .retain(|t| !(t.name == name && t.table == table));
3459 before != self.triggers.len()
3460 }
3461
3462 pub fn create_table(&mut self, schema: TableSchema) -> Result<(), StorageError> {
3463 if self.by_name.contains_key(&schema.name) {
3464 return Err(StorageError::DuplicateTable {
3465 name: schema.name.clone(),
3466 });
3467 }
3468 let idx = self.tables.len();
3469 let name = schema.name.clone();
3470 self.tables.push(Table::new(schema));
3471 self.by_name.insert(name, idx);
3472 Ok(())
3473 }
3474
3475 pub fn get(&self, name: &str) -> Option<&Table> {
3476 let idx = *self.by_name.get(name)?;
3477 self.tables.get(idx)
3478 }
3479
3480 pub fn get_mut(&mut self, name: &str) -> Option<&mut Table> {
3481 let idx = *self.by_name.get(name)?;
3482 self.tables.get_mut(idx)
3483 }
3484
3485 pub fn table_count(&self) -> usize {
3486 self.tables.len()
3487 }
3488
3489 pub fn drop_table(&mut self, name: &str) -> bool {
3495 let Some(idx) = self.by_name.remove(name) else {
3496 return false;
3497 };
3498 self.tables.swap_remove(idx);
3501 if idx < self.tables.len() {
3503 let moved_name = self.tables[idx].schema.name.clone();
3504 self.by_name.insert(moved_name, idx);
3505 }
3506 true
3507 }
3508
3509 pub fn drop_named_index(&mut self, name: &str) -> bool {
3512 for t in &mut self.tables {
3513 let before = t.indices.len();
3514 t.indices.retain(|i| i.name != name);
3515 if t.indices.len() != before {
3516 return true;
3517 }
3518 }
3519 false
3520 }
3521
3522 pub fn table_names(&self) -> Vec<String> {
3525 self.tables.iter().map(|t| t.schema.name.clone()).collect()
3526 }
3527
3528 pub fn load_segment_bytes(&mut self, bytes: Vec<u8>) -> Result<u32, StorageError> {
3539 let id = u32::try_from(self.cold_segments.len()).map_err(|_| {
3540 StorageError::Corrupt("cold segment count would exceed u32::MAX".into())
3541 })?;
3542 let seg = OwnedSegment::from_bytes(bytes)
3543 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
3544 self.cold_segments.push(Some(Arc::new(seg)));
3545 Ok(id)
3546 }
3547
3548 pub fn load_segment_bytes_at(
3561 &mut self,
3562 target_id: u32,
3563 bytes: Vec<u8>,
3564 ) -> Result<(), StorageError> {
3565 let seg = OwnedSegment::from_bytes(bytes)
3566 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
3567 let idx = target_id as usize;
3568 while self.cold_segments.len() <= idx {
3569 self.cold_segments.push(None);
3570 }
3571 if self.cold_segments[idx].is_some() {
3572 return Err(StorageError::Corrupt(format!(
3573 "load_segment_bytes_at: segment_id {target_id} already occupied"
3574 )));
3575 }
3576 self.cold_segments[idx] = Some(Arc::new(seg));
3577 Ok(())
3578 }
3579
3580 pub fn tombstone_segment(&mut self, segment_id: u32) -> Result<(), StorageError> {
3590 let idx = segment_id as usize;
3591 if idx >= self.cold_segments.len() {
3592 return Err(StorageError::Corrupt(format!(
3593 "tombstone_segment: segment_id {segment_id} out of bounds (len={})",
3594 self.cold_segments.len()
3595 )));
3596 }
3597 self.cold_segments[idx] = None;
3598 Ok(())
3599 }
3600
3601 #[must_use]
3603 pub fn cold_segment_count(&self) -> usize {
3604 self.cold_segments.iter().filter(|s| s.is_some()).count()
3605 }
3606
3607 #[must_use]
3610 pub fn cold_segment_slot_count(&self) -> usize {
3611 self.cold_segments.len()
3612 }
3613
3614 #[must_use]
3619 pub fn cold_segment_ids_global(&self) -> Vec<u32> {
3620 self.cold_segments
3621 .iter()
3622 .enumerate()
3623 .filter_map(|(i, s)| s.as_ref().map(|_| i as u32))
3624 .collect()
3625 }
3626
3627 #[must_use]
3634 pub fn hot_tier_bytes(&self) -> u64 {
3635 self.tables
3636 .iter()
3637 .map(Table::hot_bytes)
3638 .fold(0u64, u64::saturating_add)
3639 }
3640
/// Moves the first `max_rows` hot rows of `table_name` into a new cold
/// segment and re-points the BTree index `index_name` at that segment.
///
/// Steps, in order:
/// 1. validate the arguments, the table, the row count and the index;
/// 2. encode rows `0..max_rows` and derive each row's `u64` segment key
///    from the indexed column;
/// 3. sort by key, reject duplicate keys, encode the segment;
/// 4. delete the rows from the hot tier;
/// 5. register the segment on the catalog and add one cold locator per
///    frozen key to the index.
///
/// Returns a `FreezeReport` with the new segment id, the number of rows
/// frozen, the change in `hot_bytes()` and the encoded segment bytes.
///
/// # Errors
///
/// Every failure is reported as `StorageError::Corrupt`: `max_rows == 0`,
/// unknown table or index, `max_rows` above the row count, a non-BTree
/// index, a row whose indexed value is not a valid key, a non-integer key,
/// duplicate keys in the batch, or a segment encode/load failure. Errors
/// from `register_cold_locators` are propagated unchanged.
///
/// # Panics
///
/// Panics if a row's value list is shorter than the index's column position.
// NOTE(review): the hot rows are deleted (step 4) before the segment is
// registered (step 5); a failure in step 5 would return an error after the
// rows are already gone from the hot tier. Confirm this ordering is intended.
pub fn freeze_oldest_to_cold(
    &mut self,
    table_name: &str,
    index_name: &str,
    max_rows: usize,
) -> Result<FreezeReport, StorageError> {
    if max_rows == 0 {
        return Err(StorageError::Corrupt(
            "freeze_oldest_to_cold: max_rows must be > 0".into(),
        ));
    }
    let table = self.get(table_name).ok_or_else(|| {
        StorageError::Corrupt(format!(
            "freeze_oldest_to_cold: table {table_name:?} not found"
        ))
    })?;
    if max_rows > table.rows.len() {
        return Err(StorageError::Corrupt(format!(
            "freeze_oldest_to_cold: max_rows {max_rows} > row_count {}",
            table.rows.len()
        )));
    }
    let idx = table
        .indices
        .iter()
        .find(|i| i.name == index_name)
        .ok_or_else(|| {
            StorageError::Corrupt(format!(
                "freeze_oldest_to_cold: index {index_name:?} not found on {table_name:?}"
            ))
        })?;
    // Any non-BTree kind is rejected here, although the message says "NSW".
    if !matches!(idx.kind, IndexKind::BTree(_)) {
        return Err(StorageError::Corrupt(format!(
            "freeze_oldest_to_cold: index {index_name:?} is NSW; only BTree indices may freeze"
        )));
    }
    let column_position = idx.column_position;

    // Snapshot (segment key, encoded body, original index key) per row while
    // the table is still borrowed immutably.
    let schema = table.schema.clone();
    let mut to_freeze: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(max_rows);
    for row_idx in 0..max_rows {
        let row = table.rows.get(row_idx).expect("bounds-checked above");
        let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
            StorageError::Corrupt(format!(
                "freeze_oldest_to_cold: row {row_idx} has NULL / non-key value in index column"
            ))
        })?;
        let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
            StorageError::Corrupt(format!(
                "freeze_oldest_to_cold: index {index_name:?} column type is non-integer; \
                 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
            ))
        })?;
        to_freeze.push((pk_u64, encode_row_body_dense(row, &schema), key));
    }
    // Sorting makes equal keys adjacent, so one pass over neighbouring pairs
    // finds every duplicate.
    to_freeze.sort_by_key(|(k, _, _)| *k);
    for w in to_freeze.windows(2) {
        if w[0].0 == w[1].0 {
            return Err(StorageError::Corrupt(format!(
                "freeze_oldest_to_cold: duplicate PK {} in freeze batch",
                w[0].0
            )));
        }
    }
    // Keep the original index keys for the locator registration below; the
    // segment itself only needs (u64 key, body).
    let post_swap_keys: Vec<IndexKey> = to_freeze.iter().map(|(_, _, k)| k.clone()).collect();
    let seg_rows: Vec<(u64, Vec<u8>)> = to_freeze
        .into_iter()
        .map(|(k, body, _)| (k, body))
        .collect();
    let frozen_rows = seg_rows.len();
    let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
        .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: encode: {e}")))?;

    // Remove the frozen rows from the hot tier and measure what that freed.
    let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
    let positions: Vec<usize> = (0..max_rows).collect();
    let t_mut = self
        .get_mut(table_name)
        .expect("just validated; still present");
    let removed = t_mut.delete_rows(&positions);
    debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
    let bytes_after = t_mut.hot_bytes();
    let bytes_freed = bytes_before.saturating_sub(bytes_after);

    // Register the segment (a copy of the bytes goes back to the caller in
    // the report), then point every frozen key at it.
    let segment_id = self
        .load_segment_bytes(seg_bytes.clone())
        .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: load: {e}")))?;
    let new_cold = post_swap_keys.into_iter().map(|k| {
        (
            k,
            RowLocator::Cold {
                segment_id,
                page_offset: 0,
            },
        )
    });
    let t_mut = self.get_mut(table_name).expect("still present");
    t_mut.register_cold_locators(index_name, new_cold)?;

    Ok(FreezeReport {
        segment_id,
        frozen_rows,
        bytes_freed,
        segment_bytes: seg_bytes,
    })
}
3813
3814 #[must_use]
3820 pub fn cold_segment(&self, segment_id: u32) -> Option<&OwnedSegment> {
3821 self.cold_segments
3822 .get(segment_id as usize)
3823 .and_then(|s| s.as_deref())
3824 }
3825
3826 pub fn resolve_cold_locator(
3835 &self,
3836 table_name: &str,
3837 segment_id: u32,
3838 key: &IndexKey,
3839 ) -> Option<Row> {
3840 let t = self.get(table_name)?;
3841 let u64_key = index_key_as_u64(key)?;
3842 let seg = self.cold_segments.get(segment_id as usize)?.as_ref()?;
3843 let payload = seg.lookup(u64_key)?;
3844 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
3845 Some(row)
3846 }
3847
3848 pub fn lookup_by_pk(&self, table: &str, index_name: &str, key: &IndexKey) -> Option<Row> {
3866 let t = self.get(table)?;
3867 let idx = t.indices.iter().find(|i| i.name == index_name)?;
3868 let locators = idx.lookup_eq(key);
3869 let cold_u64_key = index_key_as_u64(key);
3870 for loc in locators {
3871 match *loc {
3872 RowLocator::Hot(i) => {
3873 if let Some(row) = t.rows.get(i) {
3874 return Some(row.clone());
3875 }
3876 }
3877 RowLocator::Cold {
3878 segment_id,
3879 page_offset: _,
3880 } => {
3881 let Some(u64_key) = cold_u64_key else {
3882 continue;
3885 };
3886 let Some(seg) = self
3887 .cold_segments
3888 .get(segment_id as usize)
3889 .and_then(|s| s.as_deref())
3890 else {
3891 continue;
3902 };
3903 let Some(payload) = seg.lookup(u64_key) else {
3904 continue;
3905 };
3906 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
3907 return Some(row);
3908 }
3909 }
3910 }
3911 None
3912 }
3913
3914 pub fn promote_cold_row(
3936 &mut self,
3937 table_name: &str,
3938 index_name: &str,
3939 key: &IndexKey,
3940 ) -> Result<Option<usize>, StorageError> {
3941 let cold_loc = self.find_cold_locator(table_name, index_name, key)?;
3942 let Some((segment_id, _page_offset)) = cold_loc else {
3943 return Ok(None);
3944 };
3945 let u64_key = index_key_as_u64(key).ok_or_else(|| {
3946 StorageError::Corrupt(
3947 "promote_cold_row: key type not coercible to u64 (cold tier requires integer PK)"
3948 .into(),
3949 )
3950 })?;
3951 let schema = self
3955 .get(table_name)
3956 .ok_or_else(|| {
3957 StorageError::Corrupt(format!("promote_cold_row: table {table_name:?} not found"))
3958 })?
3959 .schema
3960 .clone();
3961 let seg = self
3962 .cold_segments
3963 .get(segment_id as usize)
3964 .and_then(|s| s.as_ref())
3965 .ok_or_else(|| {
3966 StorageError::Corrupt(format!(
3967 "promote_cold_row: segment {segment_id} not registered on catalog"
3968 ))
3969 })?;
3970 let payload = seg.lookup(u64_key).ok_or_else(|| {
3971 StorageError::Corrupt(format!(
3972 "promote_cold_row: key {u64_key} resolves to segment {segment_id} \
3973 but the segment's bloom/page lookup didn't return a row"
3974 ))
3975 })?;
3976 let (row, _consumed) = decode_row_body_dense(&payload, &schema)?;
3977 let t = self
3982 .get_mut(table_name)
3983 .expect("table existed at lookup time");
3984 t.insert(row)?;
3985 let new_hot_idx =
3986 t.rows.len().checked_sub(1).ok_or_else(|| {
3987 StorageError::Corrupt("promote_cold_row: empty after insert".into())
3988 })?;
3989 t.remove_cold_locators_for_key(index_name, key)?;
3993 Ok(Some(new_hot_idx))
3994 }
3995
3996 pub fn shadow_cold_row(
4014 &mut self,
4015 table_name: &str,
4016 index_name: &str,
4017 key: &IndexKey,
4018 ) -> Result<usize, StorageError> {
4019 let t = self.get_mut(table_name).ok_or_else(|| {
4020 StorageError::Corrupt(format!("shadow_cold_row: table {table_name:?} not found"))
4021 })?;
4022 t.remove_cold_locators_for_key(index_name, key)
4023 }
4024
4025 pub fn prepare_freeze_slice(
4043 &self,
4044 table_name: &str,
4045 index_name: &str,
4046 row_range: core::ops::Range<usize>,
4047 ) -> Result<FreezeSlice, StorageError> {
4048 let table = self.get(table_name).ok_or_else(|| {
4049 StorageError::Corrupt(format!(
4050 "prepare_freeze_slice: table {table_name:?} not found"
4051 ))
4052 })?;
4053 let idx = table
4054 .indices
4055 .iter()
4056 .find(|i| i.name == index_name)
4057 .ok_or_else(|| {
4058 StorageError::Corrupt(format!(
4059 "prepare_freeze_slice: index {index_name:?} not found on {table_name:?}"
4060 ))
4061 })?;
4062 if !matches!(idx.kind, IndexKind::BTree(_)) {
4063 return Err(StorageError::Corrupt(format!(
4064 "prepare_freeze_slice: index {index_name:?} is NSW; only BTree indices may freeze"
4065 )));
4066 }
4067 if row_range.end > table.rows.len() {
4068 return Err(StorageError::Corrupt(format!(
4069 "prepare_freeze_slice: row_range end {} > row_count {}",
4070 row_range.end,
4071 table.rows.len()
4072 )));
4073 }
4074 let column_position = idx.column_position;
4075 let schema = table.schema.clone();
4076 let mut rows: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(row_range.len());
4077 for row_idx in row_range.clone() {
4078 let row = table.rows.get(row_idx).expect("bounds-checked above");
4079 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
4080 StorageError::Corrupt(format!(
4081 "prepare_freeze_slice: row {row_idx} has NULL / non-key value in index column"
4082 ))
4083 })?;
4084 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
4085 StorageError::Corrupt(format!(
4086 "prepare_freeze_slice: index {index_name:?} column type is non-integer; \
4087 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
4088 ))
4089 })?;
4090 rows.push((pk_u64, encode_row_body_dense(row, &schema), key));
4091 }
4092 rows.sort_by_key(|(k, _, _)| *k);
4093 Ok(FreezeSlice { row_range, rows })
4094 }
4095
/// Second half of a sliced freeze: merges prepared slices into one cold
/// segment, deletes the covered hot rows and re-points index `index_name`.
///
/// The slices must tile `0..N` exactly: after sorting by range start, each
/// slice has to begin where the previous one ended, starting at row 0.
/// When the slices cover no rows at all, nothing is modified and the report
/// carries `segment_id: u32::MAX` with zero counts.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` for an unknown table or index, a
/// non-BTree index, a gap or overlap between ranges, coverage beyond the row
/// count, a mismatch between the number of prepared rows and the covered
/// range, duplicate keys across slices, or a segment encode/load failure.
/// Errors from `register_cold_locators` are propagated unchanged.
// NOTE(review): the hot rows are deleted before the segment is registered;
// a failure while registering would return an error after the rows are
// already gone from the hot tier. Confirm this ordering is intended.
pub fn commit_freeze_slices(
    &mut self,
    table_name: &str,
    index_name: &str,
    slices: Vec<FreezeSlice>,
) -> Result<FreezeReport, StorageError> {
    let table = self.get(table_name).ok_or_else(|| {
        StorageError::Corrupt(format!(
            "commit_freeze_slices: table {table_name:?} not found"
        ))
    })?;
    let idx = table
        .indices
        .iter()
        .find(|i| i.name == index_name)
        .ok_or_else(|| {
            StorageError::Corrupt(format!(
                "commit_freeze_slices: index {index_name:?} not found on {table_name:?}"
            ))
        })?;
    // Any non-BTree kind is rejected here, although the message says "NSW".
    if !matches!(idx.kind, IndexKind::BTree(_)) {
        return Err(StorageError::Corrupt(format!(
            "commit_freeze_slices: index {index_name:?} is NSW; only BTree indices may freeze"
        )));
    }
    // Order the slices by where they start, then verify they form one
    // contiguous run beginning at row 0.
    let mut ordered = slices;
    ordered.sort_by_key(|s| s.row_range.start);
    let mut expected_start = 0usize;
    for s in &ordered {
        if s.row_range.start != expected_start {
            return Err(StorageError::Corrupt(format!(
                "commit_freeze_slices: gap/overlap at row {}; expected start {}",
                s.row_range.start, expected_start
            )));
        }
        expected_start = s.row_range.end;
    }
    // The end of the last slice is the number of hot rows being frozen.
    let max_rows = expected_start;
    if max_rows > table.rows.len() {
        return Err(StorageError::Corrupt(format!(
            "commit_freeze_slices: total row range {} exceeds row_count {}",
            max_rows,
            table.rows.len()
        )));
    }
    if max_rows == 0 {
        return Ok(FreezeReport {
            segment_id: u32::MAX,
            frozen_rows: 0,
            bytes_freed: 0,
            segment_bytes: Vec::new(),
        });
    }

    // Only the total is checked: the slices together must carry exactly one
    // prepared row per covered position.
    let total_rows: usize = ordered.iter().map(|s| s.rows.len()).sum();
    if total_rows != max_rows {
        return Err(StorageError::Corrupt(format!(
            "commit_freeze_slices: total slice rows {total_rows} ≠ row_range coverage {max_rows}"
        )));
    }
    // K-way merge by key. `cursors[i]` is the next unread row of slice `i`.
    // This produces a sorted result only if each slice's rows are already
    // sorted by key, as `prepare_freeze_slice` leaves them.
    let mut cursors: Vec<usize> = alloc::vec![0; ordered.len()];
    let mut merged: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(total_rows);
    loop {
        // Find the slice whose next row has the smallest key; the strict `<`
        // keeps the earliest slice on ties.
        let mut pick: Option<usize> = None;
        for (i, c) in cursors.iter().enumerate() {
            let slice = &ordered[i];
            if *c >= slice.rows.len() {
                continue;
            }
            match pick {
                None => pick = Some(i),
                Some(j) => {
                    if slice.rows[*c].0 < ordered[j].rows[cursors[j]].0 {
                        pick = Some(i);
                    }
                }
            }
        }
        // No slice has rows left: the merge is complete.
        let Some(i) = pick else { break };
        let row = ordered[i].rows[cursors[i]].clone();
        cursors[i] += 1;
        merged.push(row);
    }
    // Equal keys are adjacent after the merge, so comparing neighbours finds
    // every duplicate.
    for w in merged.windows(2) {
        if w[0].0 == w[1].0 {
            return Err(StorageError::Corrupt(format!(
                "commit_freeze_slices: duplicate PK {} across slices",
                w[0].0
            )));
        }
    }
    // Keep the original index keys for the locator registration below; the
    // segment itself only needs (u64 key, body).
    let post_swap_keys: Vec<IndexKey> = merged.iter().map(|(_, _, k)| k.clone()).collect();
    let seg_rows: Vec<(u64, Vec<u8>)> =
        merged.into_iter().map(|(k, body, _)| (k, body)).collect();
    let frozen_rows = seg_rows.len();
    let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
        .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: encode: {e}")))?;

    // Remove the frozen rows from the hot tier and measure what that freed.
    let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
    let positions: Vec<usize> = (0..max_rows).collect();
    let t_mut = self
        .get_mut(table_name)
        .expect("just validated; still present");
    let removed = t_mut.delete_rows(&positions);
    debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
    let bytes_after = t_mut.hot_bytes();
    let bytes_freed = bytes_before.saturating_sub(bytes_after);

    // Register the segment (a copy of the bytes goes back to the caller in
    // the report), then point every frozen key at it.
    let segment_id = self
        .load_segment_bytes(seg_bytes.clone())
        .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: load: {e}")))?;
    let new_cold = post_swap_keys.into_iter().map(|k| {
        (
            k,
            RowLocator::Cold {
                segment_id,
                page_offset: 0,
            },
        )
    });
    let t_mut = self.get_mut(table_name).expect("still present");
    t_mut.register_cold_locators(index_name, new_cold)?;

    Ok(FreezeReport {
        segment_id,
        frozen_rows,
        bytes_freed,
        segment_bytes: seg_bytes,
    })
}
4255
/// Merges the small cold segments referenced by BTree index `index_name`
/// into one new segment.
///
/// A segment is a candidate when the index holds at least one cold locator
/// for it, its slot is live, and its encoded size is strictly below
/// `target_segment_bytes`. With fewer than two candidates nothing is
/// modified and an empty report is returned.
///
/// Otherwise the method:
/// 1. copies, for every index key that points into a candidate, the payload
///    that candidate stores for the key;
/// 2. encodes those payloads as a new segment and registers it;
/// 3. rewrites each locator that pointed at a candidate to point at the new
///    segment, collapsing repeats within a key;
/// 4. tombstones the candidate segments.
///
/// Rows that exist in a candidate but are no longer referenced by the index
/// are not copied; they are counted in `deleted_rows_pruned`.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` for an unknown table or index, a
/// non-BTree index, a non-integer key with a cold locator, an index entry
/// whose segment has no row for the key, or a segment encode/load failure.
/// Errors from `tombstone_segment` are propagated unchanged.
pub fn compact_cold_segments(
    &mut self,
    table_name: &str,
    index_name: &str,
    target_segment_bytes: u64,
) -> Result<CompactReport, StorageError> {
    let t = self.get(table_name).ok_or_else(|| {
        StorageError::Corrupt(format!(
            "compact_cold_segments: table {table_name:?} not found"
        ))
    })?;
    let idx = t
        .indices
        .iter()
        .find(|i| i.name == index_name)
        .ok_or_else(|| {
            StorageError::Corrupt(format!(
                "compact_cold_segments: index {index_name:?} not found on {table_name:?}"
            ))
        })?;
    // Every variant is listed so that a newly added index kind forces a
    // decision here at compile time.
    let map = match &idx.kind {
        IndexKind::BTree(m) => m,
        IndexKind::Nsw(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) | IndexKind::GinTrgm(_) => {
            return Err(StorageError::Corrupt(format!(
                "compact_cold_segments: index {index_name:?} is not BTree; \
                 compaction applies only to BTree cold-tier indices"
            )));
        }
    };

    // Pass 1: every segment id this index references through a cold locator.
    let mut referenced_ids: BTreeSet<u32> = BTreeSet::new();
    for (_key, locators) in map.iter() {
        for loc in locators {
            if let RowLocator::Cold { segment_id, .. } = loc {
                referenced_ids.insert(*segment_id);
            }
        }
    }
    // Keep only live segments that are strictly smaller than the target.
    let candidate_set: BTreeSet<u32> = referenced_ids
        .into_iter()
        .filter(|id| {
            self.cold_segments
                .get(*id as usize)
                .and_then(|s| s.as_deref())
                .is_some_and(|s| (s.bytes().len() as u64) < target_segment_bytes)
        })
        .collect();
    // Merging needs at least two sources.
    if candidate_set.len() < 2 {
        return Ok(CompactReport {
            sources: Vec::new(),
            merged_segment_id: None,
            merged_segment_bytes: Vec::new(),
            merged_rows: 0,
            deleted_rows_pruned: 0,
            bytes_reclaimed_estimate: 0,
        });
    }
    // Totals over the sources, used for the report's pruned-row and
    // reclaimed-byte figures.
    let mut source_row_count: usize = 0;
    let mut source_byte_total: u64 = 0;
    for &id in &candidate_set {
        let seg = self.cold_segments[id as usize]
            .as_ref()
            .expect("candidate selected only when slot is Some");
        source_row_count = source_row_count.saturating_add(seg.meta().num_rows as usize);
        source_byte_total = source_byte_total.saturating_add(seg.bytes().len() as u64);
    }
    // Pass 2: copy one payload per index key. The BTreeMap keeps the rows
    // ordered by their u64 segment key for the encoder.
    let mut collected: BTreeMap<u64, (Vec<u8>, IndexKey)> = BTreeMap::new();
    for (key, locators) in map.iter() {
        for loc in locators {
            let RowLocator::Cold { segment_id, .. } = loc else {
                continue;
            };
            if !candidate_set.contains(segment_id) {
                continue;
            }
            let u64_key = index_key_as_u64(key).ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "compact_cold_segments: index {index_name:?} has non-integer Cold key; \
                     cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
                ))
            })?;
            let seg = self.cold_segments[*segment_id as usize]
                .as_ref()
                .expect("candidate slot guaranteed Some above");
            let payload = seg.lookup(u64_key).ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "compact_cold_segments: BTree {index_name:?} points key={u64_key} \
                     at segment {segment_id} but the segment lookup missed"
                ))
            })?;
            collected.insert(u64_key, (payload, key.clone()));
            // Only the first candidate locator of a key is copied.
            break;
        }
    }
    let merged_rows = collected.len();
    let deleted_rows_pruned = source_row_count.saturating_sub(merged_rows);

    let seg_rows: Vec<(u64, Vec<u8>)> = collected
        .iter()
        .map(|(k, (body, _))| (*k, body.clone()))
        .collect();
    let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
        .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: encode: {e}")))?;
    let merged_bytes_len = seg_bytes.len() as u64;

    // Register the merged segment; a copy of the bytes is returned in the
    // report.
    let merged_segment_id = self
        .load_segment_bytes(seg_bytes.clone())
        .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: load: {e}")))?;

    // Snapshot the index entries first: the map cannot be iterated and
    // updated through the same borrow.
    let entries: Vec<(IndexKey, Vec<RowLocator>)> = {
        let t = self
            .get(table_name)
            .expect("table existed at the start of this fn");
        let idx = t
            .indices
            .iter()
            .find(|i| i.name == index_name)
            .expect("index existed at the start of this fn");
        let IndexKind::BTree(map) = &idx.kind else {
            unreachable!("validated above");
        };
        map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
    };
    let t_mut = self
        .get_mut(table_name)
        .expect("table existed at the start of this fn");
    let idx_mut = t_mut
        .indices
        .iter_mut()
        .find(|i| i.name == index_name)
        .expect("index existed at the start of this fn");
    let IndexKind::BTree(map_mut) = &mut idx_mut.kind else {
        unreachable!("validated above");
    };
    // Pass 3: re-point locators. Entries without a candidate locator are
    // left untouched.
    for (key, locators) in entries {
        let mut new_locs: Vec<RowLocator> = Vec::with_capacity(locators.len());
        let mut changed = false;
        for loc in &locators {
            match *loc {
                RowLocator::Cold {
                    segment_id,
                    page_offset: _,
                } if candidate_set.contains(&segment_id) => {
                    let replacement = RowLocator::Cold {
                        segment_id: merged_segment_id,
                        page_offset: 0,
                    };
                    // Several candidate locators for one key collapse into a
                    // single locator for the merged segment.
                    if !new_locs.contains(&replacement) {
                        new_locs.push(replacement);
                    }
                    changed = true;
                }
                other => new_locs.push(other),
            }
        }
        if changed {
            map_mut.insert_mut(key, new_locs);
        }
    }

    // The sources are no longer referenced by this index; clear their slots.
    for &id in &candidate_set {
        self.tombstone_segment(id)?;
    }

    let bytes_reclaimed_estimate = source_byte_total.saturating_sub(merged_bytes_len);
    Ok(CompactReport {
        sources: candidate_set.into_iter().collect(),
        merged_segment_id: Some(merged_segment_id),
        merged_segment_bytes: seg_bytes,
        merged_rows,
        deleted_rows_pruned,
        bytes_reclaimed_estimate,
    })
}
4495
4496 fn find_cold_locator(
4502 &self,
4503 table_name: &str,
4504 index_name: &str,
4505 key: &IndexKey,
4506 ) -> Result<Option<(u32, u32)>, StorageError> {
4507 let t = self.get(table_name).ok_or_else(|| {
4508 StorageError::Corrupt(format!("find_cold_locator: table {table_name:?} not found"))
4509 })?;
4510 let idx = t
4511 .indices
4512 .iter()
4513 .find(|i| i.name == index_name)
4514 .ok_or_else(|| {
4515 StorageError::Corrupt(format!(
4516 "find_cold_locator: index {index_name:?} not found on {table_name:?}"
4517 ))
4518 })?;
4519 if !matches!(idx.kind, IndexKind::BTree(_)) {
4520 return Err(StorageError::Corrupt(format!(
4521 "find_cold_locator: index {index_name:?} is NSW; promote-on-write only applies to BTree indices"
4522 )));
4523 }
4524 for loc in idx.lookup_eq(key) {
4525 if let RowLocator::Cold {
4526 segment_id,
4527 page_offset,
4528 } = *loc
4529 {
4530 return Ok(Some((segment_id, page_offset)));
4531 }
4532 }
4533 Ok(None)
4534 }
4535}
4536
4537fn index_key_as_u64(key: &IndexKey) -> Option<u64> {
4543 match key {
4544 IndexKey::Int(n) => Some(n.cast_unsigned()),
4550 IndexKey::Text(_) | IndexKey::Bool(_) => None,
4551 }
4552}
4553
/// Errors reported by the storage layer. The enum is `#[non_exhaustive]`,
/// so code outside this crate needs a wildcard arm when matching on it.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum StorageError {
    /// A table with this name already exists.
    DuplicateTable {
        name: String,
    },
    /// No table with this name exists.
    TableNotFound {
        name: String,
    },
    /// A row carried `actual` values where `expected` columns were required.
    ArityMismatch {
        expected: usize,
        actual: usize,
    },
    /// The value at column `position` had type `actual` instead of
    /// `expected`.
    TypeMismatch {
        column: String,
        expected: DataType,
        actual: DataType,
        position: usize,
    },
    /// A NULL was supplied for a NOT NULL column.
    NullInNotNull {
        column: String,
    },
    /// An index with this name already exists.
    DuplicateIndex {
        name: String,
    },
    /// No column with this name exists.
    ColumnNotFound {
        column: String,
    },
    /// Free-form failure. Displayed as "corrupt on-disk format", but the
    /// catalog also uses it for validation failures such as duplicate
    /// functions or triggers.
    Corrupt(String),
    /// No index with this name exists.
    IndexNotFound {
        name: String,
    },
    /// The requested operation is not supported; the payload says why.
    Unsupported(String),
}
4597
4598impl fmt::Display for StorageError {
4599 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4600 match self {
4601 Self::DuplicateTable { name } => write!(f, "table already exists: {name}"),
4602 Self::TableNotFound { name } => write!(f, "table not found: {name}"),
4603 Self::ArityMismatch { expected, actual } => write!(
4604 f,
4605 "row arity mismatch: expected {expected} columns, got {actual}"
4606 ),
4607 Self::TypeMismatch {
4608 column,
4609 expected,
4610 actual,
4611 position,
4612 } => write!(
4613 f,
4614 "type mismatch in column {column:?} (position {position}): expected {expected}, got {actual}"
4615 ),
4616 Self::NullInNotNull { column } => {
4617 write!(f, "NULL value in NOT NULL column {column:?}")
4618 }
4619 Self::DuplicateIndex { name } => write!(f, "index already exists: {name}"),
4620 Self::ColumnNotFound { column } => write!(f, "column not found: {column}"),
4621 Self::Corrupt(detail) => write!(f, "corrupt on-disk format: {detail}"),
4622 Self::IndexNotFound { name } => write!(f, "index not found: {name}"),
4623 Self::Unsupported(detail) => write!(f, "unsupported: {detail}"),
4624 }
4625 }
4626}
4627
4628impl ColumnSchema {
4629 pub fn new(name: impl Into<String>, ty: DataType, nullable: bool) -> Self {
4630 Self {
4631 name: name.into(),
4632 ty,
4633 nullable,
4634 default: None,
4635 runtime_default: None,
4636 auto_increment: false,
4637 }
4638 }
4639
4640 #[must_use]
4644 pub fn with_default(mut self, default: Value) -> Self {
4645 self.default = Some(default);
4646 self
4647 }
4648
4649 #[must_use]
4654 pub fn with_runtime_default(mut self, expr: impl Into<String>) -> Self {
4655 self.runtime_default = Some(expr.into());
4656 self
4657 }
4658
4659 #[must_use]
4661 pub const fn with_auto_increment(mut self) -> Self {
4662 self.auto_increment = true;
4663 self
4664 }
4665}
4666
4667impl TableSchema {
4668 pub fn new(name: impl Into<String>, columns: Vec<ColumnSchema>) -> Self {
4669 Self {
4670 name: name.into(),
4671 columns,
4672 hot_tier_bytes: None,
4673 foreign_keys: Vec::new(),
4674 uniqueness_constraints: Vec::new(),
4675 checks: Vec::new(),
4676 }
4677 }
4678}
4679
/// Eight-byte signature that `Catalog::serialize` writes at the very start
/// of its output.
const FILE_MAGIC: &[u8; 8] = b"SPGDB001";
/// Format version byte that `Catalog::serialize` writes right after the
/// magic.
const FILE_VERSION: u8 = 24;
/// Oldest format version still accepted.
// NOTE(review): presumably enforced by the deserializer — confirm.
const MIN_SUPPORTED_FILE_VERSION: u8 = 8;

// One-byte tags identifying the `IndexKey` variant in serialized form.
// NOTE(review): presumably emitted by `write_index_key` — confirm.
const INDEX_KEY_TAG_INT: u8 = 0;
const INDEX_KEY_TAG_TEXT: u8 = 1;
const INDEX_KEY_TAG_BOOL: u8 = 2;
4783
impl Catalog {
    /// Encodes the whole catalog into a byte buffer stamped with
    /// `FILE_VERSION`.
    ///
    /// Write order: `FILE_MAGIC`, the version byte, a `u32` table count, one
    /// record per table, then the function list and the trigger list (each
    /// prefixed with a `u32` count).
    ///
    /// Every table record is written as: name, columns, rows, indices, the
    /// optional hot-tier byte budget, foreign keys, uniqueness constraints,
    /// runtime defaults and CHECK constraints — the same order in which
    /// `deserialize_table` reads them back.
    ///
    /// # Panics
    ///
    /// Panics when a collection is too large for its on-disk count field
    /// (each `expect` message names the limit). `write_data_type` and
    /// `write_value` additionally panic on `Interval`, which has no on-disk
    /// encoding.
    pub fn serialize(&self) -> Vec<u8> {
        let mut out = Vec::with_capacity(64);
        // File header: magic, version, table count.
        out.extend_from_slice(FILE_MAGIC);
        out.push(FILE_VERSION);
        write_u32(
            &mut out,
            u32::try_from(self.tables.len()).expect("≤ 4G tables"),
        );
        for t in &self.tables {
            // --- Schema: table name followed by the column list. ---
            write_str(&mut out, &t.schema.name);
            write_u16(
                &mut out,
                u16::try_from(t.schema.columns.len()).expect("≤ 65k columns/table"),
            );
            for c in &t.schema.columns {
                write_str(&mut out, &c.name);
                write_data_type(&mut out, c.ty);
                out.push(u8::from(c.nullable));
                // Optional static default: presence byte, then the tagged value.
                match &c.default {
                    None => out.push(0),
                    Some(v) => {
                        out.push(1);
                        write_value(&mut out, v);
                    }
                }
                out.push(u8::from(c.auto_increment));
            }
            // --- Rows: count, then each row in the dense schema-driven form. ---
            write_u32(
                &mut out,
                u32::try_from(t.rows.len()).expect("≤ 4G rows/table"),
            );
            for row in &t.rows {
                out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
            }
            // --- Indices: count, then one record per index. ---
            write_u16(
                &mut out,
                u16::try_from(t.indices.len()).expect("≤ 65k indices/table"),
            );
            for idx in &t.indices {
                write_str(&mut out, &idx.name);
                write_u16(
                    &mut out,
                    u16::try_from(idx.column_position).expect("≤ 65k columns/table"),
                );
                // A one-byte kind tag selects the payload layout that follows.
                match &idx.kind {
                    IndexKind::BTree(map) => {
                        // Tag 0: entry count, then (key, locator list) pairs.
                        out.push(0);
                        write_u32(
                            &mut out,
                            u32::try_from(map.len()).expect("≤ 4G index entries/index"),
                        );
                        for (key, locators) in map {
                            write_index_key(&mut out, key);
                            write_u32(
                                &mut out,
                                u32::try_from(locators.len()).expect("≤ 4G locators/key"),
                            );
                            for loc in locators {
                                loc.write_le(&mut out);
                            }
                        }
                    }
                    IndexKind::Nsw(g) => {
                        // Tag 1: `m` as u16, then the graph body.
                        out.push(1);
                        write_u16(&mut out, u16::try_from(g.m).expect("≤ 65k NSW neighbours"));
                        write_nsw_graph(&mut out, g);
                    }
                    IndexKind::Brin { column_type } => {
                        // Tag 2: only the indexed column's data type is stored.
                        out.push(2);
                        write_data_type(&mut out, *column_type);
                    }
                    IndexKind::Gin(map) => {
                        // Tag 3: posting-list count, then (word, locator list) pairs.
                        out.push(3);
                        write_u32(
                            &mut out,
                            u32::try_from(map.len()).expect("≤ 4G GIN posting lists"),
                        );
                        for (word, locators) in map {
                            write_str(&mut out, word);
                            write_u32(
                                &mut out,
                                u32::try_from(locators.len()).expect("≤ 4G locators/posting list"),
                            );
                            for loc in locators {
                                loc.write_le(&mut out);
                            }
                        }
                    }
                    IndexKind::GinTrgm(map) => {
                        // Tag 4: same layout as tag 3, keyed by trigram.
                        out.push(4);
                        write_u32(
                            &mut out,
                            u32::try_from(map.len())
                                .expect("≤ 4G trigram-GIN posting lists"),
                        );
                        for (tri, locators) in map {
                            write_str(&mut out, tri);
                            write_u32(
                                &mut out,
                                u32::try_from(locators.len())
                                    .expect("≤ 4G locators/posting list"),
                            );
                            for loc in locators {
                                loc.write_le(&mut out);
                            }
                        }
                    }
                }
                // INCLUDE column positions (read back when version >= 12).
                write_u16(
                    &mut out,
                    u16::try_from(idx.included_columns.len()).expect("≤ 65k INCLUDE columns/index"),
                );
                for col_pos in &idx.included_columns {
                    write_u16(
                        &mut out,
                        u16::try_from(*col_pos).expect("≤ 65k columns/table"),
                    );
                }
                // Optional partial-index predicate: presence byte + text.
                match &idx.partial_predicate {
                    None => out.push(0),
                    Some(pred) => {
                        out.push(1);
                        write_str(&mut out, pred);
                    }
                }
                // Optional index expression: presence byte + text.
                match &idx.expression {
                    None => out.push(0),
                    Some(expr) => {
                        out.push(1);
                        write_str(&mut out, expr);
                    }
                }
                // Uniqueness flag and extra column positions (read back when
                // version >= 16).
                out.push(u8::from(idx.is_unique));
                write_u16(
                    &mut out,
                    u16::try_from(idx.extra_column_positions.len())
                        .expect("≤ 65k extra cols / index"),
                );
                for cp in &idx.extra_column_positions {
                    write_u16(&mut out, u16::try_from(*cp).expect("≤ 65k columns/table"));
                }
            }
            // --- Hot-tier budget: presence byte + little-endian value. ---
            match t.schema.hot_tier_bytes {
                None => out.push(0),
                Some(n) => {
                    out.push(1);
                    out.extend_from_slice(&n.to_le_bytes());
                }
            }
            // --- Foreign keys. ---
            write_u16(
                &mut out,
                u16::try_from(t.schema.foreign_keys.len()).expect("≤ 65k FKs/table"),
            );
            for fk in &t.schema.foreign_keys {
                // Optional constraint name: presence byte + text.
                match &fk.name {
                    None => out.push(0),
                    Some(n) => {
                        out.push(1);
                        write_str(&mut out, n);
                    }
                }
                write_u16(
                    &mut out,
                    u16::try_from(fk.local_columns.len()).expect("≤ 65k FK columns"),
                );
                for &p in &fk.local_columns {
                    write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
                }
                write_str(&mut out, &fk.parent_table);
                write_u16(
                    &mut out,
                    u16::try_from(fk.parent_columns.len()).expect("≤ 65k FK parent columns"),
                );
                for &p in &fk.parent_columns {
                    write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
                }
                out.push(fk.on_delete.tag());
                out.push(fk.on_update.tag());
            }
            // --- Uniqueness constraints. ---
            write_u16(
                &mut out,
                u16::try_from(t.schema.uniqueness_constraints.len())
                    .expect("≤ 65k uniqueness constraints/table"),
            );
            for uc in &t.schema.uniqueness_constraints {
                out.push(u8::from(uc.is_primary_key));
                write_u16(
                    &mut out,
                    u16::try_from(uc.columns.len()).expect("≤ 65k cols in uniqueness constraint"),
                );
                for &p in &uc.columns {
                    write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
                }
                out.push(u8::from(uc.nulls_not_distinct));
            }
            // --- Runtime defaults: stored sparsely as (column position,
            // expression) pairs, only for columns that have one. ---
            let mut rt_defaults: Vec<(usize, &str)> = Vec::new();
            for (i, c) in t.schema.columns.iter().enumerate() {
                if let Some(e) = &c.runtime_default {
                    rt_defaults.push((i, e.as_str()));
                }
            }
            write_u16(
                &mut out,
                u16::try_from(rt_defaults.len()).expect("≤ 65k runtime defaults/table"),
            );
            for (pos, expr) in rt_defaults {
                write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
                write_str(&mut out, expr);
            }
            // --- CHECK constraints, one string each. ---
            write_u16(
                &mut out,
                u16::try_from(t.schema.checks.len()).expect("≤ 65k CHECK constraints/table"),
            );
            for c in &t.schema.checks {
                write_str(&mut out, c.as_str());
            }
        }
        // --- Functions: count, then five strings per function. Only the body
        // goes through `write_str_long`. ---
        write_u32(
            &mut out,
            u32::try_from(self.functions.len()).expect("≤ 4G functions"),
        );
        for fd in self.functions.values() {
            write_str(&mut out, &fd.name);
            write_str(&mut out, &fd.args_repr);
            write_str(&mut out, &fd.returns);
            write_str(&mut out, &fd.language);
            write_str_long(&mut out, &fd.body);
        }
        // --- Triggers. ---
        write_u32(
            &mut out,
            u32::try_from(self.triggers.len()).expect("≤ 4G triggers"),
        );
        for td in &self.triggers {
            write_str(&mut out, &td.name);
            write_str(&mut out, &td.table);
            write_str(&mut out, &td.timing);
            write_u16(
                &mut out,
                u16::try_from(td.events.len()).expect("≤ 65k events / trigger"),
            );
            for ev in &td.events {
                write_str(&mut out, ev);
            }
            write_str(&mut out, &td.for_each);
            write_str(&mut out, &td.function);
            // Column list (read back when version >= 23).
            write_u16(
                &mut out,
                u16::try_from(td.update_columns.len()).expect("≤ 65k cols / trigger"),
            );
            for c in &td.update_columns {
                write_str(&mut out, c);
            }
        }
        out
    }

    /// Rebuilds a catalog from bytes produced by [`Catalog::serialize`].
    ///
    /// Accepts any format version in
    /// `MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION`. Sections that newer
    /// versions added are read only when the file's version is high enough:
    /// functions and triggers from version 22, trigger `update_columns` from
    /// version 23.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::Corrupt` when the magic does not match, the
    /// version is outside the supported range, or bytes remain after the last
    /// section. Errors from the cursor reads and from `deserialize_table` are
    /// propagated unchanged.
    pub fn deserialize(buf: &[u8]) -> Result<Self, StorageError> {
        let mut cur = Cursor::new(buf);
        let magic = cur.take(8)?;
        if magic != FILE_MAGIC {
            return Err(StorageError::Corrupt(format!(
                "bad magic: expected SPGDB001, got {magic:?}"
            )));
        }
        let version = cur.read_u8()?;
        if !(MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION).contains(&version) {
            return Err(StorageError::Corrupt(format!(
                "unsupported file version: {version} (supported: {MIN_SUPPORTED_FILE_VERSION}..={FILE_VERSION})"
            )));
        }
        let table_count = cur.read_u32()? as usize;
        let mut cat = Self::new();
        for _ in 0..table_count {
            deserialize_table(&mut cur, &mut cat, version)?;
        }
        // Functions and triggers are only present from version 22 on.
        if version >= 22 {
            let fn_count = cur.read_u32()? as usize;
            for _ in 0..fn_count {
                let name = cur.read_str()?;
                let args_repr = cur.read_str()?;
                let returns = cur.read_str()?;
                let language = cur.read_str()?;
                let body = cur.read_str_long()?;
                // The function name doubles as the map key.
                cat.functions.insert(
                    name.clone(),
                    FunctionDef {
                        name,
                        args_repr,
                        returns,
                        language,
                        body,
                    },
                );
            }
            let trg_count = cur.read_u32()? as usize;
            for _ in 0..trg_count {
                let name = cur.read_str()?;
                let table = cur.read_str()?;
                let timing = cur.read_str()?;
                let ev_count = cur.read_u16()? as usize;
                let mut events = Vec::with_capacity(ev_count);
                for _ in 0..ev_count {
                    events.push(cur.read_str()?);
                }
                let for_each = cur.read_str()?;
                let function = cur.read_str()?;
                // Older files carry no column list; treat it as empty.
                let update_columns = if version >= 23 {
                    let n = cur.read_u16()? as usize;
                    let mut cols = Vec::with_capacity(n);
                    for _ in 0..n {
                        cols.push(cur.read_str()?);
                    }
                    cols
                } else {
                    Vec::new()
                };
                cat.triggers.push(TriggerDef {
                    name,
                    table,
                    timing,
                    events,
                    for_each,
                    function,
                    update_columns,
                });
            }
        }
        // Every byte must be accounted for; leftovers mean a corrupt or
        // mis-versioned file.
        if cur.pos < buf.len() {
            return Err(StorageError::Corrupt(format!(
                "trailing bytes: {} unread",
                buf.len() - cur.pos
            )));
        }
        Ok(cat)
    }
}
5239
5240fn deserialize_table(
5245 cur: &mut Cursor<'_>,
5246 cat: &mut Catalog,
5247 version: u8,
5248) -> Result<(), StorageError> {
5249 let table_name = cur.read_str()?;
5250 let name = table_name.clone();
5251 let col_count = cur.read_u16()? as usize;
5252 let mut cols = Vec::with_capacity(col_count);
5253 for _ in 0..col_count {
5254 let c_name = cur.read_str()?;
5255 let ty = cur.read_data_type()?;
5256 let nullable = cur.read_u8()? != 0;
5257 let default = match cur.read_u8()? {
5258 0 => None,
5259 1 => Some(cur.read_value()?),
5260 other => {
5261 return Err(StorageError::Corrupt(format!(
5262 "unknown default tag: {other}"
5263 )));
5264 }
5265 };
5266 let auto_increment = cur.read_u8()? != 0;
5267 cols.push(ColumnSchema {
5271 name: c_name,
5272 ty,
5273 nullable,
5274 default,
5275 runtime_default: None,
5276 auto_increment,
5277 });
5278 }
5279 let n_cols = cols.len();
5280 cat.create_table(TableSchema::new(name, cols))?;
5281 let t = cat.tables.last_mut().expect("create_table just pushed");
5285 deserialize_rows(cur, t, n_cols)?;
5286 deserialize_indices(cur, t, version)?;
5287 if version >= 11 {
5293 let has = cur.read_u8()?;
5294 let hot_tier_bytes = match has {
5295 0 => None,
5296 1 => Some(cur.read_u64()?),
5297 other => {
5298 return Err(StorageError::Corrupt(format!(
5299 "hot_tier_bytes appendix: unknown has-value byte {other}"
5300 )));
5301 }
5302 };
5303 t.schema_mut().hot_tier_bytes = hot_tier_bytes;
5304 }
5305 if version >= 13 {
5308 let fk_count = cur.read_u16()? as usize;
5309 let mut fks = Vec::with_capacity(fk_count);
5310 for _ in 0..fk_count {
5311 let name = match cur.read_u8()? {
5312 0 => None,
5313 1 => Some(cur.read_str()?),
5314 other => {
5315 return Err(StorageError::Corrupt(format!(
5316 "FK appendix: unknown has-name byte {other}"
5317 )));
5318 }
5319 };
5320 let local_arity = cur.read_u16()? as usize;
5321 let mut local_columns = Vec::with_capacity(local_arity);
5322 for _ in 0..local_arity {
5323 local_columns.push(cur.read_u16()? as usize);
5324 }
5325 let parent_table = cur.read_str()?;
5326 let parent_arity = cur.read_u16()? as usize;
5327 if parent_arity != local_arity {
5328 return Err(StorageError::Corrupt(format!(
5329 "FK arity mismatch in catalog: local {local_arity} vs parent {parent_arity}"
5330 )));
5331 }
5332 let mut parent_columns = Vec::with_capacity(parent_arity);
5333 for _ in 0..parent_arity {
5334 parent_columns.push(cur.read_u16()? as usize);
5335 }
5336 let on_delete = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
5337 StorageError::Corrupt("FK appendix: unknown on_delete tag".into())
5338 })?;
5339 let on_update = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
5340 StorageError::Corrupt("FK appendix: unknown on_update tag".into())
5341 })?;
5342 fks.push(ForeignKeyConstraint {
5343 name,
5344 local_columns,
5345 parent_table,
5346 parent_columns,
5347 on_delete,
5348 on_update,
5349 });
5350 }
5351 t.schema_mut().foreign_keys = fks;
5352 }
5353 if version >= 15 {
5356 let uc_count = cur.read_u16()? as usize;
5357 let mut ucs = Vec::with_capacity(uc_count);
5358 for _ in 0..uc_count {
5359 let is_pk = cur.read_u8()? != 0;
5360 let arity = cur.read_u16()? as usize;
5361 let mut cols = Vec::with_capacity(arity);
5362 for _ in 0..arity {
5363 cols.push(cur.read_u16()? as usize);
5364 }
5365 let nulls_not_distinct = if version >= 23 {
5369 cur.read_u8()? != 0
5370 } else {
5371 false
5372 };
5373 ucs.push(UniquenessConstraint {
5374 is_primary_key: is_pk,
5375 columns: cols,
5376 nulls_not_distinct,
5377 });
5378 }
5379 t.schema_mut().uniqueness_constraints = ucs;
5380 let rt_count = cur.read_u16()? as usize;
5382 for _ in 0..rt_count {
5383 let pos = cur.read_u16()? as usize;
5384 let expr = cur.read_str()?;
5385 if let Some(col) = t.schema_mut().columns.get_mut(pos) {
5386 col.runtime_default = Some(expr);
5387 }
5388 }
5389 }
5390 if version >= 23 {
5393 let check_count = cur.read_u16()? as usize;
5394 let mut checks = Vec::with_capacity(check_count);
5395 for _ in 0..check_count {
5396 checks.push(cur.read_str()?);
5397 }
5398 t.schema_mut().checks = checks;
5399 }
5400 let _ = table_name;
5401 Ok(())
5402}
5403
5404fn deserialize_rows(
5405 cur: &mut Cursor<'_>,
5406 t: &mut Table,
5407 _n_cols: usize,
5408) -> Result<(), StorageError> {
5409 let row_count = cur.read_u32()? as usize;
5410 let mut hot_bytes: u64 = 0;
5415 for _ in 0..row_count {
5416 let tail = &cur.buf[cur.pos..];
5417 let (row, consumed) = decode_row_body_dense(tail, &t.schema)?;
5418 cur.pos += consumed;
5419 hot_bytes = hot_bytes.saturating_add(row_body_encoded_len(&row, &t.schema) as u64);
5425 t.rows.push_mut(row);
5426 }
5427 t.hot_bytes = hot_bytes;
5428 Ok(())
5429}
5430
/// Reads the index section of one table record and restores every index on
/// `t`.
///
/// Each index record is: name, indexed column position, a one-byte kind tag,
/// the kind-specific payload, and then (for newer files) per-index metadata
/// that is patched onto the index that was just restored
/// (`t.indices.last_mut()`):
///
/// * `version >= 12` — INCLUDE columns, optional partial predicate, optional
///   expression,
/// * `version >= 16` — uniqueness flag and extra column positions.
///
/// # Errors
///
/// Returns `StorageError::Corrupt` for a column position outside the schema,
/// an unknown kind or presence byte, or a trigram-GIN index (tag 4) in a file
/// older than version 24. Errors from the cursor reads and from the
/// `restore_*` / `add_index` calls are propagated.
fn deserialize_indices(
    cur: &mut Cursor<'_>,
    t: &mut Table,
    version: u8,
) -> Result<(), StorageError> {
    let index_count = cur.read_u16()? as usize;
    for _ in 0..index_count {
        let idx_name = cur.read_str()?;
        let col_pos = cur.read_u16()? as usize;
        // The restore helpers take a column name, so resolve the stored
        // position first (and reject positions outside the schema).
        let column_name = t
            .schema
            .columns
            .get(col_pos)
            .ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "index {idx_name:?} points at non-existent column position {col_pos}"
                ))
            })?
            .name
            .clone();
        let kind_tag = cur.read_u8()?;
        match kind_tag {
            // Tag 0: B-tree.
            0 => {
                if version >= 9 {
                    let map = read_btree_map(cur)?;
                    t.restore_btree_index(idx_name, &column_name, map)?;
                } else {
                    // Files older than version 9 carry no B-tree payload;
                    // the index is created through `add_index` instead.
                    t.add_index(idx_name, &column_name)?;
                }
            }
            // Tag 1: NSW graph, preceded by its `m` parameter.
            1 => {
                let m = cur.read_u16()? as usize;
                let graph = cur.read_nsw_graph(m)?;
                t.restore_nsw_index(idx_name, &column_name, graph)?;
            }
            // Tag 2: BRIN — only the column's data type is stored.
            2 => {
                let column_type = cur.read_data_type()?;
                t.restore_brin_index(idx_name, &column_name, column_type)?;
            }
            // Tag 3: GIN posting lists.
            3 => {
                let map = read_gin_map(cur)?;
                t.restore_gin_index(idx_name, &column_name, map)?;
            }
            // Tag 4: trigram GIN — same payload layout as tag 3, but only
            // valid from version 24 on.
            4 => {
                if version < 24 {
                    return Err(StorageError::Corrupt(format!(
                        "trigram-GIN index tag 4 found in catalog FILE_VERSION {version}; \
                         FILE_VERSION 24+ required (v7.15.0 introduced this tag)"
                    )));
                }
                let map = read_gin_map(cur)?;
                t.restore_gin_trgm_index(idx_name, &column_name, map)?;
            }
            other => {
                return Err(StorageError::Corrupt(format!(
                    "unknown index kind tag: {other}"
                )));
            }
        }
        if version >= 12 {
            // INCLUDE columns: positions are validated against the schema.
            let num_included = cur.read_u16()? as usize;
            if num_included > 0 {
                let mut included: Vec<usize> = Vec::with_capacity(num_included);
                for _ in 0..num_included {
                    let cp = cur.read_u16()? as usize;
                    if cp >= t.schema.columns.len() {
                        return Err(StorageError::Corrupt(format!(
                            "INCLUDE column position {cp} out of range \
                             ({} schema columns)",
                            t.schema.columns.len()
                        )));
                    }
                    included.push(cp);
                }
                if let Some(last) = t.indices.last_mut() {
                    last.included_columns = included;
                }
            }
            // Optional partial-index predicate.
            match cur.read_u8()? {
                0 => {}
                1 => {
                    let pred = cur.read_str()?;
                    if let Some(last) = t.indices.last_mut() {
                        last.partial_predicate = Some(pred);
                    }
                }
                other => {
                    return Err(StorageError::Corrupt(format!(
                        "partial_predicate tag: unknown byte {other}"
                    )));
                }
            }
            // Optional index expression.
            match cur.read_u8()? {
                0 => {}
                1 => {
                    let expr = cur.read_str()?;
                    if let Some(last) = t.indices.last_mut() {
                        last.expression = Some(expr);
                    }
                }
                other => {
                    return Err(StorageError::Corrupt(format!(
                        "expression tag: unknown byte {other}"
                    )));
                }
            }
            if version >= 16 {
                // Uniqueness flag: strictly 0 or 1.
                match cur.read_u8()? {
                    0 => {}
                    1 => {
                        if let Some(last) = t.indices.last_mut() {
                            last.is_unique = true;
                        }
                    }
                    other => {
                        return Err(StorageError::Corrupt(format!(
                            "is_unique tag: unknown byte {other}"
                        )));
                    }
                }
                // Extra column positions, validated like INCLUDE columns.
                let n = cur.read_u16()? as usize;
                if n > 0 {
                    let mut extras: Vec<usize> = Vec::with_capacity(n);
                    for _ in 0..n {
                        let cp = cur.read_u16()? as usize;
                        if cp >= t.schema.columns.len() {
                            return Err(StorageError::Corrupt(format!(
                                "extra column position {cp} out of range \
                                 ({} schema columns)",
                                t.schema.columns.len()
                            )));
                        }
                        extras.push(cp);
                    }
                    if let Some(last) = t.indices.last_mut() {
                        last.extra_column_positions = extras;
                    }
                }
            }
        }
    }
    Ok(())
}
5599
5600fn read_btree_map(
5604 cur: &mut Cursor<'_>,
5605) -> Result<PersistentBTreeMap<IndexKey, Vec<RowLocator>>, StorageError> {
5606 let entry_count = cur.read_u32()? as usize;
5607 let mut map = PersistentBTreeMap::new();
5608 for _ in 0..entry_count {
5609 let key = cur.read_index_key()?;
5610 let locator_count = cur.read_u32()? as usize;
5611 let mut locators = Vec::with_capacity(locator_count);
5612 for _ in 0..locator_count {
5613 let tail = &cur.buf[cur.pos..];
5614 let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
5615 StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
5616 })?;
5617 cur.pos += consumed;
5618 locators.push(loc);
5619 }
5620 map.insert_mut(key, locators);
5621 }
5622 Ok(map)
5623}
5624
5625fn read_gin_map(
5629 cur: &mut Cursor<'_>,
5630) -> Result<PersistentBTreeMap<String, Vec<RowLocator>>, StorageError> {
5631 let entry_count = cur.read_u32()? as usize;
5632 let mut map = PersistentBTreeMap::new();
5633 for _ in 0..entry_count {
5634 let word = cur.read_str()?;
5635 let locator_count = cur.read_u32()? as usize;
5636 let mut locators = Vec::with_capacity(locator_count);
5637 for _ in 0..locator_count {
5638 let tail = &cur.buf[cur.pos..];
5639 let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
5640 StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
5641 })?;
5642 cur.pos += consumed;
5643 locators.push(loc);
5644 }
5645 map.insert_mut(word, locators);
5646 }
5647 Ok(map)
5648}
5649
5650fn write_nsw_graph(out: &mut Vec<u8>, g: &NswGraph) {
5666 let entry = g.entry.map_or(u32::MAX, |e| {
5667 u32::try_from(e).expect("NSW entry fits in u32")
5668 });
5669 write_u16(
5670 out,
5671 u16::try_from(g.m_max_0).expect("HNSW m_max_0 fits in u16"),
5672 );
5673 out.extend_from_slice(&entry.to_le_bytes());
5674 out.push(g.entry_level);
5675 let node_count = g.levels.len();
5676 write_u32(
5677 out,
5678 u32::try_from(node_count).expect("HNSW node count fits in u32"),
5679 );
5680 for &lvl in &g.levels {
5681 out.push(lvl);
5682 }
5683 let layer_count = u8::try_from(g.layers.len()).expect("HNSW layer count ≤ 255");
5684 out.push(layer_count);
5685 for layer in &g.layers {
5686 write_u32(
5687 out,
5688 u32::try_from(layer.len()).expect("HNSW per-layer node count fits in u32"),
5689 );
5690 for neighbors in layer {
5691 write_u16(
5692 out,
5693 u16::try_from(neighbors.len()).expect("HNSW neighbour list fits in u16"),
5694 );
5695 for &peer in neighbors {
5699 write_u32(out, peer);
5700 }
5701 }
5702 }
5703}
5704
5705fn write_data_type(out: &mut Vec<u8>, t: DataType) {
5706 match t {
5707 DataType::Int => out.push(1),
5708 DataType::BigInt => out.push(2),
5709 DataType::Float => out.push(3),
5710 DataType::Text => out.push(4),
5711 DataType::Bool => out.push(5),
5712 DataType::Vector { dim, encoding } => match encoding {
5713 VecEncoding::F32 => {
5717 out.push(6);
5718 out.extend_from_slice(&dim.to_le_bytes());
5719 }
5720 VecEncoding::F16 => {
5723 out.push(15);
5724 out.extend_from_slice(&dim.to_le_bytes());
5725 }
5726 VecEncoding::Sq8 => {
5732 out.push(14);
5733 out.extend_from_slice(&dim.to_le_bytes());
5734 }
5735 },
5736 DataType::SmallInt => out.push(7),
5737 DataType::Varchar(max) => {
5738 out.push(8);
5739 out.extend_from_slice(&max.to_le_bytes());
5740 }
5741 DataType::Char(size) => {
5742 out.push(9);
5743 out.extend_from_slice(&size.to_le_bytes());
5744 }
5745 DataType::Numeric { precision, scale } => {
5746 out.push(10);
5747 out.push(precision);
5748 out.push(scale);
5749 }
5750 DataType::Date => out.push(11),
5751 DataType::Timestamp => out.push(12),
5752 DataType::Timestamptz => out.push(17),
5756 DataType::Interval => {
5761 unreachable!("DataType::Interval has no on-disk encoding in v2.11")
5762 }
5763 DataType::Json => out.push(13),
5764 DataType::Jsonb => out.push(16),
5767 DataType::Bytes => out.push(18),
5769 DataType::TextArray => out.push(19),
5772 DataType::IntArray => out.push(20),
5775 DataType::BigIntArray => out.push(21),
5778 DataType::TsVector => out.push(22),
5781 DataType::TsQuery => out.push(23),
5784 }
5785}
5786
5787impl Cursor<'_> {
5788 fn read_data_type(&mut self) -> Result<DataType, StorageError> {
5789 let tag = self.read_u8()?;
5790 match tag {
5791 1 => Ok(DataType::Int),
5792 2 => Ok(DataType::BigInt),
5793 3 => Ok(DataType::Float),
5794 4 => Ok(DataType::Text),
5795 5 => Ok(DataType::Bool),
5796 6 => Ok(DataType::Vector {
5797 dim: self.read_u32()?,
5798 encoding: VecEncoding::F32,
5799 }),
5800 7 => Ok(DataType::SmallInt),
5801 8 => Ok(DataType::Varchar(self.read_u32()?)),
5802 9 => Ok(DataType::Char(self.read_u32()?)),
5803 10 => {
5804 let precision = self.read_u8()?;
5805 let scale = self.read_u8()?;
5806 Ok(DataType::Numeric { precision, scale })
5807 }
5808 11 => Ok(DataType::Date),
5809 12 => Ok(DataType::Timestamp),
5810 13 => Ok(DataType::Json),
5811 14 => Ok(DataType::Vector {
5812 dim: self.read_u32()?,
5813 encoding: VecEncoding::Sq8,
5814 }),
5815 15 => Ok(DataType::Vector {
5819 dim: self.read_u32()?,
5820 encoding: VecEncoding::F16,
5821 }),
5822 16 => Ok(DataType::Jsonb),
5826 17 => Ok(DataType::Timestamptz),
5830 18 => Ok(DataType::Bytes),
5832 19 => Ok(DataType::TextArray),
5834 20 => Ok(DataType::IntArray),
5836 21 => Ok(DataType::BigIntArray),
5837 22 => Ok(DataType::TsVector),
5840 23 => Ok(DataType::TsQuery),
5841 other => Err(StorageError::Corrupt(format!(
5842 "unknown data type tag: {other}"
5843 ))),
5844 }
5845 }
5846}
5847
5848pub fn row_body_encoded_len(row: &Row, schema: &TableSchema) -> usize {
5854 debug_assert_eq!(
5855 row.values.len(),
5856 schema.columns.len(),
5857 "row_body_encoded_len: row arity must match schema"
5858 );
5859 let bitmap_bytes = schema.columns.len().div_ceil(8);
5860 let mut n = bitmap_bytes;
5861 for (col_idx, v) in row.values.iter().enumerate() {
5862 if matches!(v, Value::Null) {
5863 continue;
5864 }
5865 n += value_body_encoded_len(v, schema.columns[col_idx].ty);
5866 }
5867 n
5868}
5869
5870fn value_body_encoded_len(v: &Value, _ty: DataType) -> usize {
5876 match v {
5877 Value::SmallInt(_) => 2,
5878 Value::Int(_) | Value::Date(_) => 4,
5880 Value::BigInt(_) | Value::Float(_) | Value::Timestamp(_) => 8,
5882 Value::Bool(_) => 1,
5883 Value::Text(s) | Value::Json(s) => 2 + s.len(),
5885 Value::Vector(vec) => 4 + 4 * vec.len(),
5887 Value::Sq8Vector(q) => 4 + 4 + 4 + q.bytes.len(),
5894 Value::HalfVector(h) => 4 + h.bytes.len(),
5897 Value::Numeric { .. } => 16 + 1,
5899 Value::Bytes(b) => 2 + b.len(),
5905 Value::TextArray(items) => {
5908 let mut n = 2; for item in items {
5910 n += 1; if let Some(s) = item {
5912 n += 2 + s.len();
5913 }
5914 }
5915 n
5916 }
5917 Value::IntArray(items) => {
5920 2 + items
5921 .iter()
5922 .map(|x| if x.is_some() { 5 } else { 1 })
5923 .sum::<usize>()
5924 }
5925 Value::BigIntArray(items) => {
5926 2 + items
5927 .iter()
5928 .map(|x| if x.is_some() { 9 } else { 1 })
5929 .sum::<usize>()
5930 }
5931 Value::TsVector(lexs) => {
5935 let mut n = 2;
5936 for l in lexs {
5937 n += 2 + l.word.len() + 2 + 2 * l.positions.len() + 1;
5938 }
5939 n
5940 }
5941 Value::TsQuery(ast) => tsquery_encoded_len(ast),
5944 Value::Null => 0,
5946 Value::Interval { .. } => {
5948 unreachable!("Value::Interval has no on-disk encoding")
5949 }
5950 }
5951}
5952
5953pub fn encode_row_body_dense(row: &Row, schema: &TableSchema) -> Vec<u8> {
5964 debug_assert_eq!(
5965 row.values.len(),
5966 schema.columns.len(),
5967 "dense encode: row arity must match schema"
5968 );
5969 let bitmap_bytes = schema.columns.len().div_ceil(8);
5970 let mut out = Vec::with_capacity(bitmap_bytes + schema.columns.len() * 8);
5973 let bitmap_offset = out.len();
5974 out.resize(bitmap_offset + bitmap_bytes, 0);
5975 for (i, v) in row.values.iter().enumerate() {
5976 if matches!(v, Value::Null) {
5977 out[bitmap_offset + i / 8] |= 1 << (i % 8);
5978 }
5979 }
5980 for (col_idx, v) in row.values.iter().enumerate() {
5981 if matches!(v, Value::Null) {
5982 continue;
5983 }
5984 write_value_body(&mut out, v, schema.columns[col_idx].ty);
5985 }
5986 out
5987}
5988
5989pub fn decode_row_body_dense(
5995 bytes: &[u8],
5996 schema: &TableSchema,
5997) -> Result<(Row, usize), StorageError> {
5998 let mut cur = Cursor::new(bytes);
5999 let bitmap_bytes = schema.columns.len().div_ceil(8);
6000 let mut bitmap_buf = [0u8; 32];
6001 if bitmap_bytes > bitmap_buf.len() {
6002 return Err(StorageError::Corrupt(format!(
6003 "row NULL bitmap {bitmap_bytes} B exceeds 32 B cap"
6004 )));
6005 }
6006 let slice = cur.take(bitmap_bytes)?;
6007 bitmap_buf[..bitmap_bytes].copy_from_slice(slice);
6008 let mut values = Vec::with_capacity(schema.columns.len());
6009 for (col_idx, col) in schema.columns.iter().enumerate() {
6010 if (bitmap_buf[col_idx / 8] >> (col_idx % 8)) & 1 == 1 {
6011 values.push(Value::Null);
6012 } else {
6013 values.push(cur.read_value_body(col.ty)?);
6014 }
6015 }
6016 Ok((Row { values }, cur.pos))
6017}
6018
/// Appends the untagged body of `v` to `out`, using the column type `ty` to
/// pick the layout (the schema-driven counterpart of `write_value`, which
/// prefixes a tag instead).
///
/// Only the `(value, column type)` pairs listed below are accepted; every
/// other combination — including `Value::Null`, which callers in this file
/// filter out beforehand — hits the final `unreachable!` arm.
///
/// # Panics
///
/// Panics on a mismatched value/type pair, and when a length does not fit
/// its on-disk field (the `expect` messages name each limit).
fn write_value_body(out: &mut Vec<u8>, v: &Value, ty: DataType) {
    match (v, ty) {
        // Fixed-width scalars: little-endian bytes of the value.
        (Value::SmallInt(n), DataType::SmallInt) => out.extend_from_slice(&n.to_le_bytes()),
        (Value::Int(n), DataType::Int) => out.extend_from_slice(&n.to_le_bytes()),
        (Value::BigInt(n), DataType::BigInt) => out.extend_from_slice(&n.to_le_bytes()),
        (Value::Float(x), DataType::Float) => out.extend_from_slice(&x.to_le_bytes()),
        (Value::Bool(b), DataType::Bool) => out.push(u8::from(*b)),
        // All three character types share the text encoding.
        (Value::Text(s), DataType::Text | DataType::Varchar(_) | DataType::Char(_)) => {
            write_str(out, s);
        }
        // F32 vector: u32 dimension, then each component little-endian.
        (
            Value::Vector(v),
            DataType::Vector {
                encoding: VecEncoding::F32,
                ..
            },
        ) => {
            let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
            out.extend_from_slice(&dim.to_le_bytes());
            for x in v {
                out.extend_from_slice(&x.to_le_bytes());
            }
        }
        // SQ8 vector: u32 dimension, the `min` / `max` bounds, then one byte
        // per component.
        (
            Value::Sq8Vector(q),
            DataType::Vector {
                encoding: VecEncoding::Sq8,
                ..
            },
        ) => {
            let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
            out.extend_from_slice(&dim.to_le_bytes());
            out.extend_from_slice(&q.min.to_le_bytes());
            out.extend_from_slice(&q.max.to_le_bytes());
            out.extend_from_slice(&q.bytes);
        }
        // F16 vector: u32 dimension, then the raw half-precision bytes.
        (
            Value::HalfVector(h),
            DataType::Vector {
                encoding: VecEncoding::F16,
                ..
            },
        ) => {
            let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
            out.extend_from_slice(&dim.to_le_bytes());
            out.extend_from_slice(&h.bytes);
        }
        // NUMERIC: the scaled integer followed by the *column's* scale (the
        // value's own scale field is ignored here).
        (Value::Numeric { scaled, .. }, DataType::Numeric { scale, .. }) => {
            out.extend_from_slice(&scaled.to_le_bytes());
            out.push(scale);
        }
        (Value::Date(d), DataType::Date) => out.extend_from_slice(&d.to_le_bytes()),
        // TIMESTAMP and TIMESTAMPTZ columns share one value representation.
        (Value::Timestamp(t), DataType::Timestamp | DataType::Timestamptz) => {
            out.extend_from_slice(&t.to_le_bytes())
        }
        // JSON and JSONB are both stored as text.
        (Value::Json(s), DataType::Json | DataType::Jsonb) => write_str(out, s),
        // BYTEA: u16 length prefix, then the raw bytes.
        (Value::Bytes(b), DataType::Bytes) => {
            let len = u16::try_from(b.len()).expect("BYTEA cell ≤ 64 KiB");
            out.extend_from_slice(&len.to_le_bytes());
            out.extend_from_slice(b);
        }
        // Arrays: u16 element count, then per element a flag byte
        // (1 = NULL, 0 = present) followed by the element when present.
        (Value::TextArray(items), DataType::TextArray) => {
            let count = u16::try_from(items.len()).expect("TEXT[] ≤ 65k elements");
            out.extend_from_slice(&count.to_le_bytes());
            for item in items {
                match item {
                    None => out.push(1),
                    Some(s) => {
                        out.push(0);
                        let len = u16::try_from(s.len()).expect("TEXT[] element ≤ 64 KiB");
                        out.extend_from_slice(&len.to_le_bytes());
                        out.extend_from_slice(s.as_bytes());
                    }
                }
            }
        }
        (Value::IntArray(items), DataType::IntArray) => {
            let count = u16::try_from(items.len()).expect("INT[] ≤ 65k elements");
            out.extend_from_slice(&count.to_le_bytes());
            for item in items {
                match item {
                    None => out.push(1),
                    Some(n) => {
                        out.push(0);
                        out.extend_from_slice(&n.to_le_bytes());
                    }
                }
            }
        }
        (Value::BigIntArray(items), DataType::BigIntArray) => {
            let count = u16::try_from(items.len()).expect("BIGINT[] ≤ 65k elements");
            out.extend_from_slice(&count.to_le_bytes());
            for item in items {
                match item {
                    None => out.push(1),
                    Some(n) => {
                        out.push(0);
                        out.extend_from_slice(&n.to_le_bytes());
                    }
                }
            }
        }
        // Full-text types delegate to their dedicated body writers.
        (Value::TsVector(lexs), DataType::TsVector) => write_tsvector_body(out, lexs),
        (Value::TsQuery(ast), DataType::TsQuery) => write_tsquery_body(out, ast),
        // Anything else means the value does not match the column type.
        (other, ty) => unreachable!(
            "schema-driven encode received mismatched value/type pair: \
             value tag={:?}, column type={:?}",
            other.data_type(),
            ty
        ),
    }
}
6164
6165fn write_value(out: &mut Vec<u8>, v: &Value) {
6166 match v {
6167 Value::Null => out.push(0),
6168 Value::SmallInt(n) => {
6169 out.push(7);
6170 out.extend_from_slice(&n.to_le_bytes());
6171 }
6172 Value::Int(n) => {
6173 out.push(1);
6174 out.extend_from_slice(&n.to_le_bytes());
6175 }
6176 Value::BigInt(n) => {
6177 out.push(2);
6178 out.extend_from_slice(&n.to_le_bytes());
6179 }
6180 Value::Float(x) => {
6181 out.push(3);
6182 out.extend_from_slice(&x.to_le_bytes());
6183 }
6184 Value::Text(s) | Value::Json(s) => {
6189 out.push(4);
6190 write_str(out, s);
6191 }
6192 Value::Bool(b) => {
6193 out.push(5);
6194 out.push(u8::from(*b));
6195 }
6196 Value::Vector(v) => {
6197 out.push(6);
6198 let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
6199 out.extend_from_slice(&dim.to_le_bytes());
6200 for x in v {
6201 out.extend_from_slice(&x.to_le_bytes());
6202 }
6203 }
6204 Value::Sq8Vector(q) => {
6209 out.push(11);
6210 let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
6211 out.extend_from_slice(&dim.to_le_bytes());
6212 out.extend_from_slice(&q.min.to_le_bytes());
6213 out.extend_from_slice(&q.max.to_le_bytes());
6214 out.extend_from_slice(&q.bytes);
6215 }
6216 Value::HalfVector(h) => {
6221 out.push(12);
6222 let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
6223 out.extend_from_slice(&dim.to_le_bytes());
6224 out.extend_from_slice(&h.bytes);
6225 }
6226 Value::Numeric { scaled, scale } => {
6227 out.push(8);
6228 out.extend_from_slice(&scaled.to_le_bytes());
6229 out.push(*scale);
6230 }
6231 Value::Date(d) => {
6232 out.push(9);
6233 out.extend_from_slice(&d.to_le_bytes());
6234 }
6235 Value::Timestamp(t) => {
6236 out.push(10);
6237 out.extend_from_slice(&t.to_le_bytes());
6238 }
6239 Value::Interval { .. } => {
6243 unreachable!(
6244 "Value::Interval has no on-disk encoding; engine must reject it before write"
6245 )
6246 }
6247 Value::Bytes(b) => {
6252 out.push(14);
6253 let len = u16::try_from(b.len()).expect("BYTEA value ≤ 64 KiB");
6254 out.extend_from_slice(&len.to_le_bytes());
6255 out.extend_from_slice(b);
6256 }
6257 Value::TextArray(items) => {
6260 out.push(15);
6261 let count = u16::try_from(items.len()).expect("TEXT[] ≤ 65k elements");
6262 out.extend_from_slice(&count.to_le_bytes());
6263 for item in items {
6264 match item {
6265 None => out.push(1),
6266 Some(s) => {
6267 out.push(0);
6268 let len = u16::try_from(s.len()).expect("TEXT[] element ≤ 64 KiB");
6269 out.extend_from_slice(&len.to_le_bytes());
6270 out.extend_from_slice(s.as_bytes());
6271 }
6272 }
6273 }
6274 }
6275 Value::IntArray(items) => {
6278 out.push(16);
6279 let count = u16::try_from(items.len()).expect("INT[] ≤ 65k elements");
6280 out.extend_from_slice(&count.to_le_bytes());
6281 for item in items {
6282 match item {
6283 None => out.push(1),
6284 Some(n) => {
6285 out.push(0);
6286 out.extend_from_slice(&n.to_le_bytes());
6287 }
6288 }
6289 }
6290 }
6291 Value::BigIntArray(items) => {
6294 out.push(17);
6295 let count = u16::try_from(items.len()).expect("BIGINT[] ≤ 65k elements");
6296 out.extend_from_slice(&count.to_le_bytes());
6297 for item in items {
6298 match item {
6299 None => out.push(1),
6300 Some(n) => {
6301 out.push(0);
6302 out.extend_from_slice(&n.to_le_bytes());
6303 }
6304 }
6305 }
6306 }
6307 Value::TsVector(lexs) => {
6310 out.push(18);
6311 write_tsvector_body(out, lexs);
6312 }
6313 Value::TsQuery(ast) => {
6316 out.push(19);
6317 write_tsquery_body(out, ast);
6318 }
6319 }
6320}
6321
6322fn write_tsvector_body(out: &mut Vec<u8>, lexs: &[TsLexeme]) {
6325 let count = u16::try_from(lexs.len()).expect("tsvector ≤ 65k lexemes");
6326 out.extend_from_slice(&count.to_le_bytes());
6327 for l in lexs {
6328 let wlen = u16::try_from(l.word.len()).expect("tsvector word ≤ 64 KiB");
6329 out.extend_from_slice(&wlen.to_le_bytes());
6330 out.extend_from_slice(l.word.as_bytes());
6331 let plen = u16::try_from(l.positions.len()).expect("tsvector pos count ≤ 65k");
6332 out.extend_from_slice(&plen.to_le_bytes());
6333 for p in &l.positions {
6334 out.extend_from_slice(&p.to_le_bytes());
6335 }
6336 out.push(l.weight);
6337 }
6338}
6339
6340fn write_tsquery_body(out: &mut Vec<u8>, ast: &TsQueryAst) {
6344 match ast {
6345 TsQueryAst::Term { word, weight_mask } => {
6346 out.push(0);
6347 let len = u16::try_from(word.len()).expect("tsquery term ≤ 64 KiB");
6348 out.extend_from_slice(&len.to_le_bytes());
6349 out.extend_from_slice(word.as_bytes());
6350 out.push(*weight_mask);
6351 }
6352 TsQueryAst::And(a, b) => {
6353 out.push(1);
6354 write_tsquery_body(out, a);
6355 write_tsquery_body(out, b);
6356 }
6357 TsQueryAst::Or(a, b) => {
6358 out.push(2);
6359 write_tsquery_body(out, a);
6360 write_tsquery_body(out, b);
6361 }
6362 TsQueryAst::Not(x) => {
6363 out.push(3);
6364 write_tsquery_body(out, x);
6365 }
6366 TsQueryAst::Phrase {
6367 left,
6368 right,
6369 distance,
6370 } => {
6371 out.push(4);
6372 out.extend_from_slice(&distance.to_le_bytes());
6373 write_tsquery_body(out, left);
6374 write_tsquery_body(out, right);
6375 }
6376 }
6377}
6378
6379fn tsquery_encoded_len(ast: &TsQueryAst) -> usize {
6381 match ast {
6382 TsQueryAst::Term { word, .. } => 1 + 2 + word.len() + 1,
6383 TsQueryAst::And(a, b) | TsQueryAst::Or(a, b) => {
6384 1 + tsquery_encoded_len(a) + tsquery_encoded_len(b)
6385 }
6386 TsQueryAst::Not(x) => 1 + tsquery_encoded_len(x),
6387 TsQueryAst::Phrase { left, right, .. } => {
6388 1 + 2 + tsquery_encoded_len(left) + tsquery_encoded_len(right)
6389 }
6390 }
6391}
6392
/// Appends `n` to `out` as two little-endian bytes.
fn write_u16(out: &mut Vec<u8>, n: u16) {
    out.extend(n.to_le_bytes());
}
/// Appends `n` to `out` as four little-endian bytes.
fn write_u32(out: &mut Vec<u8>, n: u32) {
    out.extend(n.to_le_bytes());
}
6399fn write_str(out: &mut Vec<u8>, s: &str) {
6400 let len = u16::try_from(s.len()).expect("identifier / text fits in u16");
6401 write_u16(out, len);
6402 out.extend_from_slice(s.as_bytes());
6403}
6404
6405fn write_str_long(out: &mut Vec<u8>, s: &str) {
6410 let len = u32::try_from(s.len()).expect("function body fits in u32");
6411 write_u32(out, len);
6412 out.extend_from_slice(s.as_bytes());
6413}
6414
6415fn write_index_key(out: &mut Vec<u8>, key: &IndexKey) {
6419 match key {
6420 IndexKey::Int(n) => {
6421 out.push(INDEX_KEY_TAG_INT);
6422 out.extend_from_slice(&n.to_le_bytes());
6423 }
6424 IndexKey::Text(s) => {
6425 out.push(INDEX_KEY_TAG_TEXT);
6426 write_str(out, s);
6427 }
6428 IndexKey::Bool(b) => {
6429 out.push(INDEX_KEY_TAG_BOOL);
6430 out.push(u8::from(*b));
6431 }
6432 }
6433}
6434
/// Forward-only reader over a borrowed byte buffer.
///
/// The `read_*` methods consume bytes starting at `pos` and advance it;
/// slices handed out by `take` borrow from `buf` for the full `'a` lifetime.
struct Cursor<'a> {
    /// The bytes being decoded.
    buf: &'a [u8],
    /// Offset into `buf` of the next unread byte.
    pos: usize,
}
6439
6440impl<'a> Cursor<'a> {
6441 const fn new(buf: &'a [u8]) -> Self {
6442 Self { buf, pos: 0 }
6443 }
6444
6445 fn take(&mut self, n: usize) -> Result<&'a [u8], StorageError> {
6446 let end = self
6447 .pos
6448 .checked_add(n)
6449 .ok_or_else(|| StorageError::Corrupt(format!("length overflow taking {n} bytes")))?;
6450 if end > self.buf.len() {
6451 return Err(StorageError::Corrupt(format!(
6452 "unexpected EOF at offset {} (wanted {n} more bytes)",
6453 self.pos
6454 )));
6455 }
6456 let s = &self.buf[self.pos..end];
6457 self.pos = end;
6458 Ok(s)
6459 }
6460
6461 fn read_u8(&mut self) -> Result<u8, StorageError> {
6462 Ok(self.take(1)?[0])
6463 }
6464 fn read_u16(&mut self) -> Result<u16, StorageError> {
6465 let s = self.take(2)?;
6466 Ok(u16::from_le_bytes([s[0], s[1]]))
6467 }
6468 fn read_u32(&mut self) -> Result<u32, StorageError> {
6469 let s = self.take(4)?;
6470 Ok(u32::from_le_bytes([s[0], s[1], s[2], s[3]]))
6471 }
6472 fn read_i32(&mut self) -> Result<i32, StorageError> {
6473 let s = self.take(4)?;
6474 Ok(i32::from_le_bytes([s[0], s[1], s[2], s[3]]))
6475 }
6476 fn read_u64(&mut self) -> Result<u64, StorageError> {
6479 let s = self.take(8)?;
6480 Ok(u64::from_le_bytes([
6481 s[0], s[1], s[2], s[3], s[4], s[5], s[6], s[7],
6482 ]))
6483 }
6484 fn read_i64(&mut self) -> Result<i64, StorageError> {
6485 let s = self.take(8)?;
6486 let arr: [u8; 8] = s.try_into().expect("checked");
6487 Ok(i64::from_le_bytes(arr))
6488 }
6489 fn read_f64(&mut self) -> Result<f64, StorageError> {
6490 let s = self.take(8)?;
6491 let arr: [u8; 8] = s.try_into().expect("checked");
6492 Ok(f64::from_le_bytes(arr))
6493 }
6494 fn read_f32(&mut self) -> Result<f32, StorageError> {
6495 let s = self.take(4)?;
6496 Ok(f32::from_le_bytes([s[0], s[1], s[2], s[3]]))
6497 }
6498 fn read_str(&mut self) -> Result<String, StorageError> {
6499 let len = self.read_u16()? as usize;
6500 let bytes = self.take(len)?;
6501 core::str::from_utf8(bytes)
6502 .map(String::from)
6503 .map_err(|_| StorageError::Corrupt("invalid UTF-8 in identifier or text".into()))
6504 }
6505
6506 fn read_str_long(&mut self) -> Result<String, StorageError> {
6510 let len = self.read_u32()? as usize;
6511 let bytes = self.take(len)?;
6512 core::str::from_utf8(bytes)
6513 .map(String::from)
6514 .map_err(|_| StorageError::Corrupt("invalid UTF-8 in long-string payload".into()))
6515 }
6516
6517 fn read_index_key(&mut self) -> Result<IndexKey, StorageError> {
6521 let tag = self.read_u8()?;
6522 match tag {
6523 INDEX_KEY_TAG_INT => Ok(IndexKey::Int(self.read_i64()?)),
6524 INDEX_KEY_TAG_TEXT => Ok(IndexKey::Text(self.read_str()?)),
6525 INDEX_KEY_TAG_BOOL => Ok(IndexKey::Bool(self.read_u8()? != 0)),
6526 other => Err(StorageError::Corrupt(format!(
6527 "unknown index key tag: {other}"
6528 ))),
6529 }
6530 }
6531 fn read_value_body(&mut self, ty: DataType) -> Result<Value, StorageError> {
6537 match ty {
6538 DataType::SmallInt => {
6539 let s = self.take(2)?;
6540 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
6541 }
6542 DataType::Int => Ok(Value::Int(self.read_i32()?)),
6543 DataType::BigInt => Ok(Value::BigInt(self.read_i64()?)),
6544 DataType::Float => Ok(Value::Float(self.read_f64()?)),
6545 DataType::Bool => Ok(Value::Bool(self.read_u8()? != 0)),
6546 DataType::Text | DataType::Varchar(_) | DataType::Char(_) => {
6547 Ok(Value::Text(self.read_str()?))
6548 }
6549 DataType::Vector {
6550 encoding: VecEncoding::F32,
6551 ..
6552 } => {
6553 let dim = self.read_u32()? as usize;
6554 let mut v = Vec::with_capacity(dim);
6555 for _ in 0..dim {
6556 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
6557 v.push(f32::from_le_bytes(bytes));
6558 }
6559 Ok(Value::Vector(v))
6560 }
6561 DataType::Vector {
6562 encoding: VecEncoding::Sq8,
6563 ..
6564 } => {
6565 let dim = self.read_u32()? as usize;
6566 let min = self.read_f32()?;
6567 let max = self.read_f32()?;
6568 let bytes = self.take(dim)?.to_vec();
6569 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
6570 }
6571 DataType::Vector {
6572 encoding: VecEncoding::F16,
6573 ..
6574 } => {
6575 let dim = self.read_u32()? as usize;
6576 let bytes = self.take(dim * 2)?.to_vec();
6577 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
6578 }
6579 DataType::Numeric { .. } => {
6580 let s = self.take(16)?;
6581 let arr: [u8; 16] = s.try_into().expect("checked");
6582 let scaled = i128::from_le_bytes(arr);
6583 let scale = self.read_u8()?;
6584 Ok(Value::Numeric { scaled, scale })
6585 }
6586 DataType::Date => Ok(Value::Date(self.read_i32()?)),
6587 DataType::Timestamp => Ok(Value::Timestamp(self.read_i64()?)),
6588 DataType::Timestamptz => Ok(Value::Timestamp(self.read_i64()?)),
6589 DataType::Jsonb => Ok(Value::Json(self.read_str()?)),
6590 DataType::Interval => {
6591 Err(StorageError::Corrupt(
6596 "INTERVAL column found on disk — runtime-only type, v3.0.2 rejects it".into(),
6597 ))
6598 }
6599 DataType::Json => Ok(Value::Json(self.read_str()?)),
6600 DataType::Bytes => {
6603 let len = self.read_u16()? as usize;
6604 let bytes = self.take(len)?.to_vec();
6605 Ok(Value::Bytes(bytes))
6606 }
6607 DataType::TextArray => {
6609 let count = self.read_u16()? as usize;
6610 let mut items: Vec<Option<String>> = Vec::with_capacity(count);
6611 for _ in 0..count {
6612 match self.read_u8()? {
6613 0 => items.push(Some(self.read_str()?)),
6614 1 => items.push(None),
6615 other => {
6616 return Err(StorageError::Corrupt(format!(
6617 "TEXT[] null flag: unknown byte {other}"
6618 )));
6619 }
6620 }
6621 }
6622 Ok(Value::TextArray(items))
6623 }
6624 DataType::IntArray => {
6626 let count = self.read_u16()? as usize;
6627 let mut items: Vec<Option<i32>> = Vec::with_capacity(count);
6628 for _ in 0..count {
6629 match self.read_u8()? {
6630 0 => items.push(Some(self.read_i32()?)),
6631 1 => items.push(None),
6632 other => {
6633 return Err(StorageError::Corrupt(format!(
6634 "INT[] null flag: unknown byte {other}"
6635 )));
6636 }
6637 }
6638 }
6639 Ok(Value::IntArray(items))
6640 }
6641 DataType::BigIntArray => {
6643 let count = self.read_u16()? as usize;
6644 let mut items: Vec<Option<i64>> = Vec::with_capacity(count);
6645 for _ in 0..count {
6646 match self.read_u8()? {
6647 0 => items.push(Some(self.read_i64()?)),
6648 1 => items.push(None),
6649 other => {
6650 return Err(StorageError::Corrupt(format!(
6651 "BIGINT[] null flag: unknown byte {other}"
6652 )));
6653 }
6654 }
6655 }
6656 Ok(Value::BigIntArray(items))
6657 }
6658 DataType::TsVector => Ok(Value::TsVector(self.read_tsvector_body()?)),
6662 DataType::TsQuery => Ok(Value::TsQuery(self.read_tsquery_body()?)),
6663 }
6664 }
6665
6666 fn read_tsvector_body(&mut self) -> Result<Vec<TsLexeme>, StorageError> {
6668 let count = self.read_u16()? as usize;
6669 let mut out = Vec::with_capacity(count);
6670 for _ in 0..count {
6671 let word = self.read_str()?;
6672 let pos_count = self.read_u16()? as usize;
6673 let mut positions = Vec::with_capacity(pos_count);
6674 for _ in 0..pos_count {
6675 positions.push(self.read_u16()?);
6676 }
6677 let weight = self.read_u8()?;
6678 out.push(TsLexeme {
6679 word,
6680 positions,
6681 weight,
6682 });
6683 }
6684 Ok(out)
6685 }
6686
6687 fn read_tsquery_body(&mut self) -> Result<TsQueryAst, StorageError> {
6689 let tag = self.read_u8()?;
6690 match tag {
6691 0 => {
6692 let word = self.read_str()?;
6693 let weight_mask = self.read_u8()?;
6694 Ok(TsQueryAst::Term { word, weight_mask })
6695 }
6696 1 => {
6697 let a = self.read_tsquery_body()?;
6698 let b = self.read_tsquery_body()?;
6699 Ok(TsQueryAst::And(Box::new(a), Box::new(b)))
6700 }
6701 2 => {
6702 let a = self.read_tsquery_body()?;
6703 let b = self.read_tsquery_body()?;
6704 Ok(TsQueryAst::Or(Box::new(a), Box::new(b)))
6705 }
6706 3 => {
6707 let x = self.read_tsquery_body()?;
6708 Ok(TsQueryAst::Not(Box::new(x)))
6709 }
6710 4 => {
6711 let distance = self.read_u16()?;
6712 let left = self.read_tsquery_body()?;
6713 let right = self.read_tsquery_body()?;
6714 Ok(TsQueryAst::Phrase {
6715 left: Box::new(left),
6716 right: Box::new(right),
6717 distance,
6718 })
6719 }
6720 other => Err(StorageError::Corrupt(format!(
6721 "tsquery: unknown node tag {other}"
6722 ))),
6723 }
6724 }
6725
6726 fn read_value(&mut self) -> Result<Value, StorageError> {
6727 let tag = self.read_u8()?;
6728 match tag {
6729 0 => Ok(Value::Null),
6730 1 => Ok(Value::Int(self.read_i32()?)),
6731 2 => Ok(Value::BigInt(self.read_i64()?)),
6732 3 => Ok(Value::Float(self.read_f64()?)),
6733 4 => Ok(Value::Text(self.read_str()?)),
6734 5 => Ok(Value::Bool(self.read_u8()? != 0)),
6735 6 => {
6736 let dim = self.read_u32()? as usize;
6737 let mut v = Vec::with_capacity(dim);
6738 for _ in 0..dim {
6739 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
6740 v.push(f32::from_le_bytes(bytes));
6741 }
6742 Ok(Value::Vector(v))
6743 }
6744 7 => {
6745 let s = self.take(2)?;
6746 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
6747 }
6748 8 => {
6749 let s = self.take(16)?;
6750 let arr: [u8; 16] = s.try_into().expect("checked");
6751 let scaled = i128::from_le_bytes(arr);
6752 let scale = self.read_u8()?;
6753 Ok(Value::Numeric { scaled, scale })
6754 }
6755 9 => Ok(Value::Date(self.read_i32()?)),
6756 10 => Ok(Value::Timestamp(self.read_i64()?)),
6757 11 => {
6762 let dim = self.read_u32()? as usize;
6763 let min = self.read_f32()?;
6764 let max = self.read_f32()?;
6765 let bytes = self.take(dim)?.to_vec();
6766 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
6767 }
6768 12 => {
6771 let dim = self.read_u32()? as usize;
6772 let bytes = self.take(dim * 2)?.to_vec();
6773 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
6774 }
6775 14 => {
6777 let len = self.read_u16()? as usize;
6778 let bytes = self.take(len)?.to_vec();
6779 Ok(Value::Bytes(bytes))
6780 }
6781 15 => {
6784 let count = self.read_u16()? as usize;
6785 let mut items: Vec<Option<String>> = Vec::with_capacity(count);
6786 for _ in 0..count {
6787 match self.read_u8()? {
6788 0 => items.push(Some(self.read_str()?)),
6789 1 => items.push(None),
6790 other => {
6791 return Err(StorageError::Corrupt(format!(
6792 "TEXT[] null flag in value tag: unknown byte {other}"
6793 )));
6794 }
6795 }
6796 }
6797 Ok(Value::TextArray(items))
6798 }
6799 16 => {
6801 let count = self.read_u16()? as usize;
6802 let mut items: Vec<Option<i32>> = Vec::with_capacity(count);
6803 for _ in 0..count {
6804 match self.read_u8()? {
6805 0 => items.push(Some(self.read_i32()?)),
6806 1 => items.push(None),
6807 other => {
6808 return Err(StorageError::Corrupt(format!(
6809 "INT[] null flag in value tag: unknown byte {other}"
6810 )));
6811 }
6812 }
6813 }
6814 Ok(Value::IntArray(items))
6815 }
6816 17 => {
6817 let count = self.read_u16()? as usize;
6818 let mut items: Vec<Option<i64>> = Vec::with_capacity(count);
6819 for _ in 0..count {
6820 match self.read_u8()? {
6821 0 => items.push(Some(self.read_i64()?)),
6822 1 => items.push(None),
6823 other => {
6824 return Err(StorageError::Corrupt(format!(
6825 "BIGINT[] null flag in value tag: unknown byte {other}"
6826 )));
6827 }
6828 }
6829 }
6830 Ok(Value::BigIntArray(items))
6831 }
6832 18 => Ok(Value::TsVector(self.read_tsvector_body()?)),
6835 19 => Ok(Value::TsQuery(self.read_tsquery_body()?)),
6837 other => Err(StorageError::Corrupt(format!("unknown value tag: {other}"))),
6838 }
6839 }
6840
6841 fn read_nsw_graph(&mut self, m: usize) -> Result<NswGraph, StorageError> {
6845 let m_max_0 = self.read_u16()? as usize;
6846 let entry_raw = self.read_u32()?;
6847 let entry = if entry_raw == u32::MAX {
6848 None
6849 } else {
6850 Some(entry_raw as usize)
6851 };
6852 let entry_level = self.read_u8()?;
6853 let node_count = self.read_u32()? as usize;
6854 let mut levels: PersistentVec<u8> = PersistentVec::new();
6859 for _ in 0..node_count {
6860 levels.push_mut(self.read_u8()?);
6861 }
6862 let layer_count = self.read_u8()? as usize;
6863 let mut layers: Vec<PersistentVec<Vec<u32>>> = Vec::with_capacity(layer_count);
6864 for _ in 0..layer_count {
6865 let n = self.read_u32()? as usize;
6866 let mut per_layer: PersistentVec<Vec<u32>> = PersistentVec::new();
6867 for _ in 0..n {
6868 let cnt = self.read_u16()? as usize;
6869 let mut row: Vec<u32> = Vec::with_capacity(cnt);
6870 for _ in 0..cnt {
6871 row.push(self.read_u32()?);
6872 }
6873 per_layer.push_mut(row);
6874 }
6875 layers.push(per_layer);
6876 }
6877 Ok(NswGraph {
6878 m,
6879 m_max_0,
6880 entry,
6881 entry_level,
6882 levels,
6883 layers,
6884 })
6885 }
6886}
6887
6888#[cfg(test)]
6889mod tests {
6890 use super::*;
6891 use alloc::string::ToString;
6892 use alloc::vec;
6893
    /// Checks that `l2_distance_sq_neon` agrees with `l2_distance_sq_scalar`
    /// across a range of vector lengths, within a relative tolerance of 1e-4.
    #[cfg(target_arch = "aarch64")]
    #[test]
    fn neon_l2_matches_scalar() {
        let dims = [4usize, 8, 12, 16, 64, 128, 256, 384, 512, 768, 1024, 1536];
        for &d in &dims {
            // Deterministic per-dimension seed so failures are reproducible.
            let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
            let mut a = Vec::with_capacity(d);
            let mut b = Vec::with_capacity(d);
            for _ in 0..d {
                // 64-bit linear congruential step; 24 bits taken from the
                // upper half are mapped onto [-1.0, 1.0).
                state = state
                    .wrapping_mul(6_364_136_223_846_793_005)
                    .wrapping_add(1);
                #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
                let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
                state = state
                    .wrapping_mul(6_364_136_223_846_793_005)
                    .wrapping_add(1);
                #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
                let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
                a.push(x);
                b.push(y);
            }
            let scalar = l2_distance_sq_scalar(&a, &b);
            // NOTE(review): presumably sound because this test only builds on
            // aarch64 and `a`/`b` have equal length — confirm against the
            // safety contract of `l2_distance_sq_neon`.
            let neon = unsafe { l2_distance_sq_neon(&a, &b) };
            // Relative tolerance, floored so a near-zero distance still
            // leaves a non-zero margin.
            let tol = (scalar.abs().max(1e-6)) * 1e-4;
            assert!(
                (scalar - neon).abs() <= tol,
                "dim={d}: scalar={scalar} neon={neon} diff={}",
                (scalar - neon).abs()
            );
        }
    }
6930
    /// Checks that `inner_product_neon` agrees with `inner_product_scalar`
    /// across a range of vector lengths, within a relative tolerance plus a
    /// small per-dimension absolute allowance.
    #[cfg(target_arch = "aarch64")]
    #[test]
    fn neon_inner_product_matches_scalar() {
        let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
        for &d in &dims {
            // Deterministic per-dimension seed so failures are reproducible.
            let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
            let mut a = Vec::with_capacity(d);
            let mut b = Vec::with_capacity(d);
            for _ in 0..d {
                // 64-bit linear congruential step; 24 bits taken from the
                // upper half are mapped onto [-1.0, 1.0).
                state = state
                    .wrapping_mul(6_364_136_223_846_793_005)
                    .wrapping_add(1);
                #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
                let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
                state = state
                    .wrapping_mul(6_364_136_223_846_793_005)
                    .wrapping_add(1);
                #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
                let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
                a.push(x);
                b.push(y);
            }
            let scalar = inner_product_scalar(&a, &b);
            // NOTE(review): presumably sound because this test only builds on
            // aarch64 and `a`/`b` have equal length — confirm against the
            // safety contract of `inner_product_neon`.
            let neon = unsafe { inner_product_neon(&a, &b) };
            // The additive `d * 1e-6` term keeps the bound meaningful when
            // the dot product itself is close to zero.
            #[allow(clippy::cast_precision_loss)]
            let tol = (scalar.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
            assert!(
                (scalar - neon).abs() <= tol,
                "IP dim={d}: scalar={scalar} neon={neon} diff={}",
                (scalar - neon).abs()
            );
        }
    }
6967
    /// Checks that all three outputs of `cosine_dot_norms_neon` agree with
    /// `cosine_dot_norms_scalar` across a range of vector lengths.
    #[cfg(target_arch = "aarch64")]
    #[allow(clippy::similar_names)]
    #[test]
    fn neon_cosine_dot_norms_matches_scalar() {
        let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
        for &d in &dims {
            // Deterministic per-dimension seed so failures are reproducible.
            let mut state: u64 = (d as u64).wrapping_mul(0xBF58_476D_1CE4_E5B9);
            let mut a = Vec::with_capacity(d);
            let mut b = Vec::with_capacity(d);
            for _ in 0..d {
                // 64-bit linear congruential step; 24 bits taken from the
                // upper half are mapped onto [-1.0, 1.0).
                state = state
                    .wrapping_mul(6_364_136_223_846_793_005)
                    .wrapping_add(1);
                #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
                let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
                state = state
                    .wrapping_mul(6_364_136_223_846_793_005)
                    .wrapping_add(1);
                #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
                let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
                a.push(x);
                b.push(y);
            }
            let (dot_s, na_s, nb_s) = cosine_dot_norms_scalar(&a, &b);
            // NOTE(review): presumably sound because this test only builds on
            // aarch64 and `a`/`b` have equal length — confirm against the
            // safety contract of `cosine_dot_norms_neon`.
            let (dot_n, na_n, nb_n) = unsafe { cosine_dot_norms_neon(&a, &b) };
            #[allow(clippy::cast_precision_loss)]
            let tol_d = (dot_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
            // NOTE(review): `tol_n` is derived from `na_s` only, yet it also
            // bounds the `nb` comparison below — confirm that is intended.
            #[allow(clippy::cast_precision_loss)]
            let tol_n = (na_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
            assert!(
                (dot_s - dot_n).abs() <= tol_d,
                "cosine dot dim={d}: scalar={dot_s} neon={dot_n}"
            );
            assert!(
                (na_s - na_n).abs() <= tol_n,
                "cosine na dim={d}: scalar={na_s} neon={na_n}"
            );
            assert!(
                (nb_s - nb_n).abs() <= tol_n,
                "cosine nb dim={d}: scalar={nb_s} neon={nb_n}"
            );
        }
    }
7011
7012 fn make_users_schema() -> TableSchema {
7013 TableSchema::new(
7014 "users",
7015 vec![
7016 ColumnSchema::new("id", DataType::Int, false),
7017 ColumnSchema::new("name", DataType::Text, false),
7018 ColumnSchema::new("score", DataType::Float, true),
7019 ],
7020 )
7021 }
7022
7023 #[test]
7024 fn value_type_tag_matches_variant() {
7025 assert_eq!(Value::Int(1).data_type(), Some(DataType::Int));
7026 assert_eq!(Value::BigInt(1).data_type(), Some(DataType::BigInt));
7027 assert_eq!(Value::Float(1.0).data_type(), Some(DataType::Float));
7028 assert_eq!(Value::Text("x".into()).data_type(), Some(DataType::Text));
7029 assert_eq!(Value::Bool(true).data_type(), Some(DataType::Bool));
7030 assert_eq!(Value::Null.data_type(), None);
7031 assert!(Value::Null.is_null());
7032 assert!(!Value::Int(0).is_null());
7033 }
7034
7035 #[test]
7036 fn sq8_value_reports_sq8_data_type() {
7037 let q = crate::quantize::quantize(&[0.0, 0.25, 0.5, 0.75, 1.0]);
7042 let v = Value::Sq8Vector(q);
7043 assert_eq!(
7044 v.data_type(),
7045 Some(DataType::Vector {
7046 dim: 5,
7047 encoding: VecEncoding::Sq8,
7048 }),
7049 );
7050 }
7051
7052 #[test]
7053 fn datatype_display_matches_pg_keyword() {
7054 assert_eq!(DataType::Int.to_string(), "INT");
7055 assert_eq!(DataType::BigInt.to_string(), "BIGINT");
7056 assert_eq!(DataType::Float.to_string(), "FLOAT");
7057 assert_eq!(DataType::Text.to_string(), "TEXT");
7058 assert_eq!(DataType::Bool.to_string(), "BOOL");
7059 }
7060
7061 #[test]
7062 fn row_len_and_emptiness() {
7063 let r = Row::new(vec![Value::Int(1), Value::Null]);
7064 assert_eq!(r.len(), 2);
7065 assert!(!r.is_empty());
7066 assert!(Row::new(Vec::new()).is_empty());
7067 }
7068
7069 #[test]
7070 fn table_schema_column_position() {
7071 let s = make_users_schema();
7072 assert_eq!(s.column_position("id"), Some(0));
7073 assert_eq!(s.column_position("score"), Some(2));
7074 assert_eq!(s.column_position("missing"), None);
7075 }
7076
7077 #[test]
7078 fn catalog_create_table_then_lookup() {
7079 let mut cat = Catalog::new();
7080 cat.create_table(make_users_schema()).unwrap();
7081 assert_eq!(cat.table_count(), 1);
7082 assert!(cat.get("users").is_some());
7083 assert!(cat.get("nope").is_none());
7084 }
7085
7086 #[test]
7087 fn catalog_duplicate_table_is_rejected() {
7088 let mut cat = Catalog::new();
7089 cat.create_table(make_users_schema()).unwrap();
7090 let err = cat.create_table(make_users_schema()).unwrap_err();
7091 assert!(matches!(err, StorageError::DuplicateTable { ref name } if name == "users"));
7092 }
7093
7094 #[test]
7095 fn table_insert_happy_path_appends_row() {
7096 let mut cat = Catalog::new();
7097 cat.create_table(make_users_schema()).unwrap();
7098 let t = cat.get_mut("users").unwrap();
7099 t.insert(Row::new(vec![
7100 Value::Int(1),
7101 Value::Text("alice".into()),
7102 Value::Float(99.5),
7103 ]))
7104 .unwrap();
7105 assert_eq!(t.row_count(), 1);
7106 assert_eq!(t.rows()[0].values[1], Value::Text("alice".into()));
7107 }
7108
7109 #[test]
7110 fn table_insert_arity_mismatch() {
7111 let mut cat = Catalog::new();
7112 cat.create_table(make_users_schema()).unwrap();
7113 let t = cat.get_mut("users").unwrap();
7114 let err = t.insert(Row::new(vec![Value::Int(1)])).unwrap_err();
7115 assert!(matches!(
7116 err,
7117 StorageError::ArityMismatch {
7118 expected: 3,
7119 actual: 1
7120 }
7121 ));
7122 assert_eq!(t.row_count(), 0);
7123 }
7124
7125 #[test]
7126 fn table_insert_type_mismatch_reports_column() {
7127 let mut cat = Catalog::new();
7128 cat.create_table(make_users_schema()).unwrap();
7129 let t = cat.get_mut("users").unwrap();
7130 let err = t
7131 .insert(Row::new(vec![
7132 Value::Int(1),
7133 Value::Int(42), Value::Float(0.0),
7135 ]))
7136 .unwrap_err();
7137 match err {
7138 StorageError::TypeMismatch {
7139 ref column,
7140 expected,
7141 actual,
7142 position,
7143 } => {
7144 assert_eq!(column, "name");
7145 assert_eq!(expected, DataType::Text);
7146 assert_eq!(actual, DataType::Int);
7147 assert_eq!(position, 1);
7148 }
7149 other => panic!("unexpected: {other:?}"),
7150 }
7151 assert_eq!(t.row_count(), 0);
7152 }
7153
7154 #[test]
7155 fn table_insert_null_into_not_null_rejected() {
7156 let mut cat = Catalog::new();
7157 cat.create_table(make_users_schema()).unwrap();
7158 let t = cat.get_mut("users").unwrap();
7159 let err = t
7160 .insert(Row::new(vec![
7161 Value::Int(1),
7162 Value::Null, Value::Float(1.0),
7164 ]))
7165 .unwrap_err();
7166 assert!(matches!(err, StorageError::NullInNotNull { ref column } if column == "name"));
7167 }
7168
7169 #[test]
7170 fn table_insert_null_into_nullable_ok() {
7171 let mut cat = Catalog::new();
7172 cat.create_table(make_users_schema()).unwrap();
7173 let t = cat.get_mut("users").unwrap();
7174 t.insert(Row::new(vec![
7175 Value::Int(1),
7176 Value::Text("bob".into()),
7177 Value::Null,
7178 ]))
7179 .unwrap();
7180 assert_eq!(t.row_count(), 1);
7181 }
7182
7183 #[test]
7184 fn catalog_get_mut_independent_per_table() {
7185 let mut cat = Catalog::new();
7186 cat.create_table(TableSchema::new(
7187 "a",
7188 vec![ColumnSchema::new("v", DataType::Int, false)],
7189 ))
7190 .unwrap();
7191 cat.create_table(TableSchema::new(
7192 "b",
7193 vec![ColumnSchema::new("v", DataType::Int, false)],
7194 ))
7195 .unwrap();
7196 cat.get_mut("a")
7197 .unwrap()
7198 .insert(Row::new(vec![Value::Int(1)]))
7199 .unwrap();
7200 assert_eq!(cat.get("a").unwrap().row_count(), 1);
7201 assert_eq!(cat.get("b").unwrap().row_count(), 0);
7202 }
7203
7204 fn assert_round_trip(cat: &Catalog) {
7207 let bytes = cat.serialize();
7208 let restored = Catalog::deserialize(&bytes).expect("deserialize");
7209 assert_eq!(restored.table_count(), cat.table_count());
7212 for (a, b) in cat.tables.iter().zip(restored.tables.iter()) {
7213 assert_eq!(a.schema, b.schema);
7214 assert_eq!(a.rows, b.rows);
7215 }
7216 }
7217
7218 #[test]
7219 fn serialize_empty_catalog_round_trips() {
7220 assert_round_trip(&Catalog::new());
7221 }
7222
7223 #[test]
7224 fn serialize_single_empty_table_round_trips() {
7225 let mut cat = Catalog::new();
7226 cat.create_table(make_users_schema()).unwrap();
7227 assert_round_trip(&cat);
7228 }
7229
    /// Builds an NSW index over 1500 three-dimensional vectors, clones the
    /// graph, and asserts that the clone's `levels` and every layer still
    /// share storage with the original (i.e. the clone did not copy
    /// elements).
    #[test]
    fn nsw_clone_is_o1() {
        let mut cat = Catalog::new();
        cat.create_table(TableSchema::new(
            "docs",
            alloc::vec![
                ColumnSchema::new("id", DataType::Int, false),
                ColumnSchema::new(
                    "v",
                    DataType::Vector {
                        dim: 3,
                        encoding: VecEncoding::F32
                    },
                    true
                ),
            ],
        ))
        .unwrap();
        let t = cat.get_mut("docs").unwrap();
        // Rows are inserted before the index exists; the index is then built
        // over the populated table.
        for i in 0..1500_i32 {
            #[allow(clippy::cast_precision_loss)] let base = (i as f32) * 0.01;
            t.insert(Row::new(alloc::vec![
                Value::Int(i),
                Value::Vector(alloc::vec![base, base + 0.05, base + 0.1]),
            ]))
            .unwrap();
        }
        t.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
            .unwrap();
        // The table has exactly one index, so it must be the NSW one.
        let g = match &cat.get("docs").unwrap().indices()[0].kind {
            IndexKind::Nsw(g) => g,
            IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) | IndexKind::GinTrgm(_) => {
                panic!("expected NSW")
            }
        };
        assert_eq!(g.levels.len(), 1500, "one level slot per inserted row");
        // Sanity check that the sharing assertions below cover more than a
        // single layer.
        assert!(
            g.layers.len() >= 2,
            "1500 nodes should populate at least two HNSW layers, got {}",
            g.layers.len()
        );

        let cloned = g.clone();

        assert!(
            g.levels.shares_storage_with(&cloned.levels),
            "levels PV not shared after clone — clone copied elements (O(N))"
        );
        assert_eq!(g.layers.len(), cloned.layers.len());
        for (l, (orig, cl)) in g.layers.iter().zip(cloned.layers.iter()).enumerate() {
            assert!(
                orig.shares_storage_with(cl),
                "layer {l} PV not shared after clone — clone copied elements (O(N))"
            );
        }
    }
7297
    /// Round-trips a catalog with an SQ8 vector column and an NSW index
    /// through `serialize`/`deserialize` and checks that the column type, a
    /// sample cell and the results of a fixed nearest-neighbour query are
    /// unchanged.
    #[test]
    fn sq8_catalog_serialise_roundtrip_preserves_cells_and_index() {
        let mut cat = Catalog::new();
        cat.create_table(TableSchema::new(
            "vecs",
            alloc::vec![
                ColumnSchema::new("id", DataType::Int, false),
                ColumnSchema::new(
                    "v",
                    DataType::Vector {
                        dim: 8,
                        encoding: VecEncoding::Sq8,
                    },
                    false,
                ),
            ],
        ))
        .unwrap();
        let t = cat.get_mut("vecs").unwrap();
        // 32 rows; row `i` holds the ramp `i * 0.03 + j * 0.01` for
        // j in 0..8, quantised before insertion.
        for i in 0..32_i32 {
            #[allow(clippy::cast_precision_loss)]
            let base = (i as f32) * 0.03;
            let v: Vec<f32> = (0..8_i32)
                .map(|j| {
                    #[allow(clippy::cast_precision_loss)]
                    let off = (j as f32) * 0.01;
                    base + off
                })
                .collect();
            t.insert(Row::new(alloc::vec![
                Value::Int(i),
                Value::Sq8Vector(quantize::quantize(&v)),
            ]))
            .unwrap();
        }
        t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
        let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];
        // Snapshot everything to compare before serialising.
        let (before_cell, before_ty, before_hits) = {
            let t_ref = cat.get("vecs").unwrap();
            (
                t_ref.rows()[5].values[1].clone(),
                t_ref.schema().columns[1].ty,
                nsw_query(t_ref, "v_idx", &query, 5, NswMetric::L2),
            )
        };

        let bytes = cat.serialize();
        let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
        let rt = restored.get("vecs").unwrap();
        assert_eq!(rt.schema().columns[1].ty, before_ty);
        assert_eq!(rt.rows()[5].values[1], before_cell);
        let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
        assert_eq!(before_hits, after_hits);
    }
7360
    /// Round-trips a catalog with a half-precision (F16) vector column and an
    /// NSW index through `serialize`/`deserialize` and checks that the column
    /// type, a sample cell and the results of a fixed nearest-neighbour query
    /// are unchanged.
    #[test]
    fn half_catalog_serialise_roundtrip_preserves_cells_and_index() {
        use crate::halfvec;
        let mut cat = Catalog::new();
        cat.create_table(TableSchema::new(
            "vecs",
            alloc::vec![
                ColumnSchema::new("id", DataType::Int, false),
                ColumnSchema::new(
                    "v",
                    DataType::Vector {
                        dim: 8,
                        encoding: VecEncoding::F16,
                    },
                    false,
                ),
            ],
        ))
        .unwrap();
        let t = cat.get_mut("vecs").unwrap();
        // 32 rows; row `i` holds the ramp `i * 0.03 + j * 0.01` for
        // j in 0..8, converted to half precision before insertion.
        for i in 0..32_i32 {
            #[allow(clippy::cast_precision_loss)]
            let base = (i as f32) * 0.03;
            let v: Vec<f32> = (0..8_i32)
                .map(|j| {
                    #[allow(clippy::cast_precision_loss)]
                    let off = (j as f32) * 0.01;
                    base + off
                })
                .collect();
            t.insert(Row::new(alloc::vec![
                Value::Int(i),
                Value::HalfVector(halfvec::HalfVector::from_f32_slice(&v)),
            ]))
            .unwrap();
        }
        t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
        let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];
        // Snapshot everything to compare before serialising.
        let (before_cell, before_ty, before_hits) = {
            let t_ref = cat.get("vecs").unwrap();
            (
                t_ref.rows()[5].values[1].clone(),
                t_ref.schema().columns[1].ty,
                nsw_query(t_ref, "v_idx", &query, 5, NswMetric::L2),
            )
        };
        let bytes = cat.serialize();
        let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
        let rt = restored.get("vecs").unwrap();
        assert_eq!(rt.schema().columns[1].ty, before_ty);
        assert_eq!(rt.rows()[5].values[1], before_cell);
        let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
        assert_eq!(before_hits, after_hits);
    }
7421
    /// Measures recall@10 of the NSW index over a half-precision column
    /// against a brute-force f32 ranking of the same corpus and requires it
    /// to be at least 0.95.
    #[test]
    #[allow(clippy::similar_names)]
    fn hnsw_half_recall_at_10_matches_f32_groundtruth() {
        use crate::halfvec;
        // Deterministic pseudo-random generator: advances `state` and maps
        // its upper 32 bits onto [-1.0, 1.0].
        fn next(state: &mut u64) -> f32 {
            *state = state
                .wrapping_add(0x9E37_79B9_7F4A_7C15)
                .wrapping_mul(0xBF58_476D_1CE4_E5B9);
            #[allow(clippy::cast_precision_loss)]
            let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
            2.0 * u - 1.0
        }
        let dim: u32 = 32;
        let n: usize = 512;
        let dim_us = dim as usize;
        let mut seed: u64 = 0xF16_F16_F16_F16_u64;
        // Corpus and queries are drawn from the same stream, corpus first.
        let corpus: Vec<Vec<f32>> = (0..n)
            .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
            .collect();
        let queries: Vec<Vec<f32>> = (0..32)
            .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
            .collect();
        // Ground truth: exhaustive squared-L2 ranking on the original f32
        // vectors, keeping the ten closest corpus indices per query.
        let exact_top10: Vec<Vec<usize>> = queries
            .iter()
            .map(|q| {
                let mut scored: Vec<(f32, usize)> = corpus
                    .iter()
                    .enumerate()
                    .map(|(i, v)| (l2_distance_sq(v, q), i))
                    .collect();
                scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
                scored.into_iter().take(10).map(|(_, i)| i).collect()
            })
            .collect();
        let mut cat = Catalog::new();
        cat.create_table(TableSchema::new(
            "vecs",
            alloc::vec![
                ColumnSchema::new("id", DataType::Int, false),
                ColumnSchema::new(
                    "v",
                    DataType::Vector {
                        dim,
                        encoding: VecEncoding::F16,
                    },
                    false,
                ),
            ],
        ))
        .unwrap();
        let t = cat.get_mut("vecs").unwrap();
        // Rows go in corpus order, so row position equals corpus index.
        for (i, v) in corpus.iter().enumerate() {
            t.insert(Row::new(alloc::vec![
                Value::Int(i32::try_from(i).unwrap()),
                Value::HalfVector(halfvec::HalfVector::from_f32_slice(v)),
            ]))
            .unwrap();
        }
        t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
        let table = cat.get("vecs").unwrap();
        // Count how many index hits also appear in the exact top ten.
        let mut total_overlap = 0_usize;
        for (q, exact) in queries.iter().zip(exact_top10.iter()) {
            let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
            for h in &hits {
                if exact.contains(h) {
                    total_overlap += 1;
                }
            }
        }
        // Recall = overlapping hits / (10 expected hits per query).
        #[allow(clippy::cast_precision_loss)]
        let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
        assert!(
            recall >= 0.95,
            "HALF HNSW recall@10 = {recall:.3}, below floor 0.95 — \
             check halfvec dispatch in `cell_to_query_metric_distance`"
        );
    }
7505
/// Recall@10 check for an NSW index built over an `Sq8`-encoded vector column.
///
/// Generates a deterministic pseudo-random `f32` corpus (512 x 32) and 32
/// queries, computes the exact top-10 per query with `l2_distance_sq`, then
/// stores the corpus through `quantize::quantize` as `Value::Sq8Vector`, builds
/// an NSW index and requires that `nsw_query` recovers at least 95% of the
/// exact neighbours.
#[test]
fn hnsw_sq8_recall_at_10_above_0_95_vs_f32_groundtruth() {
    use crate::quantize;

    /// Steps the generator state and maps its upper 32 bits onto `[-1.0, 1.0]`.
    fn next(state: &mut u64) -> f32 {
        let stepped = state
            .wrapping_add(0x9E37_79B9_7F4A_7C15)
            .wrapping_mul(0xBF58_476D_1CE4_E5B9);
        *state = stepped;
        #[allow(clippy::cast_precision_loss)]
        let unit = ((stepped >> 32) as u32 as f32) / (u32::MAX as f32);
        2.0 * unit - 1.0
    }

    let dim: u32 = 32;
    let n: usize = 512;
    let dim_us = dim as usize;
    let mut seed: u64 = 0xCAFE_BABE_DEAD_BEEFu64;

    // Corpus is drawn first, queries second, from one continuous stream.
    let mut draw_vector = || -> Vec<f32> { (0..dim_us).map(|_| next(&mut seed)).collect() };
    let corpus: Vec<Vec<f32>> = (0..n).map(|_| draw_vector()).collect();
    let queries: Vec<Vec<f32>> = (0..32).map(|_| draw_vector()).collect();

    // Brute-force ground truth on the unquantised f32 vectors.
    let mut exact_top10: Vec<Vec<usize>> = Vec::with_capacity(queries.len());
    for q in &queries {
        let mut ranked: Vec<(f32, usize)> = Vec::with_capacity(corpus.len());
        for (row, v) in corpus.iter().enumerate() {
            ranked.push((l2_distance_sq(v, q), row));
        }
        ranked.sort_by(|lhs, rhs| {
            lhs.0
                .partial_cmp(&rhs.0)
                .unwrap_or(core::cmp::Ordering::Equal)
        });
        ranked.truncate(10);
        exact_top10.push(ranked.into_iter().map(|(_, row)| row).collect());
    }

    let vector_ty = DataType::Vector {
        dim,
        encoding: VecEncoding::Sq8,
    };
    let columns = alloc::vec![
        ColumnSchema::new("id", DataType::Int, false),
        ColumnSchema::new("v", vector_ty, false),
    ];
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new("vecs", columns)).unwrap();

    let t = cat.get_mut("vecs").unwrap();
    for (row_no, source) in corpus.iter().enumerate() {
        let id_cell = Value::Int(i32::try_from(row_no).unwrap());
        let vec_cell = Value::Sq8Vector(quantize::quantize(source));
        t.insert(Row::new(alloc::vec![id_cell, vec_cell])).unwrap();
    }
    t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    // Count how many index hits also appear in the exact top-10 of each query.
    let table = cat.get("vecs").unwrap();
    let mut total_overlap = 0_usize;
    for (q, exact) in queries.iter().zip(&exact_top10) {
        for hit in &nsw_query(table, "v_idx", q, 10, NswMetric::L2) {
            total_overlap += usize::from(exact.contains(hit));
        }
    }
    #[allow(clippy::cast_precision_loss)]
    let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
    assert!(
        recall >= 0.95,
        "SQ8 HNSW recall@10 = {recall:.3}, below floor 0.95 — \
         check `sq8_rerank` is wired in `nsw_search` for SQ8 columns"
    );
}
7594
/// An NSW graph must come back from `serialize` / `deserialize` unchanged.
///
/// Builds a six-row table with a nullable 3-dim `F32` vector column, adds an
/// NSW index, and compares `m`, `m_max_0`, `entry`, `entry_level`, `levels`
/// and `layers` of the graph before and after the round trip.
#[test]
fn nsw_index_topology_persists_through_round_trip() {
    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let columns = alloc::vec![
        ColumnSchema::new("id", DataType::Int, false),
        ColumnSchema::new("v", vector_ty, true),
    ];
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new("docs", columns)).unwrap();

    let docs = cat.get_mut("docs").unwrap();
    for i in 0..6_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.1;
        let embedding = Value::Vector(alloc::vec![base, base + 0.05, base + 0.1]);
        docs.insert(Row::new(alloc::vec![Value::Int(i), embedding]))
            .unwrap();
    }
    docs.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    // Snapshot the graph before serialisation; every non-NSW kind is listed
    // explicitly so the match stays exhaustive without a wildcard.
    let before_kind = &cat.get("docs").unwrap().indices()[0].kind;
    let original = match before_kind {
        IndexKind::Nsw(g) => g.clone(),
        IndexKind::BTree(_)
        | IndexKind::Brin { .. }
        | IndexKind::Gin(_)
        | IndexKind::GinTrgm(_) => {
            panic!("expected NSW")
        }
    };

    let restored = Catalog::deserialize(&cat.serialize()).expect("deserialize");
    let after_kind = &restored.get("docs").unwrap().indices()[0].kind;
    let restored_graph = match after_kind {
        IndexKind::Nsw(g) => g.clone(),
        IndexKind::BTree(_)
        | IndexKind::Brin { .. }
        | IndexKind::Gin(_)
        | IndexKind::GinTrgm(_) => {
            panic!("expected NSW")
        }
    };

    assert_eq!(restored_graph.m, original.m);
    assert_eq!(restored_graph.m_max_0, original.m_max_0);
    assert_eq!(restored_graph.entry, original.entry);
    assert_eq!(restored_graph.entry_level, original.entry_level);
    assert_eq!(restored_graph.levels, original.levels);
    assert_eq!(restored_graph.layers, original.layers);
}
7651
/// Calling `nsw_assign_level` twice with the same node index must give the
/// same level, for every index in `0..32`.
#[test]
fn hnsw_level_assignment_is_deterministic() {
    for node in 0..32usize {
        let first = nsw_assign_level(node);
        let second = nsw_assign_level(node);
        assert_eq!(first, second);
    }
}
7660
/// Of the first 200 node indices, more than 150 must be assigned level 0 by
/// `nsw_assign_level`.
#[test]
fn hnsw_layer_0_dominates_population() {
    let mut on_zero = 0_usize;
    for node in 0..200usize {
        if nsw_assign_level(node) == 0 {
            on_zero += 1;
        }
    }
    assert!(on_zero > 150, "level-0 nodes too few: {on_zero}");
}
7670
/// For three probe vectors, the top-1 result of `nsw_search` must be the same
/// row as the nearest neighbour found by a brute-force `l2_distance_sq` scan
/// over a ten-row table of 3-dim `F32` vectors.
#[test]
fn hnsw_search_matches_brute_force_for_l2_top1() {
    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let columns = alloc::vec![
        ColumnSchema::new("id", DataType::Int, false),
        ColumnSchema::new("v", vector_ty, true),
    ];
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new("vecs", columns)).unwrap();

    let dataset: [(i32, [f32; 3]); 10] = [
        (1, [0.0, 0.0, 0.0]),
        (2, [1.0, 0.0, 0.0]),
        (3, [0.0, 1.0, 0.0]),
        (4, [0.0, 0.0, 1.0]),
        (5, [1.0, 1.0, 0.0]),
        (6, [1.0, 0.0, 1.0]),
        (7, [0.0, 1.0, 1.0]),
        (8, [1.0, 1.0, 1.0]),
        (9, [0.5, 0.5, 0.5]),
        (10, [0.2, 0.8, 0.5]),
    ];
    let t = cat.get_mut("vecs").unwrap();
    for (id, coords) in dataset {
        let cells = alloc::vec![Value::Int(id), Value::Vector(coords.to_vec())];
        t.insert(Row::new(cells)).unwrap();
    }
    t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    let table = cat.get("vecs").unwrap();
    let idx_pos = table
        .indices()
        .iter()
        .position(|index| index.name == "v_idx")
        .unwrap();

    for query in [[0.4, 0.4, 0.4], [0.9, 0.1, 0.0], [0.0, 0.9, 0.9]] {
        let hnsw_top = nsw_search(table, idx_pos, &query, 1, 16, NswMetric::L2);

        // Exhaustive scan; rows whose cell is not a plain vector rank last.
        let row_total = table.rows.len();
        let mut brute: alloc::vec::Vec<(f32, usize)> = alloc::vec::Vec::with_capacity(row_total);
        for i in 0..row_total {
            let dist = if let Value::Vector(v) = &table.rows[i].values[1] {
                l2_distance_sq(v, &query)
            } else {
                f32::INFINITY
            };
            brute.push((dist, i));
        }
        brute.sort_by(|lhs, rhs| {
            lhs.0
                .partial_cmp(&rhs.0)
                .unwrap_or(core::cmp::Ordering::Equal)
        });

        assert!(!hnsw_top.is_empty(), "HNSW returned no results");
        assert_eq!(
            hnsw_top[0].1, brute[0].1,
            "HNSW top-1 != brute-force top-1 for {query:?}"
        );
    }
}
7739
/// A catalog holding the `make_users_schema` table with two rows (one of them
/// containing a `Null`) must pass `assert_round_trip`.
#[test]
fn serialize_table_with_rows_round_trips() {
    let mut cat = Catalog::new();
    cat.create_table(make_users_schema()).unwrap();

    let alice = Row::new(vec![
        Value::Int(1),
        Value::Text("alice".into()),
        Value::Float(95.5),
    ]);
    let bob = Row::new(vec![Value::Int(2), Value::Text("bob".into()), Value::Null]);

    let users = cat.get_mut("users").unwrap();
    for row in [alice, bob] {
        users.insert(row).unwrap();
    }
    assert_round_trip(&cat);
}
7759
/// A catalog with two tables (`users` from `make_users_schema`, plus a
/// `flags` table holding one row) must pass `assert_round_trip`.
#[test]
fn serialize_multiple_tables_round_trips() {
    let flag_columns = vec![
        ColumnSchema::new("id", DataType::BigInt, false),
        ColumnSchema::new("active", DataType::Bool, false),
    ];

    let mut cat = Catalog::new();
    cat.create_table(make_users_schema()).unwrap();
    cat.create_table(TableSchema::new("flags", flag_columns))
        .unwrap();

    let flags = cat.get_mut("flags").unwrap();
    let only_row = Row::new(vec![Value::BigInt(7), Value::Bool(true)]);
    flags.insert(only_row).unwrap();

    assert_round_trip(&cat);
}
7778
/// A buffer that starts with `BADMAGIC` instead of the file magic must be
/// rejected by `Catalog::deserialize` with `StorageError::Corrupt`.
#[test]
fn deserialize_rejects_bad_magic() {
    let mut buf: Vec<u8> = Vec::new();
    buf.extend_from_slice(b"BADMAGIC");
    buf.push(FILE_VERSION);
    // Table count of zero, little-endian.
    buf.extend_from_slice(&0u32.to_le_bytes());

    assert!(matches!(
        Catalog::deserialize(&buf),
        Err(StorageError::Corrupt(_))
    ));
}
7787
/// A header carrying version byte `99` must be rejected with a
/// `StorageError::Corrupt` whose message mentions "version".
#[test]
fn deserialize_rejects_unsupported_version() {
    let mut buf = FILE_MAGIC.to_vec();
    buf.extend_from_slice(&[99]);
    // Table count of zero, little-endian.
    buf.extend(0u32.to_le_bytes());

    assert!(matches!(
        Catalog::deserialize(&buf),
        Err(StorageError::Corrupt(ref s)) if s.contains("version")
    ));
}
7796
/// Dropping the final byte of a serialized one-table catalog must make
/// `Catalog::deserialize` fail with `StorageError::Corrupt`.
#[test]
fn deserialize_rejects_truncated_file() {
    let mut cat = Catalog::new();
    cat.create_table(make_users_schema()).unwrap();
    let bytes = cat.serialize();

    // Everything except the last byte.
    let (_last, truncated) = bytes
        .split_last()
        .expect("serialized catalog is never empty");

    let outcome = Catalog::deserialize(truncated);
    assert!(matches!(outcome, Err(StorageError::Corrupt(_))));
}
7809
/// One extra byte appended to a serialized empty catalog must be rejected
/// with a `StorageError::Corrupt` whose message mentions "trailing".
#[test]
fn deserialize_rejects_trailing_garbage() {
    let mut bytes = Catalog::new().serialize();
    bytes.push(0xFF);

    let err = Catalog::deserialize(&bytes).unwrap_err();
    assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("trailing")));
}
7820
7821 fn populated_users() -> Catalog {
7824 let mut cat = Catalog::new();
7825 cat.create_table(make_users_schema()).unwrap();
7826 let t = cat.get_mut("users").unwrap();
7827 for (id, name, score) in [
7828 (1, "alice", Some(90.0)),
7829 (2, "bob", None),
7830 (3, "alice", Some(70.0)), ] {
7832 t.insert(Row::new(vec![
7833 Value::Int(id),
7834 Value::Text(name.into()),
7835 score.map_or(Value::Null, Value::Float),
7836 ]))
7837 .unwrap();
7838 }
7839 cat
7840 }
7841
/// An index added after rows already exist must cover those rows: id 2 maps
/// to hot row 1, and an absent id yields an empty result.
#[test]
fn add_index_builds_from_existing_rows() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    let idx = cat
        .get("users")
        .unwrap()
        .index_on(0)
        .expect("index_on(0)");
    let no_hits: &[RowLocator] = &[];
    assert_eq!(idx.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
    assert_eq!(idx.lookup_eq(&IndexKey::Int(99)), no_hits);
}
7854
/// Adding a second index under an already-used name must fail with
/// `StorageError::DuplicateIndex` carrying that name.
#[test]
fn add_index_dup_name_rejected() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("ix".into(), "id").unwrap();

    let second_attempt = users.add_index("ix".into(), "name");
    assert!(matches!(
        second_attempt,
        Err(StorageError::DuplicateIndex { ref name }) if name == "ix"
    ));
}
7863
/// Indexing a column that is not in the schema must fail with
/// `StorageError::ColumnNotFound` naming the missing column.
#[test]
fn add_index_unknown_column_rejected() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();

    let attempt = users.add_index("ix".into(), "ghost");
    assert!(matches!(
        attempt,
        Err(StorageError::ColumnNotFound { ref column }) if column == "ghost"
    ));
}
7874
/// Rows inserted after an index exists must be visible through it, and the
/// entries built from earlier rows must stay intact.
#[test]
fn insert_after_create_index_updates_it() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let dave = Row::new(vec![Value::Int(4), Value::Text("dave".into()), Value::Null]);
    users.insert(dave).unwrap();

    let idx = users.index_on(1).unwrap();

    // The new row is the fourth one, hence hot position 3.
    let dave_key = IndexKey::Text("dave".into());
    assert_eq!(idx.lookup_eq(&dave_key), &[RowLocator::Hot(3)]);

    // Both pre-existing "alice" rows are still reachable.
    let alice_key = IndexKey::Text("alice".into());
    assert_eq!(
        idx.lookup_eq(&alice_key),
        &[RowLocator::Hot(0), RowLocator::Hot(2)]
    );
}
7897
/// An index on the `score` column must return no locators for
/// `IndexKey::Int(90)`, even though one row stores a score of 90.0.
#[test]
fn null_or_float_values_are_not_indexed() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_score".into(), "score").unwrap();

    let by_score = users.index_on(2).unwrap();
    let no_hits: &[RowLocator] = &[];
    assert_eq!(by_score.lookup_eq(&IndexKey::Int(90)), no_hits);
}
7910
/// `data_type()` on a three-element `Value::Vector` must report an `F32`
/// vector type with `dim` 3.
#[test]
fn vector_value_data_type_carries_dim() {
    let expected = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let observed = Value::Vector(vec![1.0, 2.0, 3.0]).data_type();
    assert_eq!(observed, Some(expected));
}
7924
/// Inserting a 3-element vector into a `dim: 3` vector column succeeds.
#[test]
fn vector_column_insert_matching_dim_ok() {
    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let schema = TableSchema::new("emb", vec![ColumnSchema::new("v", vector_ty, false)]);

    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let emb = cat.get_mut("emb").unwrap();
    let row = Row::new(vec![Value::Vector(vec![1.0, 2.0, 3.0])]);
    emb.insert(row).unwrap();
}
7945
/// Inserting a 2-element vector into a `dim: 3` vector column must fail with
/// `StorageError::TypeMismatch`.
#[test]
fn vector_column_insert_dim_mismatch_rejected() {
    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let schema = TableSchema::new("emb", vec![ColumnSchema::new("v", vector_ty, false)]);

    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let emb = cat.get_mut("emb").unwrap();
    let too_short = Row::new(vec![Value::Vector(vec![1.0, 2.0])]);
    let outcome = emb.insert(too_short);
    assert!(matches!(outcome, Err(StorageError::TypeMismatch { .. })));
}
7968
/// A vector column's declared type and a stored vector cell must both be
/// unchanged after `serialize` followed by `deserialize`.
#[test]
fn vector_value_survives_catalog_round_trip() {
    let vector_ty = DataType::Vector {
        dim: 4,
        encoding: VecEncoding::F32,
    };
    let columns = vec![
        ColumnSchema::new("id", DataType::Int, false),
        ColumnSchema::new("v", vector_ty, false),
    ];
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new("emb", columns)).unwrap();

    let emb = cat.get_mut("emb").unwrap();
    let stored = Row::new(vec![
        Value::Int(1),
        Value::Vector(vec![0.5, -1.25, 3.0, 7.0]),
    ]);
    emb.insert(stored).unwrap();

    let bytes = cat.serialize();
    let restored = Catalog::deserialize(&bytes).expect("round-trip");
    let table = restored.get("emb").unwrap();

    assert_eq!(
        table.schema().columns[1].ty,
        DataType::Vector {
            dim: 4,
            encoding: VecEncoding::F32
        }
    );
    let expected_cell = Value::Vector(vec![0.5, -1.25, 3.0, 7.0]);
    assert_eq!(table.rows()[0].values[1], expected_cell);
}
8008
/// An index on `name` must still exist under the same name, and still resolve
/// "alice" to hot rows 0 and 2, after a serialize/deserialize round trip.
#[test]
fn index_survives_serialize_deserialize_round_trip() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let bytes = cat.serialize();
    let restored = Catalog::deserialize(&bytes).unwrap();
    let restored_users = restored.get("users").unwrap();
    let idx = restored_users
        .index_on(1)
        .expect("index_on(1) after restore");

    assert_eq!(idx.name, "by_name");
    let alice_key = IndexKey::Text("alice".into());
    assert_eq!(
        idx.lookup_eq(&alice_key),
        &[RowLocator::Hot(0), RowLocator::Hot(2)]
    );
}
8029
8030 fn bigint_pk_users_schema() -> TableSchema {
8035 TableSchema::new(
8036 "users",
8037 vec![
8038 ColumnSchema::new("id", DataType::BigInt, false),
8039 ColumnSchema::new("name", DataType::Text, false),
8040 ],
8041 )
8042 }
8043
8044 fn make_user_row(id: i64, name: &str) -> Row {
8045 Row::new(vec![Value::BigInt(id), Value::Text(name.into())])
8046 }
8047
/// `lookup_by_pk` must return the matching row from the hot tier when no cold
/// segment has been loaded.
#[test]
fn lookup_by_pk_finds_row_via_hot_index() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();

    let users = cat.get_mut("users").unwrap();
    let people = [(1i64, "alice"), (2, "bob"), (3, "carol")];
    for (id, name) in people {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let key = IndexKey::Int(2);
    let got = cat.lookup_by_pk("users", "by_id", &key).unwrap();
    assert_eq!(got, make_user_row(2, "bob"));
    assert_eq!(cat.cold_segment_count(), 0);
}
8064
/// `lookup_by_pk` must return `None` for an absent key, an unknown table, and
/// an unknown index name.
#[test]
fn lookup_by_pk_returns_none_when_key_missing() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    // Key not present in the index.
    let missing_key = cat.lookup_by_pk("users", "by_id", &IndexKey::Int(999));
    assert!(missing_key.is_none());

    // Table does not exist.
    let missing_table = cat.lookup_by_pk("other_table", "by_id", &IndexKey::Int(1));
    assert!(missing_table.is_none());

    // Index does not exist on the table.
    let missing_index = cat.lookup_by_pk("users", "no_such_index", &IndexKey::Int(1));
    assert!(missing_index.is_none());
}
8086
/// Rows that exist only in a loaded cold segment must be reachable through
/// `lookup_by_pk` once `RowLocator::Cold` entries are registered on the index.
#[test]
fn lookup_by_pk_resolves_cold_locator_via_loaded_segment() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let schema = {
        let users = cat.get_mut("users").unwrap();
        users.add_index("by_id".into(), "id").unwrap();
        users.schema.clone()
    };

    let cold_rows: Vec<(i64, &str)> =
        vec![(100, "ivy"), (200, "joe"), (300, "kim"), (400, "lin")];

    // Encode each row body and pack them into one segment.
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(cold_rows.len());
    for &(id, name) in &cold_rows {
        let body = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), body));
    }
    let (seg_bytes, _meta) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();
    assert_eq!(seg_id, 0);
    assert_eq!(cat.cold_segment_count(), 1);

    // Point every cold key at the loaded segment.
    let mut pairs: Vec<(IndexKey, RowLocator)> = Vec::with_capacity(cold_rows.len());
    for &(id, _) in &cold_rows {
        let locator = RowLocator::Cold {
            segment_id: seg_id,
            page_offset: 0,
        };
        pairs.push((IndexKey::Int(id), locator));
    }
    let users = cat.get_mut("users").unwrap();
    let registered = users.register_cold_locators("by_id", pairs).unwrap();
    assert_eq!(registered, 4);

    for (id, name) in &cold_rows {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(*id))
            .unwrap_or_else(|| panic!("cold key {id} not found"));
        assert_eq!(got, make_user_row(*id, name));
    }

    let absent = cat.lookup_by_pk("users", "by_id", &IndexKey::Int(999));
    assert!(absent.is_none());
}
8144
/// With two hot rows and two rows held in a loaded cold segment behind the
/// same index, `lookup_by_pk` must resolve all four and return `None` for a
/// key that is in neither tier.
#[test]
fn lookup_by_pk_mixes_hot_and_cold_tiers() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let schema = {
        let users = cat.get_mut("users").unwrap();
        for (id, name) in [(1i64, "alice"), (2, "bob")] {
            users.insert(make_user_row(id, name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
        users.schema.clone()
    };

    let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe")];
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(cold_rows.len());
    for &(id, name) in &cold_rows {
        let body = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), body));
    }
    let (seg_bytes, _) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();

    let mut pairs: Vec<(IndexKey, RowLocator)> = Vec::with_capacity(cold_rows.len());
    for &(id, _) in &cold_rows {
        let locator = RowLocator::Cold {
            segment_id: seg_id,
            page_offset: 0,
        };
        pairs.push((IndexKey::Int(id), locator));
    }
    cat.get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();

    // Hot keys first, then cold keys.
    let expectations = [(1i64, "alice"), (2, "bob"), (100, "ivy"), (200, "joe")];
    for (id, name) in expectations {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap();
        assert_eq!(got, make_user_row(id, name));
    }

    let absent = cat.lookup_by_pk("users", "by_id", &IndexKey::Int(50));
    assert!(absent.is_none());
}
8215
/// `register_cold_locators` on an NSW index must fail with a
/// `StorageError::Corrupt` whose message contains "not BTree".
#[test]
fn register_cold_locators_rejects_nsw_index() {
    let vector_ty = DataType::Vector {
        dim: 4,
        encoding: VecEncoding::F32,
    };
    let columns = vec![
        ColumnSchema::new("id", DataType::Int, false),
        ColumnSchema::new("v", vector_ty, false),
    ];
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new("vecs", columns)).unwrap();

    let vecs = cat.get_mut("vecs").unwrap();
    let only_row = Row::new(vec![
        Value::Int(1),
        Value::Vector(vec![1.0, 0.0, 0.0, 0.0]),
    ]);
    vecs.insert(only_row).unwrap();
    vecs.add_nsw_index("by_v".into(), "v", NSW_DEFAULT_M).unwrap();

    let cold_entry = (
        IndexKey::Int(1),
        RowLocator::Cold {
            segment_id: 0,
            page_offset: 0,
        },
    );
    let outcome = vecs.register_cold_locators("by_v", vec![cold_entry]);
    assert!(matches!(
        outcome,
        Err(StorageError::Corrupt(ref s)) if s.contains("not BTree")
    ));
}
8257
/// Ten zero bytes are not a valid segment: `load_segment_bytes` must return a
/// `StorageError::Corrupt` mentioning "segment" and register nothing.
#[test]
fn load_segment_bytes_rejects_garbage() {
    let mut cat = Catalog::new();
    let junk = vec![0u8; 10];

    let outcome = cat.load_segment_bytes(junk);
    assert!(matches!(
        outcome,
        Err(StorageError::Corrupt(ref s)) if s.contains("segment")
    ));
    assert_eq!(cat.cold_segment_count(), 0);
}
8266
/// Three segments loaded one after another must receive ids 0, 1 and 2, and
/// the catalog must then report three cold segments.
#[test]
fn load_segment_bytes_returns_sequential_ids() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let schema = cat.get("users").unwrap().schema.clone();

    for batch in 0u32..3 {
        // Four rows per segment, keyed batch * 100 + 0..4.
        let first_id = u64::from(batch) * 100;
        let mut rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(4);
        for i in 0u64..4 {
            let id = first_id + i;
            let body = encode_row_body_dense(&make_user_row(id.cast_signed(), "x"), &schema);
            rows.push((id, body));
        }
        let (bytes, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
        let assigned = cat.load_segment_bytes(bytes).unwrap();
        assert_eq!(assigned, batch);
    }
    assert_eq!(cat.cold_segment_count(), 3);
}
8285
/// A stream written by `encode_as_v8` (version byte 8) must be accepted by
/// `Catalog::deserialize`, and its index entries must all be hot locators.
#[test]
fn v8_catalog_decodes_as_all_hot_under_v9_reader() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let v8_bytes = encode_as_v8(&cat);
    // The version byte sits directly after the magic.
    let version_byte = v8_bytes[FILE_MAGIC.len()];
    assert_eq!(version_byte, 8, "version byte must be 8");

    let restored = Catalog::deserialize(&v8_bytes).expect("v9 reader accepts v8 stream");
    let restored_users = restored.get("users").unwrap();
    let idx = restored_users
        .index_on(1)
        .expect("index_on(1) after restore");

    let alice_key = IndexKey::Text("alice".into());
    assert_eq!(
        idx.lookup_eq(&alice_key),
        &[RowLocator::Hot(0), RowLocator::Hot(2)]
    );
    for entry in idx.lookup_eq(&alice_key) {
        assert!(entry.is_hot(), "v8 → v9 read must yield Hot only");
    }
}
8330
8331 fn encode_as_v8(cat: &Catalog) -> Vec<u8> {
8336 let mut out = Vec::with_capacity(64);
8337 out.extend_from_slice(FILE_MAGIC);
8338 out.push(8u8);
8339 write_u32(&mut out, u32::try_from(cat.tables.len()).unwrap());
8340 for t in &cat.tables {
8341 write_str(&mut out, &t.schema.name);
8342 write_u16(&mut out, u16::try_from(t.schema.columns.len()).unwrap());
8343 for c in &t.schema.columns {
8344 write_str(&mut out, &c.name);
8345 write_data_type(&mut out, c.ty);
8346 out.push(u8::from(c.nullable));
8347 match &c.default {
8348 None => out.push(0),
8349 Some(v) => {
8350 out.push(1);
8351 write_value(&mut out, v);
8352 }
8353 }
8354 out.push(u8::from(c.auto_increment));
8355 }
8356 write_u32(&mut out, u32::try_from(t.rows.len()).unwrap());
8357 for row in &t.rows {
8358 out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
8359 }
8360 write_u16(&mut out, u16::try_from(t.indices.len()).unwrap());
8361 for idx in &t.indices {
8362 write_str(&mut out, &idx.name);
8363 write_u16(&mut out, u16::try_from(idx.column_position).unwrap());
8364 match &idx.kind {
8365 IndexKind::BTree(_) => out.push(0),
8368 IndexKind::Nsw(g) => {
8369 out.push(1);
8370 write_u16(&mut out, u16::try_from(g.m).unwrap());
8371 write_nsw_graph(&mut out, g);
8372 }
8373 IndexKind::Brin { .. } => panic!(
8376 "v8 catalog writer cannot serialise BRIN — \
8377 tests with BRIN indices must use the current writer"
8378 ),
8379 IndexKind::Gin(_) => panic!(
8380 "v8 catalog writer cannot serialise GIN — \
8381 tests with GIN indices must use the current writer"
8382 ),
8383 IndexKind::GinTrgm(_) => panic!(
8384 "v8 catalog writer cannot serialise trigram-GIN — \
8385 tests with trgm indices must use the current writer"
8386 ),
8387 }
8388 }
8389 }
8390 out
8391 }
8392
/// Hot and cold index entries must both survive `serialize` / `deserialize`.
///
/// After the round trip the same segment bytes are loaded into the restored
/// catalog; the index must report the original hot and cold locators, and
/// `lookup_by_pk` must resolve rows from both tiers.
#[test]
fn v9_catalog_round_trip_preserves_cold_locators() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let schema = {
        let users = cat.get_mut("users").unwrap();
        for (id, name) in [(1i64, "alice"), (2, "bob")] {
            users.insert(make_user_row(id, name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
        users.schema.clone()
    };

    let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe"), (300, "kim")];
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(cold_rows.len());
    for &(id, name) in &cold_rows {
        let body = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), body));
    }
    let (seg_bytes, _) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    // The bytes are cloned because the restored catalog loads them again below.
    let seg_id = cat.load_segment_bytes(seg_bytes.clone()).unwrap();

    let mut pairs: Vec<(IndexKey, RowLocator)> = Vec::with_capacity(cold_rows.len());
    for &(id, _) in &cold_rows {
        let locator = RowLocator::Cold {
            segment_id: seg_id,
            page_offset: 0,
        };
        pairs.push((IndexKey::Int(id), locator));
    }
    cat.get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();

    let bytes = cat.serialize();
    assert_eq!(bytes[FILE_MAGIC.len()], FILE_VERSION);
    let mut restored = Catalog::deserialize(&bytes).expect("v9 round-trip parses");

    let restored_seg_id = restored.load_segment_bytes(seg_bytes).unwrap();
    assert_eq!(restored_seg_id, seg_id);

    // Index-level view: locators are exactly what was registered.
    let idx = restored.get("users").unwrap().index_on(0).unwrap();
    assert_eq!(idx.lookup_eq(&IndexKey::Int(1)), &[RowLocator::Hot(0)]);
    assert_eq!(idx.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
    for (id, _) in &cold_rows {
        let key = IndexKey::Int(*id);
        assert_eq!(
            idx.lookup_eq(&key),
            &[RowLocator::Cold {
                segment_id: seg_id,
                page_offset: 0,
            }]
        );
    }

    // Row-level view: both tiers resolve to full rows.
    let hot_hit = restored
        .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
        .unwrap();
    assert_eq!(hot_hit, make_user_row(2, "bob"));
    for (id, name) in &cold_rows {
        let cold_hit = restored
            .lookup_by_pk("users", "by_id", &IndexKey::Int(*id))
            .unwrap();
        assert_eq!(cold_hit, make_user_row(*id, name));
    }
}
8483
/// `row_body_encoded_len` must equal the length of the buffer produced by
/// `encode_row_body_dense` for three rows of a ten-column schema: a typical
/// row, a zero/empty row containing a `Null`, and a row of negative values.
#[test]
fn row_body_encoded_len_matches_actual_encode_for_all_types() {
    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let numeric_ty = DataType::Numeric {
        precision: 18,
        scale: 2,
    };
    let columns = vec![
        ColumnSchema::new("a", DataType::SmallInt, true),
        ColumnSchema::new("b", DataType::Int, false),
        ColumnSchema::new("c", DataType::BigInt, false),
        ColumnSchema::new("d", DataType::Float, false),
        ColumnSchema::new("e", DataType::Bool, false),
        ColumnSchema::new("f", DataType::Text, false),
        ColumnSchema::new("g", vector_ty, false),
        ColumnSchema::new("h", numeric_ty, false),
        ColumnSchema::new("i", DataType::Date, false),
        ColumnSchema::new("j", DataType::Timestamp, false),
    ];
    let schema = TableSchema::new("wide", columns);

    let typical = Row::new(vec![
        Value::SmallInt(7),
        Value::Int(42),
        Value::BigInt(1_000_000),
        Value::Float(1.5),
        Value::Bool(true),
        Value::Text("hello".into()),
        Value::Vector(vec![1.0, 2.0, 3.0]),
        Value::Numeric {
            scaled: 12345,
            scale: 2,
        },
        Value::Date(20_000),
        Value::Timestamp(1_700_000_000_000_000),
    ]);
    // Null in the nullable column, zero / empty payloads everywhere else.
    let zeroed = Row::new(vec![
        Value::Null,
        Value::Int(0),
        Value::BigInt(0),
        Value::Float(0.0),
        Value::Bool(false),
        Value::Text(String::new()),
        Value::Vector(vec![]),
        Value::Numeric {
            scaled: 0,
            scale: 2,
        },
        Value::Date(0),
        Value::Timestamp(0),
    ]);
    let negative = Row::new(vec![
        Value::SmallInt(-1),
        Value::Int(-1),
        Value::BigInt(-1),
        Value::Float(-0.5),
        Value::Bool(true),
        Value::Text("a much longer payload here".into()),
        Value::Vector(vec![0.1, 0.2, 0.3]),
        Value::Numeric {
            scaled: -999_999_999,
            scale: 2,
        },
        Value::Date(-1),
        Value::Timestamp(-1),
    ]);

    for row in &[typical, zeroed, negative] {
        let actual = encode_row_body_dense(row, &schema).len();
        let fast = row_body_encoded_len(row, &schema);
        assert_eq!(actual, fast, "row {row:?}");
    }
}
8575
/// `hot_bytes` starts at zero and, after three inserts, equals the summed
/// `encode_row_body_dense` lengths of those rows; `hot_tier_bytes` on the
/// catalog reports the same total.
#[test]
fn hot_bytes_grows_on_insert_and_matches_encoded_sum() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    assert_eq!(users.hot_bytes(), 0);

    let pending: Vec<Row> = [(1i64, "alice"), (2, "bob"), (3, "carol")]
        .iter()
        .map(|&(id, name)| make_user_row(id, name))
        .collect();
    let expected: u64 = pending
        .iter()
        .map(|row| encode_row_body_dense(row, &users.schema).len() as u64)
        .sum();
    for row in pending {
        users.insert(row).unwrap();
    }

    assert_eq!(users.hot_bytes(), expected);
    assert_eq!(cat.hot_tier_bytes(), expected);
}
8591
/// Deleting one row must reduce `hot_bytes` by exactly that row's encoded
/// body length.
#[test]
fn hot_bytes_shrinks_on_delete() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    let people = [(1i64, "alice"), (2, "bob"), (3, "carol")];
    for (id, name) in people {
        users.insert(make_user_row(id, name)).unwrap();
    }
    let before = users.hot_bytes();

    // Row position 1 holds "bob".
    let bob_bytes = encode_row_body_dense(&make_user_row(2, "bob"), &users.schema).len() as u64;
    let removed = users.delete_rows(&[1]);

    assert_eq!(removed, 1);
    assert_eq!(users.hot_bytes(), before - bob_bytes);
}
8608
/// Updating a row to a longer text value must change `hot_bytes` by the
/// difference between the new and old encoded lengths.
#[test]
fn hot_bytes_diffs_on_update_for_variable_width_columns() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    let after_insert = users.hot_bytes();

    let old_row = make_user_row(1, "alice");
    let new_row = make_user_row(1, "alice-the-longer-name");
    let old_len = encode_row_body_dense(&old_row, &users.schema).len() as u64;
    let new_len = encode_row_body_dense(&new_row, &users.schema).len() as u64;

    users.update_row(0, new_row.values).unwrap();

    let after_update = users.hot_bytes();
    assert_eq!(after_update, after_insert - old_len + new_len);
    assert!(after_update > after_insert, "longer text grew the counter");
}
8625
/// The hot-tier byte counter of a restored catalog must equal the counter of
/// the catalog it was serialized from, at both catalog and table level.
#[test]
fn hot_bytes_round_trips_through_serialize_deserialize() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for i in 0..10 {
        let name = alloc::format!("name-{i}");
        users.insert(make_user_row(i, &name)).unwrap();
    }
    let pre = cat.hot_tier_bytes();

    let bytes = cat.serialize();
    let restored = Catalog::deserialize(&bytes).unwrap();

    assert_eq!(restored.hot_tier_bytes(), pre);
    let restored_users = restored.get("users").unwrap();
    assert_eq!(restored_users.hot_bytes(), pre);
}
8640
8641 #[test]
8648 fn freeze_oldest_to_cold_moves_rows_and_keeps_lookups_working() {
8649 let mut cat = Catalog::new();
8650 cat.create_table(bigint_pk_users_schema()).unwrap();
8651 let t = cat.get_mut("users").unwrap();
8652 for id in 0..10i64 {
8653 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8654 .unwrap();
8655 }
8656 t.add_index("by_id".into(), "id").unwrap();
8657 let total_bytes_before = t.hot_bytes();
8658
8659 let report = cat
8660 .freeze_oldest_to_cold("users", "by_id", 6)
8661 .expect("freeze succeeds");
8662 assert_eq!(report.frozen_rows, 6);
8663 assert_eq!(report.segment_id, 0);
8664 assert!(report.bytes_freed > 0);
8665 assert!(!report.segment_bytes.is_empty());
8666
8667 let t = cat.get("users").unwrap();
8668 assert_eq!(t.row_count(), 4, "4 hot rows remain (10 - 6 frozen)");
8669 assert_eq!(cat.cold_segment_count(), 1);
8670 assert_eq!(
8672 t.hot_bytes(),
8673 total_bytes_before - report.bytes_freed,
8674 "hot_bytes accounting matches FreezeReport"
8675 );
8676
8677 for id in 0..10i64 {
8680 let got = cat
8681 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
8682 .unwrap_or_else(|| panic!("PK {id} disappeared after freeze"));
8683 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
8684 }
8685 }
8686
8687 #[test]
8692 fn freeze_twice_preserves_prior_cold_locators() {
8693 let mut cat = Catalog::new();
8694 cat.create_table(bigint_pk_users_schema()).unwrap();
8695 let t = cat.get_mut("users").unwrap();
8696 for id in 0..12i64 {
8697 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8698 .unwrap();
8699 }
8700 t.add_index("by_id".into(), "id").unwrap();
8701
8702 cat.freeze_oldest_to_cold("users", "by_id", 4)
8703 .expect("first freeze ok");
8704 cat.freeze_oldest_to_cold("users", "by_id", 4)
8705 .expect("second freeze ok");
8706
8707 assert_eq!(cat.get("users").unwrap().row_count(), 4);
8708 assert_eq!(cat.cold_segment_count(), 2);
8709 for id in 0..12i64 {
8712 let got = cat
8713 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
8714 .unwrap_or_else(|| panic!("PK {id} not resolvable after two freezes"));
8715 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
8716 }
8717 }
8718
8719 #[test]
8722 fn freeze_oldest_to_cold_rejects_invalid_input() {
8723 let mut cat = Catalog::new();
8724 cat.create_table(bigint_pk_users_schema()).unwrap();
8725 let t = cat.get_mut("users").unwrap();
8726 for id in 0..3i64 {
8727 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8728 .unwrap();
8729 }
8730 t.add_index("by_id".into(), "id").unwrap();
8731
8732 assert!(matches!(
8734 cat.freeze_oldest_to_cold("users", "by_id", 0),
8735 Err(StorageError::Corrupt(_))
8736 ));
8737 assert!(matches!(
8739 cat.freeze_oldest_to_cold("missing", "by_id", 1),
8740 Err(StorageError::Corrupt(_))
8741 ));
8742 assert!(matches!(
8744 cat.freeze_oldest_to_cold("users", "no_such_index", 1),
8745 Err(StorageError::Corrupt(_))
8746 ));
8747 assert!(matches!(
8749 cat.freeze_oldest_to_cold("users", "by_id", 999),
8750 Err(StorageError::Corrupt(_))
8751 ));
8752 assert_eq!(cat.get("users").unwrap().row_count(), 3);
8754 assert_eq!(cat.cold_segment_count(), 0);
8755 }
8756
8757 #[test]
8760 fn freeze_oldest_to_cold_rejects_non_integer_pk() {
8761 let mut cat = Catalog::new();
8762 cat.create_table(TableSchema::new(
8763 "by_name",
8764 vec![
8765 ColumnSchema::new("name", DataType::Text, false),
8766 ColumnSchema::new("payload", DataType::BigInt, false),
8767 ],
8768 ))
8769 .unwrap();
8770 let t = cat.get_mut("by_name").unwrap();
8771 t.insert(Row::new(vec![Value::Text("a".into()), Value::BigInt(1)]))
8772 .unwrap();
8773 t.add_index("by_n".into(), "name").unwrap();
8774 let err = cat
8775 .freeze_oldest_to_cold("by_name", "by_n", 1)
8776 .expect_err("non-integer PK rejected");
8777 match err {
8778 StorageError::Corrupt(s) => assert!(
8779 s.contains("non-integer"),
8780 "error message names the constraint: {s}"
8781 ),
8782 other => panic!("expected Corrupt, got {other:?}"),
8783 }
8784 assert_eq!(cat.get("by_name").unwrap().row_count(), 1);
8786 assert_eq!(cat.cold_segment_count(), 0);
8787 }
8788
8789 #[test]
8794 fn freeze_keeps_remaining_hot_rows_addressable_via_secondary_index() {
8795 let mut cat = Catalog::new();
8796 cat.create_table(bigint_pk_users_schema()).unwrap();
8797 let t = cat.get_mut("users").unwrap();
8798 for id in 0..6i64 {
8799 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8800 .unwrap();
8801 }
8802 t.add_index("by_id".into(), "id").unwrap();
8803 t.add_index("by_name".into(), "name").unwrap();
8804
8805 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
8806
8807 let idx = cat.get("users").unwrap().index_on(1).unwrap();
8811 let got = idx.lookup_eq(&IndexKey::Text("u-4".into()));
8812 assert_eq!(got.len(), 1);
8813 assert!(got[0].is_hot(), "kept-hot rows still surface as Hot");
8814 match got[0] {
8815 RowLocator::Hot(i) => {
8816 assert_eq!(i, 1);
8819 }
8820 RowLocator::Cold { .. } => unreachable!(),
8821 }
8822 }
8823
8824 #[test]
8832 fn promote_cold_row_pulls_frozen_row_back_to_hot_tier() {
8833 let mut cat = Catalog::new();
8834 cat.create_table(bigint_pk_users_schema()).unwrap();
8835 let t = cat.get_mut("users").unwrap();
8836 for id in 0..6i64 {
8837 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8838 .unwrap();
8839 }
8840 t.add_index("by_id".into(), "id").unwrap();
8841 cat.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
8844 let hot_bytes_before = cat.get("users").unwrap().hot_bytes();
8845
8846 let new_idx = cat
8848 .promote_cold_row("users", "by_id", &IndexKey::Int(2))
8849 .expect("promote ok")
8850 .expect("PK 2 was cold");
8851 assert_eq!(
8852 new_idx, 2,
8853 "promoted row appended after the 2 surviving hot rows"
8854 );
8855
8856 let t = cat.get("users").unwrap();
8857 assert_eq!(t.row_count(), 3, "hot tier grew from 2 to 3");
8858 let row = make_user_row(2, "u-2");
8860 let row_len = encode_row_body_dense(&row, &t.schema).len() as u64;
8861 assert_eq!(t.hot_bytes(), hot_bytes_before + row_len);
8862
8863 let entries = t.index_on(0).unwrap().lookup_eq(&IndexKey::Int(2));
8866 assert_eq!(entries.len(), 1, "exactly one locator per key");
8867 assert!(entries[0].is_hot(), "promote retired the Cold locator");
8868 assert_eq!(
8870 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(2))
8871 .unwrap(),
8872 row
8873 );
8874 assert_eq!(
8877 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(0))
8878 .unwrap(),
8879 make_user_row(0, "u-0")
8880 );
8881 }
8882
8883 #[test]
8887 fn promote_cold_row_returns_none_when_key_is_not_cold() {
8888 let mut cat = Catalog::new();
8889 cat.create_table(bigint_pk_users_schema()).unwrap();
8890 let t = cat.get_mut("users").unwrap();
8891 t.insert(make_user_row(7, "alice")).unwrap();
8892 t.add_index("by_id".into(), "id").unwrap();
8893
8894 assert!(
8896 cat.promote_cold_row("users", "by_id", &IndexKey::Int(7))
8897 .unwrap()
8898 .is_none()
8899 );
8900 assert!(
8902 cat.promote_cold_row("users", "by_id", &IndexKey::Int(99))
8903 .unwrap()
8904 .is_none()
8905 );
8906 assert_eq!(cat.get("users").unwrap().row_count(), 1);
8908 assert_eq!(cat.cold_segment_count(), 0);
8909 }
8910
8911 #[test]
8916 fn shadow_cold_row_removes_cold_locators_and_drops_lookup() {
8917 let mut cat = Catalog::new();
8918 cat.create_table(bigint_pk_users_schema()).unwrap();
8919 let t = cat.get_mut("users").unwrap();
8920 for id in 0..5i64 {
8921 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8922 .unwrap();
8923 }
8924 t.add_index("by_id".into(), "id").unwrap();
8925 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
8926
8927 assert!(
8929 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(1))
8930 .is_some(),
8931 "frozen PK resolves before shadow"
8932 );
8933 let removed = cat
8934 .shadow_cold_row("users", "by_id", &IndexKey::Int(1))
8935 .unwrap();
8936 assert_eq!(removed, 1, "exactly one cold locator retired");
8937
8938 assert!(
8941 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(1))
8942 .is_none(),
8943 "shadowed key no longer resolves"
8944 );
8945 assert_eq!(
8947 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(0))
8948 .unwrap(),
8949 make_user_row(0, "u-0")
8950 );
8951 assert_eq!(
8952 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(2))
8953 .unwrap(),
8954 make_user_row(2, "u-2")
8955 );
8956 }
8957
8958 #[test]
8963 fn shadow_cold_row_returns_zero_when_key_is_not_cold() {
8964 let mut cat = Catalog::new();
8965 cat.create_table(bigint_pk_users_schema()).unwrap();
8966 let t = cat.get_mut("users").unwrap();
8967 t.insert(make_user_row(1, "alice")).unwrap();
8968 t.add_index("by_id".into(), "id").unwrap();
8969 assert_eq!(
8970 cat.shadow_cold_row("users", "by_id", &IndexKey::Int(1))
8971 .unwrap(),
8972 0,
8973 "hot-only key drops no cold locators"
8974 );
8975 assert_eq!(
8976 cat.shadow_cold_row("users", "by_id", &IndexKey::Int(999))
8977 .unwrap(),
8978 0,
8979 "absent key drops no cold locators"
8980 );
8981 assert_eq!(cat.get("users").unwrap().row_count(), 1);
8982 }
8983
8984 #[test]
8986 fn promote_and_shadow_reject_invalid_inputs() {
8987 let mut cat = Catalog::new();
8988 cat.create_table(bigint_pk_users_schema()).unwrap();
8989 let t = cat.get_mut("users").unwrap();
8990 t.insert(make_user_row(1, "alice")).unwrap();
8991 t.add_index("by_id".into(), "id").unwrap();
8992
8993 assert!(matches!(
8995 cat.promote_cold_row("missing", "by_id", &IndexKey::Int(1)),
8996 Err(StorageError::Corrupt(_))
8997 ));
8998 assert!(matches!(
8999 cat.shadow_cold_row("missing", "by_id", &IndexKey::Int(1)),
9000 Err(StorageError::Corrupt(_))
9001 ));
9002 assert!(matches!(
9004 cat.promote_cold_row("users", "no_such_index", &IndexKey::Int(1)),
9005 Err(StorageError::Corrupt(_))
9006 ));
9007 assert!(matches!(
9008 cat.shadow_cold_row("users", "no_such_index", &IndexKey::Int(1)),
9009 Err(StorageError::Corrupt(_))
9010 ));
9011 }
9012
9013 #[test]
9020 fn commit_freeze_slices_single_slice_matches_freeze_oldest() {
9021 let mut a = Catalog::new();
9022 let mut b = Catalog::new();
9023 for cat in [&mut a, &mut b] {
9024 cat.create_table(bigint_pk_users_schema()).unwrap();
9025 let t = cat.get_mut("users").unwrap();
9026 for id in 0..10i64 {
9027 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
9028 .unwrap();
9029 }
9030 t.add_index("by_id".into(), "id").unwrap();
9031 }
9032 let single = a.freeze_oldest_to_cold("users", "by_id", 6).unwrap();
9033 let slice = b
9034 .prepare_freeze_slice("users", "by_id", 0..6)
9035 .expect("prepare");
9036 let parallel = b
9037 .commit_freeze_slices("users", "by_id", alloc::vec![slice])
9038 .expect("commit");
9039 assert_eq!(single.segment_id, parallel.segment_id);
9040 assert_eq!(single.frozen_rows, parallel.frozen_rows);
9041 assert_eq!(single.bytes_freed, parallel.bytes_freed);
9042 assert_eq!(single.segment_bytes, parallel.segment_bytes);
9043 for id in 0..10i64 {
9045 assert_eq!(
9046 a.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
9047 b.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
9048 "PK {id} differs after single vs slice freeze"
9049 );
9050 }
9051 }
9052
9053 #[test]
9058 fn commit_freeze_slices_two_slices_match_single_slice() {
9059 let mut a = Catalog::new();
9060 let mut b = Catalog::new();
9061 for cat in [&mut a, &mut b] {
9062 cat.create_table(bigint_pk_users_schema()).unwrap();
9063 let t = cat.get_mut("users").unwrap();
9064 for id in [3, 7, 1, 9, 5, 0, 8, 4, 2, 6].iter().copied() {
9067 t.insert(make_user_row(id as i64, &alloc::format!("u-{id}")))
9068 .unwrap();
9069 }
9070 t.add_index("by_id".into(), "id").unwrap();
9071 }
9072 let single = a
9073 .prepare_freeze_slice("users", "by_id", 0..8)
9074 .expect("prepare");
9075 let one = a
9076 .commit_freeze_slices("users", "by_id", alloc::vec![single])
9077 .expect("commit one");
9078 let s1 = b
9079 .prepare_freeze_slice("users", "by_id", 0..4)
9080 .expect("prepare s1");
9081 let s2 = b
9082 .prepare_freeze_slice("users", "by_id", 4..8)
9083 .expect("prepare s2");
9084 let two = b
9085 .commit_freeze_slices("users", "by_id", alloc::vec![s1, s2])
9086 .expect("commit two");
9087 assert_eq!(one.segment_bytes, two.segment_bytes);
9088 assert_eq!(one.frozen_rows, two.frozen_rows);
9089 for id in 0..10i64 {
9092 assert_eq!(
9093 a.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
9094 b.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
9095 "PK {id} differs after one-slice vs two-slice freeze"
9096 );
9097 }
9098 }
9099
9100 #[test]
9102 fn commit_freeze_slices_rejects_gap() {
9103 let mut cat = Catalog::new();
9104 cat.create_table(bigint_pk_users_schema()).unwrap();
9105 let t = cat.get_mut("users").unwrap();
9106 for id in 0..6i64 {
9107 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
9108 .unwrap();
9109 }
9110 t.add_index("by_id".into(), "id").unwrap();
9111 let s1 = cat.prepare_freeze_slice("users", "by_id", 0..2).unwrap();
9112 let s2 = cat.prepare_freeze_slice("users", "by_id", 3..5).unwrap();
9113 assert!(matches!(
9114 cat.commit_freeze_slices("users", "by_id", alloc::vec![s1, s2]),
9115 Err(StorageError::Corrupt(_))
9116 ));
9117 assert_eq!(cat.cold_segment_count(), 0);
9119 assert_eq!(cat.get("users").unwrap().row_count(), 6);
9120 }
9121
9122 #[test]
9124 fn commit_freeze_slices_empty_is_noop() {
9125 let mut cat = Catalog::new();
9126 cat.create_table(bigint_pk_users_schema()).unwrap();
9127 let t = cat.get_mut("users").unwrap();
9128 for id in 0..3i64 {
9129 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
9130 .unwrap();
9131 }
9132 t.add_index("by_id".into(), "id").unwrap();
9133 let report = cat
9134 .commit_freeze_slices("users", "by_id", Vec::new())
9135 .unwrap();
9136 assert_eq!(report.frozen_rows, 0);
9137 assert_eq!(cat.cold_segment_count(), 0);
9138 assert_eq!(cat.get("users").unwrap().row_count(), 3);
9139 }
9140
9141 #[test]
9148 fn compact_merges_small_segments_storage_unit() {
9149 let mut cat = Catalog::new();
9150 cat.create_table(bigint_pk_users_schema()).unwrap();
9151 let t = cat.get_mut("users").unwrap();
9152 for id in 0..8i64 {
9153 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
9154 .unwrap();
9155 }
9156 t.add_index("by_id".into(), "id").unwrap();
9157 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
9159 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
9160 assert_eq!(cat.cold_segment_count(), 2);
9161 assert_eq!(cat.cold_segment_slot_count(), 2);
9162
9163 let max_seg_bytes = cat
9166 .cold_segment_ids_global()
9167 .iter()
9168 .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
9169 .max()
9170 .unwrap();
9171 let target = max_seg_bytes + 1;
9172
9173 let report = cat
9174 .compact_cold_segments("users", "by_id", target)
9175 .expect("compact succeeds");
9176 assert_eq!(report.sources.len(), 2);
9177 let merged_id = report.merged_segment_id.expect("merge happened");
9178 assert_eq!(report.merged_rows, 6);
9179 assert_eq!(report.deleted_rows_pruned, 0);
9180 assert!(!report.merged_segment_bytes.is_empty());
9181
9182 assert_eq!(cat.cold_segment_count(), 1);
9185 assert_eq!(cat.cold_segment_slot_count(), 3);
9186 assert_eq!(cat.cold_segment_ids_global(), alloc::vec![merged_id]);
9187
9188 for id in 0..8i64 {
9191 let got = cat
9192 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
9193 .unwrap_or_else(|| panic!("PK {id} lost after compaction"));
9194 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
9195 }
9196 }
9197
9198 #[test]
9202 fn compact_drops_shadowed_cold_rows() {
9203 let mut cat = Catalog::new();
9204 cat.create_table(bigint_pk_users_schema()).unwrap();
9205 let t = cat.get_mut("users").unwrap();
9206 for id in 0..6i64 {
9207 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
9208 .unwrap();
9209 }
9210 t.add_index("by_id".into(), "id").unwrap();
9211 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
9212 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
9213 assert_eq!(
9215 cat.shadow_cold_row("users", "by_id", &IndexKey::Int(1))
9216 .unwrap(),
9217 1
9218 );
9219 assert_eq!(
9220 cat.shadow_cold_row("users", "by_id", &IndexKey::Int(4))
9221 .unwrap(),
9222 1
9223 );
9224
9225 let max_seg_bytes = cat
9226 .cold_segment_ids_global()
9227 .iter()
9228 .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
9229 .max()
9230 .unwrap();
9231 let report = cat
9232 .compact_cold_segments("users", "by_id", max_seg_bytes + 1)
9233 .expect("compact succeeds");
9234 assert_eq!(report.sources.len(), 2);
9235 assert_eq!(report.merged_rows, 4, "6 frozen − 2 shadowed = 4 live");
9236 assert_eq!(report.deleted_rows_pruned, 2);
9237
9238 for shadowed in [1i64, 4i64] {
9240 assert!(
9241 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(shadowed))
9242 .is_none(),
9243 "shadowed PK {shadowed} must remain invisible after compact"
9244 );
9245 }
9246 for live in [0i64, 2, 3, 5] {
9248 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(live))
9249 .unwrap_or_else(|| panic!("live PK {live} lost after compact"));
9250 }
9251 }
9252
9253 #[test]
9256 fn compact_is_noop_below_two_candidates() {
9257 let mut cat = Catalog::new();
9258 cat.create_table(bigint_pk_users_schema()).unwrap();
9259 let t = cat.get_mut("users").unwrap();
9260 for id in 0..6i64 {
9261 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
9262 .unwrap();
9263 }
9264 t.add_index("by_id".into(), "id").unwrap();
9265 let report = cat
9267 .compact_cold_segments("users", "by_id", 1 << 30)
9268 .expect("noop ok");
9269 assert!(report.merged_segment_id.is_none());
9270 assert!(report.sources.is_empty());
9271
9272 cat.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
9274 let report = cat
9275 .compact_cold_segments("users", "by_id", 1 << 30)
9276 .expect("noop ok");
9277 assert!(report.merged_segment_id.is_none());
9278 assert_eq!(cat.cold_segment_count(), 1);
9279
9280 let report = cat
9283 .compact_cold_segments("users", "by_id", 1)
9284 .expect("noop ok");
9285 assert!(report.merged_segment_id.is_none());
9286 assert_eq!(cat.cold_segment_count(), 1);
9287 }
9288
9289 #[test]
9297 fn compact_swap_survives_catalog_roundtrip_via_load_at() {
9298 let mut cat = Catalog::new();
9299 cat.create_table(bigint_pk_users_schema()).unwrap();
9300 let t = cat.get_mut("users").unwrap();
9301 for id in 0..6i64 {
9302 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
9303 .unwrap();
9304 }
9305 t.add_index("by_id".into(), "id").unwrap();
9306 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
9307 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
9308 let max_seg_bytes = cat
9309 .cold_segment_ids_global()
9310 .iter()
9311 .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
9312 .max()
9313 .unwrap();
9314 let report = cat
9315 .compact_cold_segments("users", "by_id", max_seg_bytes + 1)
9316 .expect("compact ok");
9317 let merged_id = report.merged_segment_id.unwrap();
9318
9319 let cat_bytes = cat.serialize();
9324 let merged_bytes = report.merged_segment_bytes.clone();
9325
9326 let mut restored = Catalog::deserialize(&cat_bytes).expect("deserialize ok");
9327 restored
9328 .load_segment_bytes_at(merged_id, merged_bytes)
9329 .expect("reload merged ok");
9330
9331 for id in 0..6i64 {
9333 let got = restored
9334 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
9335 .unwrap_or_else(|| panic!("PK {id} lost across roundtrip"));
9336 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
9337 }
9338 assert_eq!(restored.cold_segment_count(), 1);
9341 }
9342
9343 #[test]
9346 fn load_segment_bytes_at_pads_and_rejects_collision() {
9347 let mut cat = Catalog::new();
9348 cat.create_table(bigint_pk_users_schema()).unwrap();
9349 let t = cat.get_mut("users").unwrap();
9350 for id in 0..4i64 {
9351 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
9352 .unwrap();
9353 }
9354 t.add_index("by_id".into(), "id").unwrap();
9355 let report = cat.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
9356 let bytes_seg0 = report.segment_bytes.clone();
9357
9358 cat.load_segment_bytes_at(5, bytes_seg0.clone())
9362 .expect("pad + load ok");
9363 assert_eq!(cat.cold_segment_slot_count(), 6);
9364 assert_eq!(cat.cold_segment_count(), 2);
9365
9366 assert!(matches!(
9368 cat.load_segment_bytes_at(5, bytes_seg0.clone()),
9369 Err(StorageError::Corrupt(_))
9370 ));
9371 assert!(matches!(
9373 cat.load_segment_bytes_at(0, bytes_seg0),
9374 Err(StorageError::Corrupt(_))
9375 ));
9376 }
9377
9378 #[test]
9382 fn promote_then_refreeze_does_not_leave_orphan_locators() {
9383 let mut cat = Catalog::new();
9384 cat.create_table(bigint_pk_users_schema()).unwrap();
9385 let t = cat.get_mut("users").unwrap();
9386 for id in 0..4i64 {
9387 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
9388 .unwrap();
9389 }
9390 t.add_index("by_id".into(), "id").unwrap();
9391
9392 cat.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
9394 let promoted = cat
9395 .promote_cold_row("users", "by_id", &IndexKey::Int(0))
9396 .unwrap();
9397 assert!(promoted.is_some());
9398 let entries_after_promote = cat
9399 .get("users")
9400 .unwrap()
9401 .index_on(0)
9402 .unwrap()
9403 .lookup_eq(&IndexKey::Int(0))
9404 .to_vec();
9405 assert_eq!(entries_after_promote.len(), 1);
9406 assert!(entries_after_promote[0].is_hot());
9407
9408 for id in [2i64, 3] {
9415 assert_eq!(
9416 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(id))
9417 .unwrap(),
9418 make_user_row(id, &alloc::format!("u-{id}"))
9419 );
9420 }
9421 }
9422}