1#![no_std]
7#![cfg_attr(target_arch = "aarch64", allow(unsafe_code))]
11
12extern crate alloc;
13
14pub mod bloom;
15pub mod halfvec;
16pub mod persistent;
17pub mod persistent_btree;
18pub mod quantize;
19pub mod row_locator;
20pub mod segment;
21
22pub use self::bloom::{BloomError, BloomFilter};
23pub use self::row_locator::{RowLocator, RowLocatorError};
24pub use self::segment::{
25 BRIN_SIDECAR_MAGIC, BrinSummary, OwnedSegment, SEGMENT_COMPRESS_ALGO_LZSS,
26 SEGMENT_COMPRESS_ALGO_NONE, SEGMENT_MAGIC, SEGMENT_MAGIC_V2, SEGMENT_PAGE_BYTES, SegmentError,
27 SegmentMeta, SegmentReader, derive_brin_summaries, encode_segment, wrap_v2_envelope,
28 wrap_v2_envelope_with_brin,
29};
30
31use alloc::boxed::Box;
32use alloc::collections::{BTreeMap, BTreeSet};
33use alloc::format;
34use alloc::string::String;
35use alloc::sync::Arc;
36use alloc::vec::Vec;
37use core::fmt;
38
39use self::persistent::PersistentVec;
40use self::persistent_btree::PersistentBTreeMap;
41
/// Physical storage encoding of a `VECTOR` column.
///
/// `Display` renders the variants as `F32`, `SQ8` and `HALF`. A
/// `DataType::Vector` with a non-`F32` encoding prints a `USING …` suffix.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum VecEncoding {
    /// Full-precision elements (the default). Cells are `Value::Vector(Vec<f32>)`.
    #[default]
    F32,
    /// Quantized encoding. Cells are `Value::Sq8Vector`, whose dimension is taken
    /// from the length of its `bytes` buffer (one byte per element).
    Sq8,
    /// Half-precision encoding. Cells are `Value::HalfVector`.
    F16,
}
60
61impl fmt::Display for VecEncoding {
62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63 match self {
64 Self::F32 => f.write_str("F32"),
65 Self::Sq8 => f.write_str("SQ8"),
66 Self::F16 => f.write_str("HALF"),
67 }
68 }
69}
70
/// Declared SQL type of a column.
///
/// `Value::data_type` maps runtime values onto this enum. `Table::insert` and
/// `Table::update_row` compare the two, with a few allowed cross-type
/// assignments (for example `Text` into `VARCHAR`/`CHAR`/`JSON`/`JSONB`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataType {
    /// `SMALLINT`, stored as `Value::SmallInt(i16)`.
    SmallInt,
    // `INT` (i32), `BIGINT` (i64), `FLOAT` (f64) and `TEXT` (String).
    Int,
    BigInt,
    Float,
    Text,
    /// `VARCHAR(n)`. `Text` values are accepted here; no length check is visible
    /// in this file's insert path.
    Varchar(u32),
    /// `CHAR(n)`. `Text` values are accepted here, as with `Varchar`.
    Char(u32),
    /// `BOOL`.
    Bool,
    /// `VECTOR(dim)` with its storage encoding. Both `dim` and `encoding` must
    /// match exactly on insert.
    Vector {
        dim: u32,
        encoding: VecEncoding,
    },
    /// `NUMERIC(precision[, scale])`. Insert compatibility compares only `scale`.
    Numeric {
        precision: u8,
        scale: u8,
    },
    /// `DATE`.
    Date,
    /// `TIMESTAMP`. Interchangeable with `Timestamptz` on insert.
    Timestamp,
    /// `TIMESTAMPTZ`. `Value::Timestamp` values are accepted here.
    Timestamptz,
    /// `INTERVAL`.
    Interval,
    /// `JSON`. Accepts `Text` and `Jsonb`-typed values.
    Json,
    /// `JSONB`. Accepts `Text` and `Json`-typed values.
    Jsonb,
    /// `BYTEA`.
    Bytes,
    /// `TEXT[]`.
    TextArray,
    /// `INT[]`.
    IntArray,
    /// `BIGINT[]`.
    BigIntArray,
    /// `TSVECTOR`. The only column type a GIN index may be created on.
    TsVector,
    /// `TSQUERY`.
    TsQuery,
}
176
177impl fmt::Display for DataType {
178 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
179 match self {
180 Self::SmallInt => f.write_str("SMALLINT"),
181 Self::Int => f.write_str("INT"),
182 Self::BigInt => f.write_str("BIGINT"),
183 Self::Float => f.write_str("FLOAT"),
184 Self::Text => f.write_str("TEXT"),
185 Self::Varchar(n) => write!(f, "VARCHAR({n})"),
186 Self::Char(n) => write!(f, "CHAR({n})"),
187 Self::Bool => f.write_str("BOOL"),
188 Self::Vector { dim, encoding } => match encoding {
189 VecEncoding::F32 => write!(f, "VECTOR({dim})"),
190 VecEncoding::Sq8 => write!(f, "VECTOR({dim}) USING SQ8"),
191 VecEncoding::F16 => write!(f, "VECTOR({dim}) USING HALF"),
192 },
193 Self::Numeric { precision, scale } => {
194 if *scale == 0 {
195 write!(f, "NUMERIC({precision})")
196 } else {
197 write!(f, "NUMERIC({precision}, {scale})")
198 }
199 }
200 Self::Date => f.write_str("DATE"),
201 Self::Timestamp => f.write_str("TIMESTAMP"),
202 Self::Timestamptz => f.write_str("TIMESTAMPTZ"),
203 Self::Interval => f.write_str("INTERVAL"),
204 Self::Json => f.write_str("JSON"),
205 Self::Jsonb => f.write_str("JSONB"),
206 Self::Bytes => f.write_str("BYTEA"),
207 Self::TextArray => f.write_str("TEXT[]"),
208 Self::IntArray => f.write_str("INT[]"),
209 Self::BigIntArray => f.write_str("BIGINT[]"),
210 Self::TsVector => f.write_str("TSVECTOR"),
211 Self::TsQuery => f.write_str("TSQUERY"),
212 }
213 }
214}
215
/// One lexeme entry of a `tsvector` value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TsLexeme {
    /// The normalised word. GIN indexes key their posting lists on this string.
    pub word: String,
    /// Presumably the token positions of `word` in the source document; TODO confirm.
    pub positions: Vec<u16>,
    /// Presumably the lexeme weight class (A–D style); NOTE(review): encoding not shown here.
    pub weight: u8,
}
227
/// Parsed `tsquery` expression tree.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TsQueryAst {
    /// A single search term.
    Term {
        word: String,
        // Presumably a bitmask of acceptable lexeme weights; TODO confirm encoding.
        weight_mask: u8,
    },
    /// Both operands must match.
    And(Box<TsQueryAst>, Box<TsQueryAst>),
    /// Either operand may match.
    Or(Box<TsQueryAst>, Box<TsQueryAst>),
    /// Negation of the operand.
    Not(Box<TsQueryAst>),
    /// Phrase operator: `left` followed by `right`.
    Phrase {
        left: Box<TsQueryAst>,
        right: Box<TsQueryAst>,
        // Presumably the required positional gap between `left` and `right`; confirm.
        distance: u16,
    },
}
251
/// A single cell value. `Null` represents SQL `NULL`.
///
/// `Value::data_type` reports the matching `DataType`.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum Value {
    SmallInt(i16),
    Int(i32),
    BigInt(i64),
    Float(f64),
    /// Text. Also accepted by VARCHAR/CHAR/JSON/JSONB columns.
    Text(String),
    Bool(bool),
    /// `F32`-encoded vector.
    Vector(Vec<f32>),
    /// `SQ8`-encoded vector.
    Sq8Vector(crate::quantize::Sq8Vector),
    /// `HALF`-encoded vector.
    HalfVector(crate::halfvec::HalfVector),
    /// Fixed-point number. Presumably the value is `scaled / 10^scale`; TODO confirm.
    Numeric {
        scaled: i128,
        scale: u8,
    },
    /// Date. NOTE(review): presumably days relative to an epoch; confirm.
    Date(i32),
    /// Timestamp, also stored in TIMESTAMPTZ columns. NOTE(review): unit/epoch not shown here.
    Timestamp(i64),
    /// Interval split into a month part and a sub-month part (presumably microseconds).
    Interval {
        months: i32,
        micros: i64,
    },
    /// JSON document text. Used for both JSON and JSONB columns.
    Json(String),
    /// Raw bytes (`BYTEA`).
    Bytes(Vec<u8>),
    /// `TEXT[]`. `None` elements are NULL array members.
    TextArray(Vec<Option<String>>),
    /// `INT[]`. `None` elements are NULL array members.
    IntArray(Vec<Option<i32>>),
    /// `BIGINT[]`. `None` elements are NULL array members.
    BigIntArray(Vec<Option<i64>>),
    /// `TSVECTOR` lexemes. GIN indexes post one locator per lexeme word.
    TsVector(Vec<TsLexeme>),
    /// `TSQUERY` expression tree.
    TsQuery(TsQueryAst),
    /// SQL `NULL`.
    Null,
}
328
329impl Value {
330 pub fn data_type(&self) -> Option<DataType> {
332 match self {
333 Self::SmallInt(_) => Some(DataType::SmallInt),
334 Self::Int(_) => Some(DataType::Int),
335 Self::BigInt(_) => Some(DataType::BigInt),
336 Self::Float(_) => Some(DataType::Float),
337 Self::Text(_) => Some(DataType::Text),
340 Self::Bool(_) => Some(DataType::Bool),
341 Self::Vector(v) => Some(DataType::Vector {
342 dim: u32::try_from(v.len()).expect("vector dim ≤ u32"),
343 encoding: VecEncoding::F32,
344 }),
345 Self::Sq8Vector(q) => Some(DataType::Vector {
346 dim: u32::try_from(q.bytes.len()).expect("vector dim ≤ u32"),
347 encoding: VecEncoding::Sq8,
348 }),
349 Self::HalfVector(h) => Some(DataType::Vector {
350 dim: u32::try_from(h.dim()).expect("vector dim ≤ u32"),
351 encoding: VecEncoding::F16,
352 }),
353 Self::Numeric { scale, .. } => Some(DataType::Numeric {
358 precision: 0,
359 scale: *scale,
360 }),
361 Self::Date(_) => Some(DataType::Date),
362 Self::Timestamp(_) => Some(DataType::Timestamp),
363 Self::Interval { .. } => Some(DataType::Interval),
364 Self::Json(_) => Some(DataType::Json),
365 Self::Bytes(_) => Some(DataType::Bytes),
366 Self::TextArray(_) => Some(DataType::TextArray),
367 Self::IntArray(_) => Some(DataType::IntArray),
368 Self::BigIntArray(_) => Some(DataType::BigIntArray),
369 Self::TsVector(_) => Some(DataType::TsVector),
370 Self::TsQuery(_) => Some(DataType::TsQuery),
371 Self::Null => None,
372 }
373 }
374
375 pub const fn is_null(&self) -> bool {
376 matches!(self, Self::Null)
377 }
378}
379
/// One table row.
///
/// `Table::insert` expects one value per schema column, in schema order.
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    /// Cell values, positionally aligned with `TableSchema::columns`.
    pub values: Vec<Value>,
}
386
387impl Row {
388 pub const fn new(values: Vec<Value>) -> Self {
389 Self { values }
390 }
391
392 pub fn len(&self) -> usize {
393 self.values.len()
394 }
395
396 pub fn is_empty(&self) -> bool {
397 self.values.is_empty()
398 }
399}
400
/// Definition of one table column.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnSchema {
    /// Column name, as matched by `TableSchema::column_position`.
    pub name: String,
    /// Declared type that inserted/updated values are checked against.
    pub ty: DataType,
    /// When `false`, inserting or updating `Value::Null` fails with `NullInNotNull`.
    pub nullable: bool,
    /// Constant default value, if any.
    pub default: Option<Value>,
    /// NOTE(review): presumably a default expression evaluated at insert time (kept as text); confirm.
    pub runtime_default: Option<String>,
    /// Marks an auto-increment column (see `Table::next_auto_value`).
    pub auto_increment: bool,
}
424
/// Definition of a table: its columns plus table-level constraints.
#[derive(Debug, Clone, PartialEq)]
pub struct TableSchema {
    /// Table name.
    pub name: String,
    /// Columns in positional order. Every other position-based field indexes into this.
    pub columns: Vec<ColumnSchema>,
    /// NOTE(review): presumably a per-table budget for in-memory (hot) rows; confirm.
    pub hot_tier_bytes: Option<u64>,
    /// Foreign keys declared on this table.
    pub foreign_keys: Vec<ForeignKeyConstraint>,
    /// PRIMARY KEY / UNIQUE constraints.
    pub uniqueness_constraints: Vec<UniquenessConstraint>,
    /// CHECK constraints, presumably stored as SQL expression text; confirm.
    pub checks: Vec<String>,
}
459
/// A PRIMARY KEY or UNIQUE constraint over one or more columns.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UniquenessConstraint {
    /// `true` for the primary key, `false` for a plain UNIQUE constraint.
    pub is_primary_key: bool,
    /// Column positions into `TableSchema::columns`.
    pub columns: Vec<usize>,
    /// Presumably SQL `NULLS NOT DISTINCT` (NULLs collide with each other); confirm.
    pub nulls_not_distinct: bool,
}
483
/// A foreign-key constraint from this table to `parent_table`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForeignKeyConstraint {
    /// Optional constraint name.
    pub name: Option<String>,
    /// Referencing column positions in this table.
    pub local_columns: Vec<usize>,
    /// Name of the referenced table.
    pub parent_table: String,
    /// Referenced column positions. Presumably positions in the parent table,
    /// paired with `local_columns` by index; confirm.
    pub parent_columns: Vec<usize>,
    /// Action taken when a referenced parent row is deleted.
    pub on_delete: FkAction,
    /// Action taken when a referenced parent key is updated.
    pub on_update: FkAction,
}
510
/// Referential action for ON DELETE / ON UPDATE.
///
/// The byte encoding is fixed by `FkAction::tag` / `FkAction::from_tag`, not
/// by variant order.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FkAction {
    Restrict,
    Cascade,
    SetNull,
    SetDefault,
    NoAction,
}
520
521impl FkAction {
522 pub const fn tag(self) -> u8 {
524 match self {
525 Self::Restrict => 0,
526 Self::Cascade => 1,
527 Self::SetNull => 2,
528 Self::SetDefault => 3,
529 Self::NoAction => 4,
530 }
531 }
532 pub const fn from_tag(b: u8) -> Option<Self> {
533 Some(match b {
534 0 => Self::Restrict,
535 1 => Self::Cascade,
536 2 => Self::SetNull,
537 3 => Self::SetDefault,
538 4 => Self::NoAction,
539 _ => return None,
540 })
541 }
542}
543
544impl TableSchema {
545 pub fn column_position(&self, name: &str) -> Option<usize> {
546 self.columns.iter().position(|c| c.name == name)
547 }
548}
549
/// Key type stored in BTree indexes.
///
/// `Ord` is derived, so keys of different variants compare by declaration
/// order (`Int` < `Text` < `Bool`). Reordering variants would change index
/// ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum IndexKey {
    /// Integer-like values widened to `i64`: SMALLINT, INT, BIGINT, DATE and TIMESTAMP.
    Int(i64),
    /// Text values.
    Text(String),
    /// Boolean values.
    Bool(bool),
}
560
561impl IndexKey {
562 pub fn from_value(v: &Value) -> Option<Self> {
563 match v {
564 Value::SmallInt(n) => Some(Self::Int(i64::from(*n))),
565 Value::Int(n) => Some(Self::Int(i64::from(*n))),
566 Value::BigInt(n) => Some(Self::Int(*n)),
567 Value::Text(s) => Some(Self::Text(s.clone())),
568 Value::Bool(b) => Some(Self::Bool(*b)),
569 Value::Date(d) => Some(Self::Int(i64::from(*d))),
572 Value::Timestamp(t) => Some(Self::Int(*t)),
573 Value::Null
578 | Value::Float(_)
579 | Value::Vector(_)
580 | Value::Sq8Vector(_)
581 | Value::HalfVector(_)
582 | Value::Numeric { .. }
583 | Value::Interval { .. }
584 | Value::Json(_)
585 | Value::Bytes(_)
586 | Value::TextArray(_)
587 | Value::IntArray(_)
588 | Value::BigIntArray(_)
589 | Value::TsVector(_)
590 | Value::TsQuery(_) => None,
591 }
592 }
593}
594
/// A secondary index on a table column.
#[derive(Debug, Clone)]
pub struct Index {
    /// Index name. Must be unique per table.
    pub name: String,
    /// Position of the indexed column in `TableSchema::columns`.
    pub column_position: usize,
    /// Index structure and its data.
    pub kind: IndexKind,
    /// NOTE(review): presumably INCLUDE (covering) column positions; confirm.
    pub included_columns: Vec<usize>,
    /// NOTE(review): presumably the WHERE clause of a partial index, as SQL text; confirm.
    pub partial_predicate: Option<String>,
    /// NOTE(review): presumably the indexed expression for expression indexes; confirm.
    pub expression: Option<String>,
    /// Whether the index is declared UNIQUE.
    pub is_unique: bool,
    /// Additional column positions beyond `column_position`. Presumably the
    /// trailing key columns of a composite index; confirm.
    pub extra_column_positions: Vec<usize>,
}
643
/// Default NSW neighbour cap `m`. Layer 0 allows `2 * m` neighbours (see `NswGraph::new`).
/// NOTE(review): presumably used when CREATE INDEX does not specify `m`; confirm at call sites.
pub const NSW_DEFAULT_M: usize = 16;
647
/// Result of freezing hot rows into a cold segment.
/// NOTE(review): field meanings below are inferred from names; confirm against the freeze path.
#[derive(Debug, Clone)]
pub struct FreezeReport {
    /// Identifier of the segment that was written.
    pub segment_id: u32,
    /// Number of rows moved into the segment.
    pub frozen_rows: usize,
    /// Hot-tier bytes released by the freeze.
    pub bytes_freed: u64,
    /// Encoded segment payload.
    pub segment_bytes: Vec<u8>,
}
673
/// A contiguous range of hot rows selected for freezing.
/// NOTE(review): the tuple layout of `rows` is not documented here; confirm with the producer.
#[derive(Debug, Clone)]
pub struct FreezeSlice {
    /// Hot row positions covered by this slice.
    pub row_range: core::ops::Range<usize>,
    /// Per-row `(u64, encoded bytes, index key)` triples. Presumably
    /// (row id, row body, BTree key); confirm.
    pub rows: Vec<(u64, Vec<u8>, IndexKey)>,
}
696
/// Result of compacting several cold segments into one.
/// NOTE(review): semantics inferred from field names; confirm against the compaction path.
#[derive(Debug, Clone)]
pub struct CompactReport {
    /// Identifiers of the input segments.
    pub sources: Vec<u32>,
    /// Identifier of the merged segment, if one was produced.
    pub merged_segment_id: Option<u32>,
    /// Encoded payload of the merged segment.
    pub merged_segment_bytes: Vec<u8>,
    /// Number of rows written to the merged segment.
    pub merged_rows: usize,
    /// Number of deleted rows dropped during the merge.
    pub deleted_rows_pruned: usize,
    /// Approximate number of bytes reclaimed (an estimate, per the name).
    pub bytes_reclaimed_estimate: u64,
}
732
/// Storage structure of an [`Index`].
#[derive(Debug, Clone)]
pub enum IndexKind {
    /// Ordered map from key to the hot and/or cold locators of matching rows.
    BTree(PersistentBTreeMap<IndexKey, Vec<RowLocator>>),
    /// Layered NSW proximity graph over a vector column.
    Nsw(NswGraph),
    /// BRIN index. Only the column type is kept in memory here; presumably the
    /// block summaries live in segment sidecars (`BrinSummary`); confirm.
    Brin {
        column_type: DataType,
    },
    /// Inverted index: tsvector lexeme word → locators of rows containing it.
    Gin(PersistentBTreeMap<alloc::string::String, Vec<RowLocator>>),
}
783
/// Layered navigable-small-world graph for approximate nearest-neighbour search.
#[derive(Debug, Clone)]
pub struct NswGraph {
    /// Neighbour cap for layers above 0.
    pub m: usize,
    /// Neighbour cap for layer 0 (`2 * m`, saturating, when built by `NswGraph::new`).
    pub m_max_0: usize,
    /// Search entry node, if any. Presumably a hot row index; confirm.
    pub entry: Option<usize>,
    /// Top layer of the entry node.
    pub entry_level: u8,
    /// Presumably the per-node top layer (see `nsw_assign_level`); confirm.
    pub levels: PersistentVec<u8>,
    /// One adjacency structure per layer (layer 0 first). Presumably
    /// `layers[l][node]` is the neighbour list of `node` at layer `l`; confirm.
    pub layers: Vec<PersistentVec<Vec<u32>>>,
}
828
829impl NswGraph {
830 fn new(m: usize) -> Self {
831 Self {
832 m,
833 m_max_0: m.saturating_mul(2),
834 entry: None,
835 entry_level: 0,
836 levels: PersistentVec::new(),
837 layers: alloc::vec![PersistentVec::new()],
838 }
839 }
840
841 pub const fn cap_for_layer(&self, layer: u8) -> usize {
843 if layer == 0 { self.m_max_0 } else { self.m }
844 }
845}
846
/// Deterministically assigns an NSW layer in `0..=7` to the row at `row_idx`.
///
/// The row index is multiplied by the 64-bit golden-ratio constant and then
/// passed through the SplitMix64 finaliser. The layer is the number of
/// trailing zero nibbles of the resulting hash, capped at 7, so each extra
/// layer is about 16x less likely than the one below.
///
/// A zero hash (which `row_idx == 0` produces) always gets the cap.
pub fn nsw_assign_level(row_idx: usize) -> u8 {
    const MAX_LEVEL: u8 = 7;
    const GOLDEN: u64 = 0x9E37_79B9_7F4A_7C15;
    const MIX_1: u64 = 0xBF58_476D_1CE4_E5B9;
    const MIX_2: u64 = 0x94D0_49BB_1331_11EB;

    let seeded = (row_idx as u64).wrapping_mul(GOLDEN);
    let stage1 = (seeded ^ (seeded >> 30)).wrapping_mul(MIX_1);
    let stage2 = (stage1 ^ (stage1 >> 27)).wrapping_mul(MIX_2);
    let hash = stage2 ^ (stage2 >> 31);

    // Every trailing zero nibble promotes the row one layer.
    // `trailing_zeros` is 64 for a zero hash, so the cap applies there.
    let zero_nibbles = hash.trailing_zeros() / 4;
    // Lossless cast: the value is at most MAX_LEVEL (7).
    zero_nibbles.min(u32::from(MAX_LEVEL)) as u8
}
873
874impl Index {
875 fn new_btree(name: String, column_position: usize) -> Self {
876 Self {
877 name,
878 column_position,
879 kind: IndexKind::BTree(PersistentBTreeMap::new()),
880 included_columns: Vec::new(),
881 partial_predicate: None,
882 expression: None,
883 is_unique: false,
884 extra_column_positions: Vec::new(),
885 }
886 }
887
888 fn new_nsw(name: String, column_position: usize, m: usize) -> Self {
889 Self {
890 name,
891 column_position,
892 kind: IndexKind::Nsw(NswGraph::new(m)),
893 included_columns: Vec::new(),
894 partial_predicate: None,
895 expression: None,
896 is_unique: false,
897 extra_column_positions: Vec::new(),
898 }
899 }
900
901 fn new_brin(name: String, column_position: usize, column_type: DataType) -> Self {
905 Self {
906 name,
907 column_position,
908 kind: IndexKind::Brin { column_type },
909 included_columns: Vec::new(),
910 partial_predicate: None,
911 expression: None,
912 is_unique: false,
913 extra_column_positions: Vec::new(),
914 }
915 }
916
917 fn new_gin(name: String, column_position: usize) -> Self {
922 Self {
923 name,
924 column_position,
925 kind: IndexKind::Gin(PersistentBTreeMap::new()),
926 included_columns: Vec::new(),
927 partial_predicate: None,
928 expression: None,
929 is_unique: false,
930 extra_column_positions: Vec::new(),
931 }
932 }
933
934 pub fn lookup_eq(&self, key: &IndexKey) -> &[RowLocator] {
943 match &self.kind {
944 IndexKind::BTree(m) => m.get(key).map_or(&[][..], Vec::as_slice),
945 IndexKind::Nsw(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => &[][..],
948 }
949 }
950
951 pub fn gin_lookup_word(&self, word: &str) -> &[RowLocator] {
955 match &self.kind {
956 IndexKind::Gin(m) => m.get(&String::from(word)).map_or(&[][..], Vec::as_slice),
957 IndexKind::BTree(_) | IndexKind::Nsw(_) | IndexKind::Brin { .. } => &[][..],
958 }
959 }
960
961 pub const fn nsw(&self) -> Option<&NswGraph> {
964 match &self.kind {
965 IndexKind::Nsw(g) => Some(g),
966 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => None,
967 }
968 }
969
970 pub const fn is_brin(&self) -> bool {
975 matches!(self.kind, IndexKind::Brin { .. })
976 }
977
978 pub const fn is_gin(&self) -> bool {
982 matches!(self.kind, IndexKind::Gin(_))
983 }
984}
985
/// In-memory table: schema, hot rows, secondary indices and hot/cold bookkeeping.
#[derive(Debug, Clone)]
pub struct Table {
    /// Column definitions and constraints.
    schema: TableSchema,
    /// Hot rows. `RowLocator::Hot(i)` refers to position `i` here.
    rows: PersistentVec<Row>,
    /// Secondary indices.
    indices: Vec<Index>,
    /// Running total of `row_body_encoded_len` for hot rows. Updated by insert,
    /// delete and update; not by add/drop column.
    hot_bytes: u64,
    /// Cached count of cold rows, as last set by `set_cold_row_count`.
    cold_row_count: u64,
    /// Set by `mark_cold_row_count_stale`, cleared by `set_cold_row_count`.
    cold_row_count_stale: bool,
}
1027
1028impl Table {
1029 pub fn new(schema: TableSchema) -> Self {
1030 Self {
1031 schema,
1032 rows: PersistentVec::new(),
1033 indices: Vec::new(),
1034 hot_bytes: 0,
1035 cold_row_count: 0,
1036 cold_row_count_stale: false,
1037 }
1038 }
1039
    /// Running byte total of the hot rows, as tracked by insert/delete/update.
    #[must_use]
    pub const fn hot_bytes(&self) -> u64 {
        self.hot_bytes
    }

    /// Cached cold row count. Check `cold_row_count_stale` before trusting it.
    #[must_use]
    pub const fn cold_row_count(&self) -> u64 {
        self.cold_row_count
    }

    /// Stores a fresh cold row count and clears the stale flag.
    pub fn set_cold_row_count(&mut self, n: u64) {
        self.cold_row_count = n;
        self.cold_row_count_stale = false;
    }

    /// Marks the cached cold row count as out of date without changing its value.
    pub fn mark_cold_row_count_stale(&mut self) {
        self.cold_row_count_stale = true;
    }

    /// Whether the cached cold row count has been marked stale since it was last set.
    #[must_use]
    pub const fn cold_row_count_stale(&self) -> bool {
        self.cold_row_count_stale
    }
1077
1078 #[must_use]
1089 pub fn count_cold_locators(&self) -> u64 {
1090 let mut best: u64 = 0;
1091 for idx in &self.indices {
1092 if let IndexKind::BTree(map) = &idx.kind {
1093 let n: u64 = map
1094 .iter()
1095 .map(|(_, locs)| locs.iter().filter(|l| l.is_cold()).count() as u64)
1096 .sum();
1097 if n > best {
1098 best = n;
1099 }
1100 }
1101 }
1102 best
1103 }
1104
    /// The table schema.
    pub const fn schema(&self) -> &TableSchema {
        &self.schema
    }

    /// Mutable access to the schema.
    ///
    /// NOTE(review): rows and indices are not adjusted here. Callers that
    /// change column layout must keep them consistent themselves.
    pub const fn schema_mut(&mut self) -> &mut TableSchema {
        &mut self.schema
    }

    /// The hot rows. `RowLocator::Hot(i)` refers to position `i`.
    pub const fn rows(&self) -> &PersistentVec<Row> {
        &self.rows
    }

    /// Number of hot rows. Cold rows are not counted.
    pub const fn row_count(&self) -> usize {
        self.rows.len()
    }

    /// Mutable access to the indices. The slice cannot grow or shrink.
    pub fn indices_mut(&mut self) -> &mut [Index] {
        &mut self.indices
    }

    /// All indices on this table.
    pub fn indices(&self) -> &[Index] {
        &self.indices
    }
1138
1139 pub fn next_auto_value(&self, col_pos: usize) -> Option<i64> {
1145 let ty = self.schema.columns.get(col_pos)?.ty;
1146 if !matches!(ty, DataType::SmallInt | DataType::Int | DataType::BigInt) {
1147 return None;
1148 }
1149 let mut max: Option<i64> = None;
1150 for row in &self.rows {
1151 match row.values.get(col_pos) {
1152 Some(Value::SmallInt(n)) => {
1153 let v = i64::from(*n);
1154 max = Some(max.map_or(v, |m| m.max(v)));
1155 }
1156 Some(Value::Int(n)) => {
1157 let v = i64::from(*n);
1158 max = Some(max.map_or(v, |m| m.max(v)));
1159 }
1160 Some(Value::BigInt(n)) => {
1161 max = Some(max.map_or(*n, |m| m.max(*n)));
1162 }
1163 _ => {}
1164 }
1165 }
1166 Some(max.map_or(1, |m| m + 1))
1167 }
1168
1169 pub fn index_on(&self, column_position: usize) -> Option<&Index> {
1173 self.indices
1180 .iter()
1181 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::BTree(_)))
1182 .or_else(|| {
1183 self.indices.iter().find(|i| {
1184 i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_))
1185 })
1186 })
1187 }
1188
    /// Validates `row` against the schema, appends it to the hot rows and
    /// maintains every index.
    ///
    /// All validation runs before anything is mutated, so a rejected row
    /// leaves the table unchanged.
    ///
    /// Index maintenance:
    /// * BTree: a `RowLocator::Hot` for the new position is appended under the
    ///   cell's `IndexKey`. Cells with no key form, such as NULL, are skipped.
    /// * GIN: a locator is appended under every lexeme word of a `TsVector`
    ///   cell. NOTE(review): a repeated word would list the row twice.
    /// * BRIN: untouched.
    /// * NSW: updated via `nsw_insert_at` after the row has been pushed.
    ///
    /// `hot_bytes` grows by `row_body_encoded_len` of the row.
    ///
    /// # Errors
    ///
    /// * `ArityMismatch` if the value count differs from the column count.
    /// * `NullInNotNull` for a NULL in a non-nullable column.
    /// * `TypeMismatch` if a value's type is not storable in its column.
    pub fn insert(&mut self, row: Row) -> Result<(), StorageError> {
        if row.len() != self.schema.columns.len() {
            return Err(StorageError::ArityMismatch {
                expected: self.schema.columns.len(),
                actual: row.len(),
            });
        }
        for (i, (val, col)) in row.values.iter().zip(&self.schema.columns).enumerate() {
            if val.is_null() {
                if !col.nullable {
                    return Err(StorageError::NullInNotNull {
                        column: col.name.clone(),
                    });
                }
                continue;
            }
            let actual = val.data_type().expect("non-null");
            // Exact type match, or one of the permitted cross-type assignments.
            // NUMERIC values are accepted only when the scale matches.
            // Note: `Value::data_type` never yields `Timestamptz`, so that arm
            // only matters for symmetry.
            let compatible = actual == col.ty
                || matches!(
                    (actual, col.ty),
                    (
                        DataType::Text,
                        DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
                    ) | (DataType::Json | DataType::Jsonb, DataType::Text)
                        | (DataType::Json, DataType::Jsonb)
                        | (DataType::Jsonb, DataType::Json)
                        | (DataType::Timestamp, DataType::Timestamptz)
                        | (DataType::Timestamptz, DataType::Timestamp)
                )
                || matches!(
                    (actual, col.ty),
                    (
                        DataType::Numeric { scale: a, .. },
                        DataType::Numeric { scale: b, .. },
                    ) if a == b
                );
            if !compatible {
                return Err(StorageError::TypeMismatch {
                    column: col.name.clone(),
                    expected: col.ty,
                    actual,
                    position: i,
                });
            }
        }
        // Position the row will occupy once pushed. BTree/GIN entries are
        // written before the push.
        let new_row_idx = self.rows.len();
        for idx in &mut self.indices {
            match &mut idx.kind {
                IndexKind::BTree(map) => {
                    if let Some(key) = IndexKey::from_value(&row.values[idx.column_position]) {
                        let mut entries = map.get(&key).cloned().unwrap_or_default();
                        entries.push(RowLocator::Hot(new_row_idx));
                        map.insert_mut(key, entries);
                    }
                }
                IndexKind::Gin(map) => {
                    if let Value::TsVector(lexemes) = &row.values[idx.column_position] {
                        for lex in lexemes {
                            let mut entries = map.get(&lex.word).cloned().unwrap_or_default();
                            entries.push(RowLocator::Hot(new_row_idx));
                            map.insert_mut(lex.word.clone(), entries);
                        }
                    }
                }
                IndexKind::Nsw(_) | IndexKind::Brin { .. } => {}
            }
        }
        self.hot_bytes = self
            .hot_bytes
            .saturating_add(row_body_encoded_len(&row, &self.schema) as u64);
        self.rows.push_mut(row);
        let new_row_idx = self.rows.len() - 1;
        // Positions are collected first because `nsw_insert_at` takes `&mut self`.
        let nsw_targets: Vec<usize> = self
            .indices
            .iter()
            .enumerate()
            .filter_map(|(i, idx)| {
                if matches!(idx.kind, IndexKind::Nsw(_)) {
                    Some(i)
                } else {
                    None
                }
            })
            .collect();
        for idx_pos in nsw_targets {
            nsw_insert_at(self, idx_pos, new_row_idx);
        }
        Ok(())
    }
1316
1317 pub fn add_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1321 if self.indices.iter().any(|i| i.name == name) {
1322 return Err(StorageError::DuplicateIndex { name });
1323 }
1324 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1325 StorageError::ColumnNotFound {
1326 column: column_name.into(),
1327 }
1328 })?;
1329 let mut idx = Index::new_btree(name, column_position);
1330 if let IndexKind::BTree(map) = &mut idx.kind {
1331 for (i, row) in self.rows.iter().enumerate() {
1332 if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
1333 let mut entries = map.get(&key).cloned().unwrap_or_default();
1334 entries.push(RowLocator::Hot(i));
1335 map.insert_mut(key, entries);
1336 }
1337 }
1338 }
1339 self.indices.push(idx);
1340 Ok(())
1341 }
1342
    /// Creates NSW index `name` over the vector column `column_name`, using
    /// neighbour cap `m`.
    ///
    /// Thin wrapper over `add_nsw_index_inner` with no pre-built graph. Errors
    /// are whatever that helper reports.
    pub fn add_nsw_index(
        &mut self,
        name: String,
        column_name: &str,
        m: usize,
    ) -> Result<(), StorageError> {
        self.add_nsw_index_inner(name, column_name, m, None)
    }
1355
1356 pub fn rebuild_nsw_index(
1368 &mut self,
1369 name: &str,
1370 new_encoding: Option<VecEncoding>,
1371 ) -> Result<(), StorageError> {
1372 let idx_pos = self
1373 .indices
1374 .iter()
1375 .position(|i| i.name == name)
1376 .ok_or_else(|| StorageError::IndexNotFound {
1377 name: String::from(name),
1378 })?;
1379 let col_pos = self.indices[idx_pos].column_position;
1380 let m = match &self.indices[idx_pos].kind {
1381 IndexKind::Nsw(g) => g.m,
1382 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
1383 return Err(StorageError::Unsupported(format!(
1384 "ALTER INDEX REBUILD on non-NSW index {name:?} — only NSW indexes can rebuild"
1385 )));
1386 }
1387 };
1388 let col_name = self.schema.columns[col_pos].name.clone();
1389 if let Some(target) = new_encoding {
1392 let current = match self.schema.columns[col_pos].ty {
1393 DataType::Vector { encoding, .. } => encoding,
1394 ref other => {
1395 return Err(StorageError::Unsupported(format!(
1396 "ALTER INDEX REBUILD WITH (encoding=…) on non-vector column type {other:?}"
1397 )));
1398 }
1399 };
1400 if target != current {
1401 let DataType::Vector { dim, .. } = self.schema.columns[col_pos].ty else {
1402 unreachable!("checked above")
1403 };
1404 let n = self.rows.len();
1405 for i in 0..n {
1406 let row = self
1407 .rows
1408 .get_mut(i)
1409 .expect("row index in bounds (we iterated up to len())");
1410 let cell = core::mem::replace(&mut row.values[col_pos], Value::Null);
1411 let recoded = recode_vector_cell(cell, target)?;
1412 row.values[col_pos] = recoded;
1413 }
1414 self.schema.columns[col_pos].ty = DataType::Vector {
1415 dim,
1416 encoding: target,
1417 };
1418 }
1419 }
1420 self.indices.remove(idx_pos);
1422 self.add_nsw_index_inner(String::from(name), &col_name, m, None)?;
1423 Ok(())
1424 }
1425
    /// Re-attaches a previously built NSW `graph` as index `name` on
    /// `column_name`.
    ///
    /// The graph's own `m` is reused. Delegates to `add_nsw_index_inner` with
    /// the graph supplied, so errors are whatever that helper reports.
    pub fn restore_nsw_index(
        &mut self,
        name: String,
        column_name: &str,
        graph: NswGraph,
    ) -> Result<(), StorageError> {
        self.add_nsw_index_inner(name, column_name, graph.m, Some(graph))
    }
1438
1439 pub fn restore_btree_index(
1446 &mut self,
1447 name: String,
1448 column_name: &str,
1449 map: PersistentBTreeMap<IndexKey, Vec<RowLocator>>,
1450 ) -> Result<(), StorageError> {
1451 if self.indices.iter().any(|i| i.name == name) {
1452 return Err(StorageError::DuplicateIndex { name });
1453 }
1454 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1455 StorageError::ColumnNotFound {
1456 column: column_name.into(),
1457 }
1458 })?;
1459 self.indices.push(Index {
1460 name,
1461 column_position,
1462 kind: IndexKind::BTree(map),
1463 included_columns: Vec::new(),
1464 partial_predicate: None,
1465 expression: None,
1466 is_unique: false,
1467 extra_column_positions: Vec::new(),
1468 });
1469 Ok(())
1470 }
1471
1472 pub fn restore_brin_index(
1477 &mut self,
1478 name: String,
1479 column_name: &str,
1480 column_type: DataType,
1481 ) -> Result<(), StorageError> {
1482 if self.indices.iter().any(|i| i.name == name) {
1483 return Err(StorageError::DuplicateIndex { name });
1484 }
1485 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1486 StorageError::ColumnNotFound {
1487 column: column_name.into(),
1488 }
1489 })?;
1490 self.indices
1491 .push(Index::new_brin(name, column_position, column_type));
1492 Ok(())
1493 }
1494
1495 pub fn add_brin_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1499 if self.indices.iter().any(|i| i.name == name) {
1500 return Err(StorageError::DuplicateIndex { name });
1501 }
1502 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1503 StorageError::ColumnNotFound {
1504 column: column_name.into(),
1505 }
1506 })?;
1507 let column_type = self.schema.columns[column_position].ty;
1508 self.indices
1509 .push(Index::new_brin(name, column_position, column_type));
1510 Ok(())
1511 }
1512
1513 pub fn add_gin_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1518 if self.indices.iter().any(|i| i.name == name) {
1519 return Err(StorageError::DuplicateIndex { name });
1520 }
1521 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1522 StorageError::ColumnNotFound {
1523 column: column_name.into(),
1524 }
1525 })?;
1526 if self.schema.columns[column_position].ty != DataType::TsVector {
1527 return Err(StorageError::Corrupt(format!(
1528 "GIN index {name:?} requires a tsvector column; \
1529 {column_name:?} is {:?}",
1530 self.schema.columns[column_position].ty
1531 )));
1532 }
1533 let mut idx = Index::new_gin(name, column_position);
1534 if let IndexKind::Gin(map) = &mut idx.kind {
1535 for (i, row) in self.rows.iter().enumerate() {
1536 if let Value::TsVector(lexemes) = &row.values[column_position] {
1537 for lex in lexemes {
1538 let mut entries = map.get(&lex.word).cloned().unwrap_or_default();
1539 entries.push(RowLocator::Hot(i));
1540 map.insert_mut(lex.word.clone(), entries);
1541 }
1542 }
1543 }
1544 }
1545 self.indices.push(idx);
1546 Ok(())
1547 }
1548
1549 pub fn restore_gin_index(
1554 &mut self,
1555 name: String,
1556 column_name: &str,
1557 map: PersistentBTreeMap<String, Vec<RowLocator>>,
1558 ) -> Result<(), StorageError> {
1559 if self.indices.iter().any(|i| i.name == name) {
1560 return Err(StorageError::DuplicateIndex { name });
1561 }
1562 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1563 StorageError::ColumnNotFound {
1564 column: column_name.into(),
1565 }
1566 })?;
1567 let mut idx = Index::new_gin(name, column_position);
1568 idx.kind = IndexKind::Gin(map);
1569 self.indices.push(idx);
1570 Ok(())
1571 }
1572
1573 pub fn register_cold_locators<I>(
1590 &mut self,
1591 index_name: &str,
1592 locators: I,
1593 ) -> Result<usize, StorageError>
1594 where
1595 I: IntoIterator<Item = (IndexKey, RowLocator)>,
1596 {
1597 let idx = self
1598 .indices
1599 .iter_mut()
1600 .find(|i| i.name == index_name)
1601 .ok_or_else(|| StorageError::Corrupt(format!("index {index_name:?} not found")))?;
1602 let map = match &mut idx.kind {
1603 IndexKind::BTree(map) => map,
1604 IndexKind::Nsw(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
1605 return Err(StorageError::Corrupt(format!(
1606 "index {index_name:?} is not BTree; cold locators apply only to BTree indices"
1607 )));
1608 }
1609 };
1610 let mut count = 0usize;
1611 for (key, locator) in locators {
1612 let mut entries = map.get(&key).cloned().unwrap_or_default();
1613 entries.push(locator);
1614 map.insert_mut(key, entries);
1615 count += 1;
1616 }
1617 Ok(count)
1618 }
1619
1620 pub fn register_gin_cold_locators<I>(
1625 &mut self,
1626 index_name: &str,
1627 locators: I,
1628 ) -> Result<usize, StorageError>
1629 where
1630 I: IntoIterator<Item = (String, RowLocator)>,
1631 {
1632 let idx = self
1633 .indices
1634 .iter_mut()
1635 .find(|i| i.name == index_name)
1636 .ok_or_else(|| StorageError::Corrupt(format!("index {index_name:?} not found")))?;
1637 let map = match &mut idx.kind {
1638 IndexKind::Gin(map) => map,
1639 IndexKind::BTree(_) | IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
1640 return Err(StorageError::Corrupt(format!(
1641 "register_gin_cold_locators: index {index_name:?} is not GIN"
1642 )));
1643 }
1644 };
1645 let mut count = 0usize;
1646 for (word, locator) in locators {
1647 let mut entries = map.get(&word).cloned().unwrap_or_default();
1648 entries.push(locator);
1649 map.insert_mut(word, entries);
1650 count += 1;
1651 }
1652 Ok(count)
1653 }
1654
1655 pub fn remove_cold_locators_for_key(
1665 &mut self,
1666 index_name: &str,
1667 key: &IndexKey,
1668 ) -> Result<usize, StorageError> {
1669 let idx = self
1670 .indices
1671 .iter_mut()
1672 .find(|i| i.name == index_name)
1673 .ok_or_else(|| {
1674 StorageError::Corrupt(format!(
1675 "remove_cold_locators_for_key: index {index_name:?} not found"
1676 ))
1677 })?;
1678 let map = match &mut idx.kind {
1679 IndexKind::BTree(map) => map,
1680 IndexKind::Nsw(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
1681 return Err(StorageError::Corrupt(format!(
1682 "remove_cold_locators_for_key: index {index_name:?} is not BTree; \
1683 cold locators apply only to BTree indices"
1684 )));
1685 }
1686 };
1687 let Some(entries) = map.get(key) else {
1688 return Ok(0);
1689 };
1690 let mut kept: Vec<RowLocator> =
1691 entries.iter().copied().filter(RowLocator::is_hot).collect();
1692 let removed = entries.len() - kept.len();
1693 if removed == 0 {
1694 return Ok(0);
1695 }
1696 kept.shrink_to_fit();
1697 map.insert_mut(key.clone(), kept);
1705 Ok(removed)
1706 }
1707
1708 pub fn add_column(&mut self, col: ColumnSchema, fill_value: Value) {
1715 self.schema.columns.push(col);
1716 let mut new_rows: PersistentVec<Row> = PersistentVec::new();
1717 for row in self.rows.iter() {
1718 let mut values = row.values.clone();
1719 values.push(fill_value.clone());
1720 new_rows.push_mut(Row::new(values));
1721 }
1722 self.rows = new_rows;
1723 }
1724
1725 pub fn drop_column(&mut self, col_pos: usize) {
1735 debug_assert!(col_pos < self.schema.columns.len());
1736 self.schema.columns.remove(col_pos);
1738 let mut new_rows: PersistentVec<Row> = PersistentVec::new();
1740 for row in self.rows.iter() {
1741 let mut values = row.values.clone();
1742 if col_pos < values.len() {
1743 values.remove(col_pos);
1744 }
1745 new_rows.push_mut(Row::new(values));
1746 }
1747 self.rows = new_rows;
1748 self.indices.retain(|idx| idx.column_position != col_pos);
1750 for idx in &mut self.indices {
1751 if idx.column_position > col_pos {
1752 idx.column_position -= 1;
1753 }
1754 for inc in &mut idx.included_columns {
1756 if *inc as usize > col_pos {
1757 *inc -= 1;
1758 }
1759 }
1760 }
1761 let mut surviving_ucs: Vec<UniquenessConstraint> = Vec::new();
1766 for mut uc in core::mem::take(&mut self.schema.uniqueness_constraints) {
1767 uc.columns.retain(|&c| c != col_pos);
1768 if uc.columns.is_empty() {
1769 continue;
1770 }
1771 for c in &mut uc.columns {
1772 if *c > col_pos {
1773 *c -= 1;
1774 }
1775 }
1776 surviving_ucs.push(uc);
1777 }
1778 self.schema.uniqueness_constraints = surviving_ucs;
1779 for fk in &mut self.schema.foreign_keys {
1782 for c in &mut fk.local_columns {
1783 if *c > col_pos {
1784 *c -= 1;
1785 }
1786 }
1787 }
1788 self.rebuild_indices();
1795 }
1796
1797 pub fn delete_rows(&mut self, positions: &[usize]) -> usize {
1803 if positions.is_empty() {
1804 return 0;
1805 }
1806 let mut to_remove = alloc::vec![false; self.rows.len()];
1810 let mut removed = 0;
1811 for &p in positions {
1812 if p < to_remove.len() && !to_remove[p] {
1813 to_remove[p] = true;
1814 removed += 1;
1815 }
1816 }
1817 let mut new_rows: PersistentVec<Row> = PersistentVec::new();
1818 let mut removed_bytes: u64 = 0;
1819 for (i, row) in self.rows.iter().enumerate() {
1820 if to_remove[i] {
1821 removed_bytes =
1822 removed_bytes.saturating_add(row_body_encoded_len(row, &self.schema) as u64);
1823 } else {
1824 new_rows.push_mut(row.clone());
1825 }
1826 }
1827 self.rows = new_rows;
1828 self.hot_bytes = self.hot_bytes.saturating_sub(removed_bytes);
1829 self.rebuild_indices();
1830 removed
1831 }
1832
1833 pub fn update_row(
1839 &mut self,
1840 position: usize,
1841 new_values: Vec<Value>,
1842 ) -> Result<(), StorageError> {
1843 if position >= self.rows.len() {
1844 return Err(StorageError::Corrupt(alloc::format!(
1845 "update_row: position {position} out of bounds (rows={})",
1846 self.rows.len()
1847 )));
1848 }
1849 if new_values.len() != self.schema.columns.len() {
1850 return Err(StorageError::ArityMismatch {
1851 expected: self.schema.columns.len(),
1852 actual: new_values.len(),
1853 });
1854 }
1855 for (i, (val, col)) in new_values.iter().zip(&self.schema.columns).enumerate() {
1859 if val.is_null() {
1860 if !col.nullable {
1861 return Err(StorageError::NullInNotNull {
1862 column: col.name.clone(),
1863 });
1864 }
1865 continue;
1866 }
1867 let actual = val.data_type().expect("non-null");
1868 let compatible = actual == col.ty
1869 || matches!(
1870 (actual, col.ty),
1871 (
1872 DataType::Text,
1873 DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
1874 ) | (DataType::Json | DataType::Jsonb, DataType::Text)
1875 | (DataType::Json, DataType::Jsonb)
1876 | (DataType::Jsonb, DataType::Json)
1877 | (DataType::Timestamp, DataType::Timestamptz)
1878 | (DataType::Timestamptz, DataType::Timestamp)
1879 )
1880 || matches!(
1881 (actual, col.ty),
1882 (
1883 DataType::Numeric { scale: a, .. },
1884 DataType::Numeric { scale: b, .. },
1885 ) if a == b
1886 );
1887 if !compatible {
1888 return Err(StorageError::TypeMismatch {
1889 column: col.name.clone(),
1890 expected: col.ty,
1891 actual,
1892 position: i,
1893 });
1894 }
1895 }
1896 let old_row = self
1897 .rows
1898 .get(position)
1899 .expect("position bounds-checked above");
1900 let old_bytes = row_body_encoded_len(old_row, &self.schema) as u64;
1901 let new_row = Row::new(new_values);
1902 let new_bytes = row_body_encoded_len(&new_row, &self.schema) as u64;
1903 self.rows = self
1904 .rows
1905 .set(position, new_row)
1906 .expect("position bounds-checked above");
1907 self.hot_bytes = self
1908 .hot_bytes
1909 .saturating_sub(old_bytes)
1910 .saturating_add(new_bytes);
1911 self.rebuild_indices();
1912 Ok(())
1913 }
1914
    /// Rebuilds every index on this table from the current hot rows.
    ///
    /// Order of operations:
    /// 1. Snapshot the cold locators (`RowLocator::is_cold`) held by BTree and GIN
    ///    indices. The rebuild loops below only regenerate `RowLocator::Hot(i)` entries
    ///    from `self.rows`, so cold entries would otherwise be lost.
    /// 2. Record a (name, column position, kind) descriptor per index, then clear
    ///    `self.indices`.
    /// 3. Recreate each index from its descriptor and repopulate it from `self.rows`:
    ///    - NSW graphs are re-inserted row by row with the same `m`.
    ///    - BRIN indices are recreated empty.
    ///    - BTree indices map `IndexKey::from_value` of the indexed column to hot locators.
    ///    - GIN indices map each `TsVector` lexeme word to hot locators.
    /// 4. Re-register the snapshotted cold locators. Errors from that step are ignored.
    ///
    /// NOTE(review): the descriptor keeps only name, column position and kind. `Index` also
    /// has `included_columns`, `partial_predicate`, `expression`, `is_unique` and
    /// `extra_column_positions`. If `Index::new_*` resets those to defaults (not visible
    /// here), every rebuild triggered by a delete, update or column drop silently loses
    /// them. Confirm before relying on unique, partial, expression or composite indices.
    fn rebuild_indices(&mut self) {
        // Step 1a: cold BTree locators, grouped by index name.
        let preserved_cold: Vec<(String, Vec<(IndexKey, RowLocator)>)> = self
            .indices
            .iter()
            .filter_map(|idx| match &idx.kind {
                IndexKind::BTree(map) => {
                    let cold: Vec<(IndexKey, RowLocator)> = map
                        .iter()
                        .flat_map(|(k, locs)| {
                            locs.iter()
                                .filter(|l| l.is_cold())
                                .copied()
                                .map(move |l| (k.clone(), l))
                        })
                        .collect();
                    if cold.is_empty() {
                        None
                    } else {
                        Some((idx.name.clone(), cold))
                    }
                }
                IndexKind::Nsw(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => None,
            })
            .collect();

        // Step 1b: cold GIN locators, keyed by lexeme word.
        let preserved_gin_cold: Vec<(String, Vec<(String, RowLocator)>)> = self
            .indices
            .iter()
            .filter_map(|idx| match &idx.kind {
                IndexKind::Gin(map) => {
                    let cold: Vec<(String, RowLocator)> = map
                        .iter()
                        .flat_map(|(w, locs)| {
                            locs.iter()
                                .filter(|l| l.is_cold())
                                .copied()
                                .map(move |l| (w.clone(), l))
                        })
                        .collect();
                    if cold.is_empty() {
                        None
                    } else {
                        Some((idx.name.clone(), cold))
                    }
                }
                IndexKind::BTree(_) | IndexKind::Nsw(_) | IndexKind::Brin { .. } => None,
            })
            .collect();

        // Step 2: the minimal per-kind parameters needed to recreate each index.
        #[derive(Clone)]
        enum RebuildKind {
            BTree,
            Nsw(usize),
            Brin(DataType),
            Gin,
        }
        let descriptors: Vec<(String, usize, RebuildKind)> = self
            .indices
            .iter()
            .map(|idx| {
                let kind = match &idx.kind {
                    IndexKind::Nsw(g) => RebuildKind::Nsw(g.m),
                    IndexKind::Brin { column_type } => RebuildKind::Brin(*column_type),
                    IndexKind::BTree(_) => RebuildKind::BTree,
                    IndexKind::Gin(_) => RebuildKind::Gin,
                };
                (idx.name.clone(), idx.column_position, kind)
            })
            .collect();
        self.indices.clear();
        // Step 3: recreate in the original order so index positions stay the same.
        for (name, column_position, rebuild_kind) in descriptors {
            match rebuild_kind {
                RebuildKind::Nsw(m) => {
                    let idx = Index::new_nsw(name, column_position, m);
                    self.indices.push(idx);
                    let idx_pos = self.indices.len() - 1;
                    // Collected up front so `self` can be borrowed mutably by `nsw_insert_at`.
                    let row_indices: Vec<usize> = (0..self.rows.len()).collect();
                    for row_idx in row_indices {
                        nsw_insert_at(self, idx_pos, row_idx);
                    }
                }
                RebuildKind::Brin(column_type) => {
                    // Recreated empty; no per-row population happens here.
                    self.indices
                        .push(Index::new_brin(name, column_position, column_type));
                }
                RebuildKind::BTree => {
                    let mut idx = Index::new_btree(name, column_position);
                    if let IndexKind::BTree(map) = &mut idx.kind {
                        for (i, row) in self.rows.iter().enumerate() {
                            // Values with no key representation (`from_value` -> None) are not indexed.
                            if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
                                // NOTE(review): clones the whole locator list on every row, so a key
                                // shared by many rows costs quadratic copying.
                                let mut entries = map.get(&key).cloned().unwrap_or_default();
                                entries.push(RowLocator::Hot(i));
                                map.insert_mut(key, entries);
                            }
                        }
                    }
                    self.indices.push(idx);
                }
                RebuildKind::Gin => {
                    let mut idx = Index::new_gin(name, column_position);
                    if let IndexKind::Gin(map) = &mut idx.kind {
                        for (i, row) in self.rows.iter().enumerate() {
                            // Only `TsVector` cells contribute postings; other values are skipped.
                            if let Value::TsVector(lexemes) = &row.values[column_position] {
                                for lex in lexemes {
                                    let mut entries =
                                        map.get(&lex.word).cloned().unwrap_or_default();
                                    entries.push(RowLocator::Hot(i));
                                    map.insert_mut(lex.word.clone(), entries);
                                }
                            }
                        }
                    }
                    self.indices.push(idx);
                }
            }
        }

        // Step 4: restore cold locators; registration errors are deliberately ignored.
        for (idx_name, locators) in preserved_cold {
            let _ = self.register_cold_locators(&idx_name, locators);
        }
        for (idx_name, locators) in preserved_gin_cold {
            let _ = self.register_gin_cold_locators(&idx_name, locators);
        }
    }
2073
2074 fn add_nsw_index_inner(
2075 &mut self,
2076 name: String,
2077 column_name: &str,
2078 m: usize,
2079 restore: Option<NswGraph>,
2080 ) -> Result<(), StorageError> {
2081 if self.indices.iter().any(|i| i.name == name) {
2082 return Err(StorageError::DuplicateIndex { name });
2083 }
2084 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
2085 StorageError::ColumnNotFound {
2086 column: column_name.into(),
2087 }
2088 })?;
2089 if !matches!(
2090 self.schema.columns[column_position].ty,
2091 DataType::Vector { .. }
2092 ) {
2093 return Err(StorageError::TypeMismatch {
2094 column: column_name.into(),
2095 expected: DataType::Vector {
2096 dim: 0,
2097 encoding: VecEncoding::F32,
2098 },
2099 actual: self.schema.columns[column_position].ty,
2100 position: column_position,
2101 });
2102 }
2103 if let Some(graph) = restore {
2104 self.indices.push(Index {
2105 name,
2106 column_position,
2107 kind: IndexKind::Nsw(graph),
2108 included_columns: Vec::new(),
2109 partial_predicate: None,
2110 expression: None,
2111 is_unique: false,
2112 extra_column_positions: Vec::new(),
2113 });
2114 return Ok(());
2115 }
2116 let idx = Index::new_nsw(name, column_position, m);
2117 self.indices.push(idx);
2118 let idx_pos = self.indices.len() - 1;
2119 let row_indices: Vec<usize> = (0..self.rows.len()).collect();
2122 for row_idx in row_indices {
2123 nsw_insert_at(self, idx_pos, row_idx);
2124 }
2125 Ok(())
2126 }
2127}
2128
2129fn recode_vector_cell(cell: Value, target: VecEncoding) -> Result<Value, StorageError> {
2136 if matches!(cell, Value::Null) {
2137 return Ok(cell);
2138 }
2139 let as_f32: Vec<f32> = match &cell {
2141 Value::Vector(v) => v.clone(),
2142 Value::Sq8Vector(q) => quantize::dequantize(q),
2143 Value::HalfVector(h) => h.to_f32_vec(),
2144 other => {
2145 return Err(StorageError::Unsupported(format!(
2146 "ALTER INDEX REBUILD: cannot recode non-vector cell {:?}",
2147 other.data_type()
2148 )));
2149 }
2150 };
2151 Ok(match target {
2156 VecEncoding::F32 => Value::Vector(as_f32),
2157 VecEncoding::Sq8 => Value::Sq8Vector(quantize::quantize(&as_f32)),
2158 VecEncoding::F16 => Value::HalfVector(halfvec::HalfVector::from_f32_slice(&as_f32)),
2159 })
2160}
2161
/// Inserts hot row `new_row_idx` into the multi-layer NSW graph at `idx_pos`.
///
/// Rows whose indexed cell is not a vector (e.g. `NULL`), or is a zero-dimension vector,
/// only get an empty slot (level 0) and are not linked.
///
/// For a vector row:
/// - The node is assigned a level via `nsw_assign_level`. NOTE(review): presumably
///   deterministic in the row index; confirm in `nsw_assign_level`.
/// - If the graph is empty, the node becomes the entry point.
/// - Otherwise search descends greedily from the entry point down to `level + 1`.
/// - On each layer `min(level, entry_level)..=0`, a beam search (L2, width `max(2m, 8)`)
///   picks candidates, `select_neighbours_heuristic` prunes them, and `connect_at_layer`
///   links them bidirectionally.
/// - A node whose level exceeds the current entry level becomes the new entry point.
///
/// # Panics
/// Panics (via `unreachable!`) if the index at `idx_pos` is not an NSW index, and on an
/// out-of-range `new_row_idx`/`idx_pos` (direct indexing).
fn nsw_insert_at(table: &mut Table, idx_pos: usize, new_row_idx: usize) {
    let col_pos = table.indices[idx_pos].column_position;
    // Dimension of the new cell, or None if it is not a vector value.
    let cell_dim: Option<usize> = match &table.rows[new_row_idx].values[col_pos] {
        Value::Vector(v) => Some(v.len()),
        Value::Sq8Vector(q) => Some(q.bytes.len()),
        Value::HalfVector(h) => Some(h.dim()),
        _ => None,
    };
    let Some(dim) = cell_dim else {
        // Keep per-row slots aligned with row positions even for unlinked rows.
        ensure_node_slot(table, idx_pos, new_row_idx, 0);
        return;
    };
    if dim == 0 {
        ensure_node_slot(table, idx_pos, new_row_idx, 0);
        return;
    }
    let level = nsw_assign_level(new_row_idx);
    ensure_node_slot(table, idx_pos, new_row_idx, level);
    let (entry, entry_level, m) = match &table.indices[idx_pos].kind {
        IndexKind::Nsw(g) => (g.entry, g.entry_level, g.m),
        IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
            unreachable!("nsw_insert_at on a non-NSW index")
        }
    };
    if entry.is_none() {
        // First linked node: it becomes the entry point and has no neighbours yet.
        if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
            g.entry = Some(new_row_idx);
            g.entry_level = level;
            *g.levels
                .get_mut(new_row_idx)
                .expect("levels slot padded by ensure_node_slot") = level;
        }
        return;
    }
    if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
        *g.levels
            .get_mut(new_row_idx)
            .expect("levels slot padded by ensure_node_slot") = level;
    }
    // All search distances below are computed against the f32 widening of the new cell.
    let query = match &table.rows[new_row_idx].values[col_pos] {
        Value::Vector(v) => v.clone(),
        Value::Sq8Vector(q) => quantize::dequantize(q),
        Value::HalfVector(h) => h.to_f32_vec(),
        _ => return,
    };
    let mut current = entry.expect("entry was Some above");
    let mut current_d = vec_l2_sq(table, col_pos, current, &query);
    // Greedy descent through the layers above the new node's own level.
    if entry_level > level {
        for layer in (level + 1..=entry_level).rev() {
            (current, current_d) =
                greedy_layer_walk(table, idx_pos, layer, current, current_d, &query);
        }
    }
    let top = level.min(entry_level);
    let ef = (m * 2).max(8);
    for layer in (0..=top).rev() {
        // Layer 0 keeps up to 2m neighbours, upper layers up to m.
        let cap = if layer == 0 { m * 2 } else { m };
        let mut candidates = layer_beam_search(
            table,
            idx_pos,
            layer,
            current,
            current_d,
            &query,
            ef,
            NswMetric::L2,
        );
        // Never link the node to itself.
        candidates.retain(|&(_, n)| n != new_row_idx);
        // The closest candidate seeds the search on the next layer down.
        if let Some(&(d, n)) = candidates.first() {
            current = n;
            current_d = d;
        }
        let peers = select_neighbours_heuristic(&candidates, cap, table, col_pos);
        connect_at_layer(table, idx_pos, layer, new_row_idx, &peers);
    }
    // A node taller than the current entry becomes the new entry point.
    if level > entry_level
        && let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
    {
        g.entry = Some(new_row_idx);
        g.entry_level = level;
    }
}
2270
2271fn ensure_node_slot(table: &mut Table, idx_pos: usize, new_row_idx: usize, level: u8) {
2275 let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind else {
2276 unreachable!("ensure_node_slot on a BTree index");
2277 };
2278 while g.layers.len() <= level as usize {
2279 g.layers.push(PersistentVec::new());
2280 }
2281 while g.levels.len() <= new_row_idx {
2282 g.levels.push_mut(0);
2283 }
2284 for layer_vec in &mut g.layers {
2285 while layer_vec.len() <= new_row_idx {
2286 layer_vec.push_mut(Vec::new());
2287 }
2288 }
2289}
2290
2291fn greedy_layer_walk(
2297 table: &Table,
2298 idx_pos: usize,
2299 layer: u8,
2300 mut current: usize,
2301 mut current_d: f32,
2302 query: &[f32],
2303) -> (usize, f32) {
2304 let g = match &table.indices[idx_pos].kind {
2305 IndexKind::Nsw(g) => g,
2306 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
2307 return (current, current_d);
2308 }
2309 };
2310 let col_pos = table.indices[idx_pos].column_position;
2311 loop {
2312 let neighbours: &[u32] = g
2313 .layers
2314 .get(layer as usize)
2315 .and_then(|layer_v| layer_v.get(current))
2316 .map_or(&[][..], Vec::as_slice);
2317 let mut best = current;
2318 let mut best_d = current_d;
2319 for &n in neighbours {
2320 let n = n as usize;
2321 let d = vec_l2_sq(table, col_pos, n, query);
2322 if d < best_d {
2323 best = n;
2324 best_d = d;
2325 }
2326 }
2327 if best == current {
2328 return (current, current_d);
2329 }
2330 current = best;
2331 current_d = best_d;
2332 }
2333}
2334
2335#[allow(clippy::too_many_arguments)] fn layer_beam_search(
2348 table: &Table,
2349 idx_pos: usize,
2350 layer: u8,
2351 entry_node: usize,
2352 entry_d: f32,
2353 query: &[f32],
2354 ef: usize,
2355 metric: NswMetric,
2356) -> Vec<(f32, usize)> {
2357 let g = match &table.indices[idx_pos].kind {
2358 IndexKind::Nsw(g) => g,
2359 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => return Vec::new(),
2360 };
2361 let col_pos = table.indices[idx_pos].column_position;
2362 let d0 = if matches!(metric, NswMetric::L2) {
2363 entry_d
2364 } else {
2365 cell_to_query_metric_distance(table, col_pos, entry_node, query, metric)
2366 };
2367 let row_count = table.rows.len();
2368 let mut visited: Vec<bool> = alloc::vec![false; row_count];
2369 if entry_node < row_count {
2370 visited[entry_node] = true;
2371 }
2372 let mut candidates: alloc::collections::BinaryHeap<NodeClosest> =
2375 alloc::collections::BinaryHeap::with_capacity(ef);
2376 let mut results: alloc::collections::BinaryHeap<NodeFurthest> =
2377 alloc::collections::BinaryHeap::with_capacity(ef);
2378 candidates.push(NodeClosest {
2379 dist: d0,
2380 node: entry_node,
2381 });
2382 results.push(NodeFurthest {
2383 dist: d0,
2384 node: entry_node,
2385 });
2386 while let Some(cur) = candidates.pop() {
2387 let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
2388 if cur.dist > worst && results.len() >= ef {
2389 break;
2390 }
2391 let neighbours: &[u32] = g
2392 .layers
2393 .get(layer as usize)
2394 .and_then(|layer_v| layer_v.get(cur.node))
2395 .map_or(&[][..], Vec::as_slice);
2396 for &n in neighbours {
2397 let n = n as usize;
2398 if n >= row_count || visited[n] {
2399 continue;
2400 }
2401 visited[n] = true;
2402 let dn = cell_to_query_metric_distance(table, col_pos, n, query, metric);
2406 if !dn.is_finite() {
2407 continue;
2408 }
2409 let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
2410 if results.len() < ef || dn < worst {
2411 results.push(NodeFurthest { dist: dn, node: n });
2412 if results.len() > ef {
2413 results.pop();
2414 }
2415 candidates.push(NodeClosest { dist: dn, node: n });
2416 }
2417 }
2418 }
2419 let mut out: Vec<(f32, usize)> = results.into_iter().map(|c| (c.dist, c.node)).collect();
2422 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2423 out
2424}
2425
/// Heap entry ordered so that `BinaryHeap` (a max-heap) pops the *smallest* distance first.
///
/// Ordering is total:
/// - distances compare with `f32::total_cmp`, so NaN is ordered rather than collapsing
///   to `Equal`;
/// - ties are broken on `node`, with the smaller node popped first.
///
/// `PartialEq` is derived from the same ordering, so `a == b` exactly when `a.cmp(&b)`
/// is `Equal`, as the `Ord` contract requires.
#[derive(Debug, Clone, Copy)]
struct NodeClosest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeClosest {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeClosest {}
impl PartialOrd for NodeClosest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeClosest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        // Both comparisons are reversed (`other` vs `self`) to turn the max-heap into a min-heap.
        other
            .dist
            .total_cmp(&self.dist)
            .then_with(|| other.node.cmp(&self.node))
    }
}
2454
/// Heap entry ordered so that `BinaryHeap` pops the *largest* distance first.
///
/// This is used to keep the current best `ef` results with the worst one on top.
/// Ordering is total:
/// - distances compare with `f32::total_cmp`;
/// - ties are broken on `node`, with the larger node popped first.
///
/// `PartialEq` is derived from the same ordering, keeping `Eq` and `Ord` consistent.
#[derive(Debug, Clone, Copy)]
struct NodeFurthest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeFurthest {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeFurthest {}
impl PartialOrd for NodeFurthest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeFurthest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        self.dist
            .total_cmp(&other.dist)
            .then_with(|| self.node.cmp(&other.node))
    }
}
2480
2481fn select_neighbours_heuristic(
2490 candidates: &[(f32, usize)],
2491 m: usize,
2492 table: &Table,
2493 col_pos: usize,
2494) -> Vec<usize> {
2495 let mut chosen: Vec<usize> = Vec::with_capacity(m);
2496 for &(d_q, e) in candidates {
2497 if chosen.len() >= m {
2498 break;
2499 }
2500 if !matches!(
2505 table.rows.get(e).and_then(|r| r.values.get(col_pos)),
2506 Some(Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_))
2507 ) {
2508 continue;
2509 }
2510 let mut covered = false;
2511 for &r in &chosen {
2512 if cell_l2_sq(table, col_pos, e, r) < d_q {
2516 covered = true;
2517 break;
2518 }
2519 }
2520 if !covered {
2521 chosen.push(e);
2522 }
2523 }
2524 chosen
2525}
2526
/// Links `new_row_idx` to `peers` on `layer` of the NSW graph at `idx_pos`.
///
/// The new node's adjacency list on this layer is replaced by `peers`. Each peer whose
/// cell is a vector then gets a back-link to the new node (if not already present).
/// When that pushes the peer's list above the layer cap (`cap_for_layer`), the list is
/// re-ranked by squared L2 distance to the peer and pruned with
/// `select_neighbours_heuristic`. That pruning may drop the freshly added back-link, so
/// links are not guaranteed to be symmetric.
///
/// Non-NSW indices are ignored.
///
/// # Panics
/// Panics if a row index does not fit in `u32`, or on out-of-range `peer`/`layer` indexing.
fn connect_at_layer(
    table: &mut Table,
    idx_pos: usize,
    layer: u8,
    new_row_idx: usize,
    peers: &[usize],
) {
    let col_pos = table.indices[idx_pos].column_position;
    let cap = match &table.indices[idx_pos].kind {
        IndexKind::Nsw(g) => g.cap_for_layer(layer),
        IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => return,
    };
    // Adjacency lists store neighbours as u32 row indices.
    let new_row_u32 = u32::try_from(new_row_idx).expect("row index fits in u32");
    // Forward links: overwrite the new node's list on this layer.
    if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
        let layer_v = &mut g.layers[layer as usize];
        if let Some(slot) = layer_v.get_mut(new_row_idx) {
            *slot = peers
                .iter()
                .map(|&p| u32::try_from(p).expect("row index fits in u32"))
                .collect();
        }
    }
    for &peer in peers {
        // Only vector rows participate in the graph; skip anything else.
        if !matches!(
            &table.rows[peer].values[col_pos],
            Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_)
        ) {
            continue;
        }
        // Back-link, avoiding duplicates.
        if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
            let layer_v = &mut g.layers[layer as usize];
            if let Some(slot) = layer_v.get_mut(peer)
                && !slot.contains(&new_row_u32)
            {
                slot.push(new_row_u32);
            }
        }
        let needs_trim = match &table.indices[idx_pos].kind {
            IndexKind::Nsw(g) => g.layers[layer as usize][peer].len() > cap,
            IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => false,
        };
        if needs_trim {
            // Snapshot the peer's list so `table` can be borrowed immutably for distances.
            let current_peers: Vec<usize> = match &table.indices[idx_pos].kind {
                IndexKind::Nsw(g) => g.layers[layer as usize][peer]
                    .iter()
                    .map(|&n| n as usize)
                    .collect(),
                IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => continue,
            };
            // Rank by distance to the peer (not to the inserted node) before pruning.
            let mut tagged: Vec<(f32, usize)> = current_peers
                .iter()
                .map(|&p| (cell_l2_sq(table, col_pos, peer, p), p))
                .collect();
            tagged.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
            let kept = select_neighbours_heuristic(&tagged, cap, table, col_pos);
            if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
                && let Some(slot) = g.layers[layer as usize].get_mut(peer)
            {
                *slot = kept
                    .into_iter()
                    .map(|p| u32::try_from(p).expect("row index fits in u32"))
                    .collect();
            }
        }
    }
}
2611
2612fn vec_l2_sq(table: &Table, col_pos: usize, row: usize, query: &[f32]) -> f32 {
2619 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2620 Some(Value::Vector(v)) if v.len() == query.len() => l2_distance_sq(v, query),
2621 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => {
2622 quantize::sq8_l2_distance_sq_asymmetric(q, query)
2623 }
2624 Some(Value::HalfVector(h)) if h.dim() == query.len() => {
2628 halfvec::half_l2_distance_sq_asymmetric(h, query)
2629 }
2630 _ => f32::INFINITY,
2631 }
2632}
2633
2634fn cell_l2_sq(table: &Table, col_pos: usize, row_a: usize, row_b: usize) -> f32 {
2641 let Some(cell_a) = table.rows.get(row_a).and_then(|r| r.values.get(col_pos)) else {
2642 return f32::INFINITY;
2643 };
2644 let Some(cell_b) = table.rows.get(row_b).and_then(|r| r.values.get(col_pos)) else {
2645 return f32::INFINITY;
2646 };
2647 match (cell_a, cell_b) {
2648 (Value::Vector(a), Value::Vector(b)) if a.len() == b.len() => l2_distance_sq(a, b),
2649 (Value::Sq8Vector(a), Value::Sq8Vector(b)) if a.bytes.len() == b.bytes.len() => {
2650 quantize::sq8_l2_distance_sq(a, b)
2651 }
2652 (Value::HalfVector(a), Value::HalfVector(b)) if a.dim() == b.dim() => {
2657 halfvec::half_l2_distance_sq(a, b)
2658 }
2659 _ => f32::INFINITY,
2660 }
2661}
2662
2663fn cell_to_query_metric_distance(
2668 table: &Table,
2669 col_pos: usize,
2670 row: usize,
2671 query: &[f32],
2672 metric: NswMetric,
2673) -> f32 {
2674 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2675 Some(Value::Vector(v)) if v.len() == query.len() => metric_distance(metric, v, query),
2676 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => match metric {
2677 NswMetric::L2 => quantize::sq8_l2_distance_sq_asymmetric(q, query),
2678 NswMetric::InnerProduct => quantize::sq8_inner_product_asymmetric(q, query),
2679 NswMetric::Cosine => quantize::sq8_cosine_distance_asymmetric(q, query),
2680 },
2681 Some(Value::HalfVector(h)) if h.dim() == query.len() => match metric {
2684 NswMetric::L2 => halfvec::half_l2_distance_sq_asymmetric(h, query),
2685 NswMetric::InnerProduct => halfvec::half_inner_product_asymmetric(h, query),
2686 NswMetric::Cosine => halfvec::half_cosine_distance_asymmetric(h, query),
2687 },
2688 _ => f32::INFINITY,
2689 }
2690}
2691
/// Distance function used when searching an NSW index.
///
/// For f32 vectors (`metric_distance`), every variant is a "smaller is closer" score, so
/// the same beam search serves all of them. SQ8 and half-precision cells use the
/// `quantize`/`halfvec` asymmetric kernels. NOTE(review): presumably those follow the
/// same convention; confirm in those modules.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NswMetric {
    /// Squared Euclidean distance (`l2_distance_sq`); no square root is taken.
    L2,
    /// Negated inner product (`-inner_product_f32`), so larger dot products rank closer.
    InnerProduct,
    /// Cosine distance `1 - dot / (|a| * |b|)`. `metric_distance` returns
    /// `f32::INFINITY` when either f32 operand has zero norm.
    Cosine,
}
2709
2710fn nsw_search(
2716 table: &Table,
2717 idx_pos: usize,
2718 query: &[f32],
2719 k: usize,
2720 ef: usize,
2721 metric: NswMetric,
2722) -> Vec<(f32, usize)> {
2723 let (entry, entry_level) = match &table.indices[idx_pos].kind {
2724 IndexKind::Nsw(g) => (g.entry, g.entry_level),
2725 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => return Vec::new(),
2726 };
2727 let Some(entry) = entry else {
2728 return Vec::new();
2729 };
2730 let col_pos = table.indices[idx_pos].column_position;
2731 let sq8 = matches!(
2738 table.schema.columns.get(col_pos).map(|c| c.ty),
2739 Some(DataType::Vector {
2740 encoding: VecEncoding::Sq8,
2741 ..
2742 })
2743 );
2744 let ef = if sq8 {
2745 ef.max(k).max(k * SQ8_RERANK_OVER_FETCH)
2746 } else {
2747 ef.max(k)
2748 };
2749 let entry_d = vec_l2_sq(table, col_pos, entry, query);
2751 let mut current = entry;
2752 let mut current_d = entry_d;
2753 for layer in (1..=entry_level).rev() {
2754 (current, current_d) = greedy_layer_walk(table, idx_pos, layer, current, current_d, query);
2755 }
2756 let mut results = layer_beam_search(table, idx_pos, 0, current, current_d, query, ef, metric);
2758 if sq8 {
2759 results = sq8_rerank(table, col_pos, &results, query, metric);
2760 }
2761 results.truncate(k);
2762 results
2763}
2764
2765fn sq8_rerank(
2772 table: &Table,
2773 col_pos: usize,
2774 candidates: &[(f32, usize)],
2775 query: &[f32],
2776 metric: NswMetric,
2777) -> Vec<(f32, usize)> {
2778 let mut out: Vec<(f32, usize)> = candidates
2779 .iter()
2780 .filter_map(|&(adc_d, row)| {
2781 let cell = table.rows.get(row).and_then(|r| r.values.get(col_pos))?;
2782 let Value::Sq8Vector(q) = cell else {
2783 return Some((adc_d, row));
2787 };
2788 let deq = quantize::dequantize(q);
2789 if deq.len() != query.len() {
2790 return None;
2791 }
2792 Some((metric_distance(metric, &deq, query), row))
2793 })
2794 .collect();
2795 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2796 out
2797}
2798
/// Over-fetch factor for SQ8-encoded columns.
///
/// `nsw_search` widens the layer-0 beam to at least `k * SQ8_RERANK_OVER_FETCH`
/// candidates. `sq8_rerank` then re-scores them on dequantised values before the list
/// is truncated to `k`.
const SQ8_RERANK_OVER_FETCH: usize = 3;
2803
2804fn metric_distance(metric: NswMetric, a: &[f32], b: &[f32]) -> f32 {
2805 match metric {
2806 NswMetric::L2 => l2_distance_sq(a, b),
2807 NswMetric::InnerProduct => -inner_product_f32(a, b),
2808 NswMetric::Cosine => {
2809 let (dot, na, nb) = cosine_dot_norms_f32(a, b);
2810 if na == 0.0 || nb == 0.0 {
2811 return f32::INFINITY;
2812 }
2813 let denom = sqrt_newton_f32(na) * sqrt_newton_f32(nb);
2816 1.0 - dot / denom
2817 }
2818 }
2819}
2820
2821#[doc(hidden)]
2830#[inline]
2831pub fn inner_product_f32(a: &[f32], b: &[f32]) -> f32 {
2832 #[cfg(target_arch = "aarch64")]
2833 {
2834 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
2835 return unsafe { inner_product_neon(a, b) };
2838 }
2839 }
2840 inner_product_scalar(a, b)
2841}
2842
/// Portable dot product, accumulated left to right from `+0.0`.
///
/// Pairs are taken up to the shorter of the two slices.
fn inner_product_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter()
        .zip(b)
        .fold(0.0_f32, |acc, (x, y)| acc + x * y)
}
2850
/// NEON dot product of two f32 slices.
///
/// The main loop uses two independent 4-lane FMA accumulators over 8-element strides,
/// followed by one 4-lane step for a remaining group of four. The accumulators are then
/// summed horizontally. Summation order and fused multiply-add differ from
/// `inner_product_scalar`, so results can differ in the last bits.
///
/// # Safety
/// - The CPU must support NEON (`#[target_feature(enable = "neon")]`).
/// - `b.len()` must be at least `a.len()`; only `a.len()` drives the loop bounds.
///
/// Elements past the last multiple of 4 are ignored, so callers should pass lengths
/// that are multiples of 4 (as `inner_product_f32` does).
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
// Single-letter names (a, b, n, i) mirror the scalar version.
#[allow(clippy::many_single_char_names)] unsafe fn inner_product_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32,
    };
    // SAFETY: with `n = a.len()`, the loop guards `i + 8 <= n` / `i + 4 <= n` keep every
    // 4-lane load inside `a`, and inside `b` by the length precondition above.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        // Two accumulators let consecutive FMAs proceed independently.
        let mut acc0 = zero;
        let mut acc1 = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 8 <= n {
            let av0 = vld1q_f32(a.as_ptr().add(i));
            let bv0 = vld1q_f32(b.as_ptr().add(i));
            acc0 = vfmaq_f32(acc0, av0, bv0);
            let av1 = vld1q_f32(a.as_ptr().add(i + 4));
            let bv1 = vld1q_f32(b.as_ptr().add(i + 4));
            acc1 = vfmaq_f32(acc1, av1, bv1);
            i += 8;
        }
        while i + 4 <= n {
            let av = vld1q_f32(a.as_ptr().add(i));
            let bv = vld1q_f32(b.as_ptr().add(i));
            acc0 = vfmaq_f32(acc0, av, bv);
            i += 4;
        }
        // Combine both accumulators, then sum the four lanes.
        vaddvq_f32(vaddq_f32(acc0, acc1))
    }
}
2884
2885#[doc(hidden)]
2892#[inline]
2893pub fn cosine_dot_norms_f32(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
2894 #[cfg(target_arch = "aarch64")]
2895 {
2896 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
2897 return unsafe { cosine_dot_norms_neon(a, b) };
2899 }
2900 }
2901 cosine_dot_norms_scalar(a, b)
2902}
2903
/// Portable single-pass `(dot, |a|^2, |b|^2)`, each accumulated left to right from `+0.0`.
///
/// Pairs are taken up to the shorter of the two slices.
fn cosine_dot_norms_scalar(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    a.iter()
        .zip(b)
        .fold((0.0_f32, 0.0_f32, 0.0_f32), |(dot, norm_a, norm_b), (x, y)| {
            (dot + x * y, norm_a + x * x, norm_b + y * y)
        })
}
2915
/// NEON single-pass `(dot(a, b), |a|^2, |b|^2)`.
///
/// Three 4-lane FMA accumulators are advanced together over 4-element strides and each
/// is summed horizontally at the end. Summation order and FMA differ from
/// `cosine_dot_norms_scalar`, so results can differ in the last bits.
///
/// # Safety
/// - The CPU must support NEON (`#[target_feature(enable = "neon")]`).
/// - `b.len()` must be at least `a.len()`; only `a.len()` drives the loop bound.
///
/// Elements past the last multiple of 4 are ignored, so callers should pass lengths
/// that are multiples of 4 (as `cosine_dot_norms_f32` does).
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
// Short names mirror the scalar version; `acc_na`/`acc_nb` are intentionally parallel.
#[allow(clippy::many_single_char_names, clippy::similar_names)]
unsafe fn cosine_dot_norms_neon(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    use core::arch::aarch64::{float32x4_t, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32};
    // SAFETY: with `n = a.len()`, the guard `i + 4 <= n` keeps every 4-lane load inside
    // `a`, and inside `b` by the length precondition above.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut acc_dot = zero;
        let mut acc_na = zero;
        let mut acc_nb = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 4 <= n {
            let av = vld1q_f32(a.as_ptr().add(i));
            let bv = vld1q_f32(b.as_ptr().add(i));
            acc_dot = vfmaq_f32(acc_dot, av, bv);
            acc_na = vfmaq_f32(acc_na, av, av);
            acc_nb = vfmaq_f32(acc_nb, bv, bv);
            i += 4;
        }
        (vaddvq_f32(acc_dot), vaddvq_f32(acc_na), vaddvq_f32(acc_nb))
    }
}
2939
/// Square root for `no_std` builds, where `f32::sqrt` is unavailable.
///
/// Newton's method (`g <- (g + x/g) / 2`) converges quadratically only once `g` is close
/// to the root. The previous version started from `g = x` with 10 iterations, which is
/// far too few for inputs far from 1 (e.g. `sqrt(1e6)` came out near 1296). That
/// inaccuracy skewed cosine distances for unnormalised vectors.
///
/// This version seeds Newton with an exponent-halving bit trick: halving the biased
/// exponent in the IEEE-754 bits gives a starting value within about 6% for any normal
/// input. From there a few iterations reach full f32 precision.
///
/// Edge cases:
/// - Subnormal inputs are first scaled by an exact power of two (2^24), so the trick
///   stays accurate.
/// - Returns `0.0` for `x <= 0.0` (including negative inputs).
/// - Returns `x` itself for `+inf` and NaN.
fn sqrt_newton_f32(x: f32) -> f32 {
    if x <= 0.0 {
        return 0.0;
    }
    if !x.is_finite() {
        // sqrt(+inf) = +inf, sqrt(NaN) = NaN.
        return x;
    }
    if x < f32::MIN_POSITIVE {
        // Subnormal: scale into the normal range exactly, then undo with sqrt(2^24) = 2^12.
        return sqrt_newton_f32(x * 16_777_216.0) / 4096.0;
    }
    // 0x1fc0_0000 re-biases the halved exponent; exact for even powers of two
    // (e.g. 1.0 -> 1.0, 4.0 -> 2.0) and within ~6% elsewhere.
    let mut g = f32::from_bits(0x1fc0_0000 + (x.to_bits() >> 1));
    // A ~6% start needs ~4 Newton steps to reach f32 precision; 6 leaves margin.
    for _ in 0..6 {
        g = 0.5 * (g + x / g);
    }
    g
}
2950
2951#[inline]
2959fn l2_distance_sq(a: &[f32], b: &[f32]) -> f32 {
2960 #[cfg(target_arch = "aarch64")]
2961 {
2962 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
2963 return unsafe { l2_distance_sq_neon(a, b) };
2967 }
2968 }
2969 l2_distance_sq_scalar(a, b)
2970}
2971
/// Portable squared Euclidean distance, accumulated left to right from `+0.0`.
///
/// Pairs are taken up to the shorter of the two slices.
fn l2_distance_sq_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).fold(0.0_f32, |total, (x, y)| {
        let delta = *x - *y;
        total + delta * delta
    })
}
2980
/// NEON squared Euclidean distance.
///
/// The main loop uses two independent 4-lane accumulators (difference, then FMA of the
/// difference with itself) over 8-element strides, followed by one 4-lane step for a
/// remaining group of four. Summation order and FMA differ from
/// `l2_distance_sq_scalar`, so results can differ in the last bits.
///
/// # Safety
/// - The CPU must support NEON (`#[target_feature(enable = "neon")]`).
/// - `b.len()` must be at least `a.len()`; only `a.len()` drives the loop bounds.
///
/// Elements past the last multiple of 4 are ignored, so callers should pass lengths
/// that are multiples of 4 (as `l2_distance_sq` does).
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
// Single-letter names (a, b, n, i, d) mirror the scalar version.
#[allow(clippy::many_single_char_names)] unsafe fn l2_distance_sq_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32, vsubq_f32,
    };
    // SAFETY: with `n = a.len()`, the loop guards `i + 8 <= n` / `i + 4 <= n` keep every
    // 4-lane load inside `a`, and inside `b` by the length precondition above.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        // Two accumulators let consecutive FMAs proceed independently.
        let mut acc0 = zero;
        let mut acc1 = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 8 <= n {
            let d0 = vsubq_f32(vld1q_f32(a.as_ptr().add(i)), vld1q_f32(b.as_ptr().add(i)));
            acc0 = vfmaq_f32(acc0, d0, d0);
            let d1 = vsubq_f32(
                vld1q_f32(a.as_ptr().add(i + 4)),
                vld1q_f32(b.as_ptr().add(i + 4)),
            );
            acc1 = vfmaq_f32(acc1, d1, d1);
            i += 8;
        }
        while i + 4 <= n {
            let d = vsubq_f32(vld1q_f32(a.as_ptr().add(i)), vld1q_f32(b.as_ptr().add(i)));
            acc0 = vfmaq_f32(acc0, d, d);
            i += 4;
        }
        // Combine both accumulators, then sum the four lanes.
        vaddvq_f32(vaddq_f32(acc0, acc1))
    }
}
3018
3019pub fn nsw_query(
3022 table: &Table,
3023 idx_name: &str,
3024 query: &[f32],
3025 k: usize,
3026 metric: NswMetric,
3027) -> Vec<usize> {
3028 let Some(idx_pos) = table.indices.iter().position(|i| i.name == idx_name) else {
3029 return Vec::new();
3030 };
3031 let ef = (k * 2).max(NSW_DEFAULT_M);
3032 let mut hits = nsw_search(table, idx_pos, query, k, ef, metric);
3033 hits.truncate(k);
3034 hits.into_iter().map(|(_, idx)| idx).collect()
3035}
3036
3037pub fn nsw_index_on(table: &Table, column_position: usize) -> Option<&Index> {
3041 table
3042 .indices
3043 .iter()
3044 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_)))
3045}
3046
/// In-memory catalog of tables, loaded cold segments, user-defined functions and triggers.
///
/// The derived `Default` produces the same empty catalog as `Catalog::new`.
#[derive(Debug, Clone, Default)]
pub struct Catalog {
    /// Table storage. Positions are referenced by `by_name`.
    tables: Vec<Table>,
    /// Table name -> position in `tables`. `drop_table` uses `swap_remove`, so a table's
    /// position can change when another table is dropped.
    by_name: BTreeMap<String, usize>,
    /// Loaded cold segments, indexed by segment id. `None` marks an id with no segment.
    /// `load_segment_bytes_at` creates such padding; NOTE(review): presumably
    /// `tombstone_segment` does too — confirm.
    cold_segments: Vec<Option<Arc<OwnedSegment>>>,
    /// User-defined functions keyed by `FunctionDef::name`.
    functions: BTreeMap<String, FunctionDef>,
    /// Registered triggers; `create_trigger` keeps (name, table) pairs unique.
    triggers: Vec<TriggerDef>,
}
3099
/// A user-defined function as stored by `Catalog::create_function`.
///
/// Every field is plain text; this type does not parse or validate any of it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FunctionDef {
    /// Function name; also the key in the catalog's function map.
    pub name: String,
    /// Textual argument list. NOTE(review): exact format is set by the parser — confirm.
    pub args_repr: String,
    /// Return type as text (presumably the `RETURNS` clause — confirm).
    pub returns: String,
    /// Implementation language name as text (presumably the `LANGUAGE` clause — confirm).
    pub language: String,
    /// Function body source text.
    pub body: String,
}
3125
/// A trigger as stored by `Catalog::create_trigger`.
///
/// The referenced table and function must exist when the trigger is created.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TriggerDef {
    /// Trigger name; unique per table (not globally).
    pub name: String,
    /// Name of the table the trigger is attached to.
    pub table: String,
    /// Firing time as text. NOTE(review): presumably `BEFORE`/`AFTER`/`INSTEAD OF` — confirm.
    pub timing: String,
    /// Events that fire the trigger, as text (presumably `INSERT`/`UPDATE`/`DELETE`/...).
    pub events: Vec<String>,
    /// Granularity as text (presumably `ROW` or `STATEMENT` — confirm).
    pub for_each: String,
    /// Name of the catalog function the trigger runs.
    pub function: String,
    /// Column list, presumably from `UPDATE OF col, ...` — confirm with the parser.
    pub update_columns: Vec<String>,
}
3155
3156impl Catalog {
3157 pub const fn new() -> Self {
3158 Self {
3159 tables: Vec::new(),
3160 by_name: BTreeMap::new(),
3161 cold_segments: Vec::new(),
3162 functions: BTreeMap::new(),
3163 triggers: Vec::new(),
3164 }
3165 }
3166
3167 pub const fn functions(&self) -> &BTreeMap<String, FunctionDef> {
3171 &self.functions
3172 }
3173
3174 pub fn create_function(
3178 &mut self,
3179 def: FunctionDef,
3180 or_replace: bool,
3181 ) -> Result<(), StorageError> {
3182 if !or_replace && self.functions.contains_key(&def.name) {
3183 return Err(StorageError::Corrupt(format!(
3184 "function {:?} already exists (drop or use CREATE OR REPLACE)",
3185 def.name
3186 )));
3187 }
3188 self.functions.insert(def.name.clone(), def);
3189 Ok(())
3190 }
3191
3192 pub fn drop_function(&mut self, name: &str) -> bool {
3196 self.functions.remove(name).is_some()
3197 }
3198
3199 pub fn triggers(&self) -> &[TriggerDef] {
3203 &self.triggers
3204 }
3205
3206 pub fn create_trigger(
3212 &mut self,
3213 def: TriggerDef,
3214 or_replace: bool,
3215 ) -> Result<(), StorageError> {
3216 if !self.by_name.contains_key(&def.table) {
3217 return Err(StorageError::TableNotFound {
3218 name: def.table.clone(),
3219 });
3220 }
3221 if !self.functions.contains_key(&def.function) {
3222 return Err(StorageError::Corrupt(format!(
3223 "trigger {:?} references unknown function {:?}",
3224 def.name, def.function
3225 )));
3226 }
3227 let dup = self
3228 .triggers
3229 .iter()
3230 .position(|t| t.name == def.name && t.table == def.table);
3231 match (dup, or_replace) {
3232 (Some(_), false) => Err(StorageError::Corrupt(format!(
3233 "trigger {:?} already exists on table {:?}",
3234 def.name, def.table
3235 ))),
3236 (Some(i), true) => {
3237 self.triggers[i] = def;
3238 Ok(())
3239 }
3240 (None, _) => {
3241 self.triggers.push(def);
3242 Ok(())
3243 }
3244 }
3245 }
3246
3247 pub fn drop_trigger(&mut self, name: &str, table: &str) -> bool {
3250 let before = self.triggers.len();
3251 self.triggers
3252 .retain(|t| !(t.name == name && t.table == table));
3253 before != self.triggers.len()
3254 }
3255
3256 pub fn create_table(&mut self, schema: TableSchema) -> Result<(), StorageError> {
3257 if self.by_name.contains_key(&schema.name) {
3258 return Err(StorageError::DuplicateTable {
3259 name: schema.name.clone(),
3260 });
3261 }
3262 let idx = self.tables.len();
3263 let name = schema.name.clone();
3264 self.tables.push(Table::new(schema));
3265 self.by_name.insert(name, idx);
3266 Ok(())
3267 }
3268
3269 pub fn get(&self, name: &str) -> Option<&Table> {
3270 let idx = *self.by_name.get(name)?;
3271 self.tables.get(idx)
3272 }
3273
3274 pub fn get_mut(&mut self, name: &str) -> Option<&mut Table> {
3275 let idx = *self.by_name.get(name)?;
3276 self.tables.get_mut(idx)
3277 }
3278
3279 pub fn table_count(&self) -> usize {
3280 self.tables.len()
3281 }
3282
3283 pub fn drop_table(&mut self, name: &str) -> bool {
3289 let Some(idx) = self.by_name.remove(name) else {
3290 return false;
3291 };
3292 self.tables.swap_remove(idx);
3295 if idx < self.tables.len() {
3297 let moved_name = self.tables[idx].schema.name.clone();
3298 self.by_name.insert(moved_name, idx);
3299 }
3300 true
3301 }
3302
3303 pub fn drop_named_index(&mut self, name: &str) -> bool {
3306 for t in &mut self.tables {
3307 let before = t.indices.len();
3308 t.indices.retain(|i| i.name != name);
3309 if t.indices.len() != before {
3310 return true;
3311 }
3312 }
3313 false
3314 }
3315
3316 pub fn table_names(&self) -> Vec<String> {
3319 self.tables.iter().map(|t| t.schema.name.clone()).collect()
3320 }
3321
3322 pub fn load_segment_bytes(&mut self, bytes: Vec<u8>) -> Result<u32, StorageError> {
3333 let id = u32::try_from(self.cold_segments.len()).map_err(|_| {
3334 StorageError::Corrupt("cold segment count would exceed u32::MAX".into())
3335 })?;
3336 let seg = OwnedSegment::from_bytes(bytes)
3337 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
3338 self.cold_segments.push(Some(Arc::new(seg)));
3339 Ok(id)
3340 }
3341
3342 pub fn load_segment_bytes_at(
3355 &mut self,
3356 target_id: u32,
3357 bytes: Vec<u8>,
3358 ) -> Result<(), StorageError> {
3359 let seg = OwnedSegment::from_bytes(bytes)
3360 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
3361 let idx = target_id as usize;
3362 while self.cold_segments.len() <= idx {
3363 self.cold_segments.push(None);
3364 }
3365 if self.cold_segments[idx].is_some() {
3366 return Err(StorageError::Corrupt(format!(
3367 "load_segment_bytes_at: segment_id {target_id} already occupied"
3368 )));
3369 }
3370 self.cold_segments[idx] = Some(Arc::new(seg));
3371 Ok(())
3372 }
3373
3374 pub fn tombstone_segment(&mut self, segment_id: u32) -> Result<(), StorageError> {
3384 let idx = segment_id as usize;
3385 if idx >= self.cold_segments.len() {
3386 return Err(StorageError::Corrupt(format!(
3387 "tombstone_segment: segment_id {segment_id} out of bounds (len={})",
3388 self.cold_segments.len()
3389 )));
3390 }
3391 self.cold_segments[idx] = None;
3392 Ok(())
3393 }
3394
3395 #[must_use]
3397 pub fn cold_segment_count(&self) -> usize {
3398 self.cold_segments.iter().filter(|s| s.is_some()).count()
3399 }
3400
3401 #[must_use]
3404 pub fn cold_segment_slot_count(&self) -> usize {
3405 self.cold_segments.len()
3406 }
3407
3408 #[must_use]
3413 pub fn cold_segment_ids_global(&self) -> Vec<u32> {
3414 self.cold_segments
3415 .iter()
3416 .enumerate()
3417 .filter_map(|(i, s)| s.as_ref().map(|_| i as u32))
3418 .collect()
3419 }
3420
3421 #[must_use]
3428 pub fn hot_tier_bytes(&self) -> u64 {
3429 self.tables
3430 .iter()
3431 .map(Table::hot_bytes)
3432 .fold(0u64, u64::saturating_add)
3433 }
3434
3435 pub fn freeze_oldest_to_cold(
3480 &mut self,
3481 table_name: &str,
3482 index_name: &str,
3483 max_rows: usize,
3484 ) -> Result<FreezeReport, StorageError> {
3485 if max_rows == 0 {
3487 return Err(StorageError::Corrupt(
3488 "freeze_oldest_to_cold: max_rows must be > 0".into(),
3489 ));
3490 }
3491 let table = self.get(table_name).ok_or_else(|| {
3492 StorageError::Corrupt(format!(
3493 "freeze_oldest_to_cold: table {table_name:?} not found"
3494 ))
3495 })?;
3496 if max_rows > table.rows.len() {
3497 return Err(StorageError::Corrupt(format!(
3498 "freeze_oldest_to_cold: max_rows {max_rows} > row_count {}",
3499 table.rows.len()
3500 )));
3501 }
3502 let idx = table
3503 .indices
3504 .iter()
3505 .find(|i| i.name == index_name)
3506 .ok_or_else(|| {
3507 StorageError::Corrupt(format!(
3508 "freeze_oldest_to_cold: index {index_name:?} not found on {table_name:?}"
3509 ))
3510 })?;
3511 if !matches!(idx.kind, IndexKind::BTree(_)) {
3512 return Err(StorageError::Corrupt(format!(
3513 "freeze_oldest_to_cold: index {index_name:?} is NSW; only BTree indices may freeze"
3514 )));
3515 }
3516 let column_position = idx.column_position;
3517
3518 let schema = table.schema.clone();
3520 let mut to_freeze: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(max_rows);
3521 for row_idx in 0..max_rows {
3522 let row = table.rows.get(row_idx).expect("bounds-checked above");
3523 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
3524 StorageError::Corrupt(format!(
3525 "freeze_oldest_to_cold: row {row_idx} has NULL / non-key value in index column"
3526 ))
3527 })?;
3528 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
3529 StorageError::Corrupt(format!(
3530 "freeze_oldest_to_cold: index {index_name:?} column type is non-integer; \
3531 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
3532 ))
3533 })?;
3534 to_freeze.push((pk_u64, encode_row_body_dense(row, &schema), key));
3535 }
3536 to_freeze.sort_by_key(|(k, _, _)| *k);
3541 for w in to_freeze.windows(2) {
3545 if w[0].0 == w[1].0 {
3546 return Err(StorageError::Corrupt(format!(
3547 "freeze_oldest_to_cold: duplicate PK {} in freeze batch",
3548 w[0].0
3549 )));
3550 }
3551 }
3552 let post_swap_keys: Vec<IndexKey> = to_freeze.iter().map(|(_, _, k)| k.clone()).collect();
3556 let seg_rows: Vec<(u64, Vec<u8>)> = to_freeze
3560 .into_iter()
3561 .map(|(k, body, _)| (k, body))
3562 .collect();
3563 let frozen_rows = seg_rows.len();
3564 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
3565 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: encode: {e}")))?;
3566
3567 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
3576 let positions: Vec<usize> = (0..max_rows).collect();
3577 let t_mut = self
3578 .get_mut(table_name)
3579 .expect("just validated; still present");
3580 let removed = t_mut.delete_rows(&positions);
3581 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
3582 let bytes_after = t_mut.hot_bytes();
3583 let bytes_freed = bytes_before.saturating_sub(bytes_after);
3584
3585 let segment_id = self
3586 .load_segment_bytes(seg_bytes.clone())
3587 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: load: {e}")))?;
3588 let new_cold = post_swap_keys.into_iter().map(|k| {
3589 (
3590 k,
3591 RowLocator::Cold {
3592 segment_id,
3593 page_offset: 0,
3594 },
3595 )
3596 });
3597 let t_mut = self.get_mut(table_name).expect("still present");
3598 t_mut.register_cold_locators(index_name, new_cold)?;
3599
3600 Ok(FreezeReport {
3601 segment_id,
3602 frozen_rows,
3603 bytes_freed,
3604 segment_bytes: seg_bytes,
3605 })
3606 }
3607
3608 #[must_use]
3614 pub fn cold_segment(&self, segment_id: u32) -> Option<&OwnedSegment> {
3615 self.cold_segments
3616 .get(segment_id as usize)
3617 .and_then(|s| s.as_deref())
3618 }
3619
3620 pub fn resolve_cold_locator(
3629 &self,
3630 table_name: &str,
3631 segment_id: u32,
3632 key: &IndexKey,
3633 ) -> Option<Row> {
3634 let t = self.get(table_name)?;
3635 let u64_key = index_key_as_u64(key)?;
3636 let seg = self.cold_segments.get(segment_id as usize)?.as_ref()?;
3637 let payload = seg.lookup(u64_key)?;
3638 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
3639 Some(row)
3640 }
3641
3642 pub fn lookup_by_pk(&self, table: &str, index_name: &str, key: &IndexKey) -> Option<Row> {
3660 let t = self.get(table)?;
3661 let idx = t.indices.iter().find(|i| i.name == index_name)?;
3662 let locators = idx.lookup_eq(key);
3663 let cold_u64_key = index_key_as_u64(key);
3664 for loc in locators {
3665 match *loc {
3666 RowLocator::Hot(i) => {
3667 if let Some(row) = t.rows.get(i) {
3668 return Some(row.clone());
3669 }
3670 }
3671 RowLocator::Cold {
3672 segment_id,
3673 page_offset: _,
3674 } => {
3675 let Some(u64_key) = cold_u64_key else {
3676 continue;
3679 };
3680 let Some(seg) = self
3681 .cold_segments
3682 .get(segment_id as usize)
3683 .and_then(|s| s.as_deref())
3684 else {
3685 continue;
3696 };
3697 let Some(payload) = seg.lookup(u64_key) else {
3698 continue;
3699 };
3700 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
3701 return Some(row);
3702 }
3703 }
3704 }
3705 None
3706 }
3707
3708 pub fn promote_cold_row(
3730 &mut self,
3731 table_name: &str,
3732 index_name: &str,
3733 key: &IndexKey,
3734 ) -> Result<Option<usize>, StorageError> {
3735 let cold_loc = self.find_cold_locator(table_name, index_name, key)?;
3736 let Some((segment_id, _page_offset)) = cold_loc else {
3737 return Ok(None);
3738 };
3739 let u64_key = index_key_as_u64(key).ok_or_else(|| {
3740 StorageError::Corrupt(
3741 "promote_cold_row: key type not coercible to u64 (cold tier requires integer PK)"
3742 .into(),
3743 )
3744 })?;
3745 let schema = self
3749 .get(table_name)
3750 .ok_or_else(|| {
3751 StorageError::Corrupt(format!("promote_cold_row: table {table_name:?} not found"))
3752 })?
3753 .schema
3754 .clone();
3755 let seg = self
3756 .cold_segments
3757 .get(segment_id as usize)
3758 .and_then(|s| s.as_ref())
3759 .ok_or_else(|| {
3760 StorageError::Corrupt(format!(
3761 "promote_cold_row: segment {segment_id} not registered on catalog"
3762 ))
3763 })?;
3764 let payload = seg.lookup(u64_key).ok_or_else(|| {
3765 StorageError::Corrupt(format!(
3766 "promote_cold_row: key {u64_key} resolves to segment {segment_id} \
3767 but the segment's bloom/page lookup didn't return a row"
3768 ))
3769 })?;
3770 let (row, _consumed) = decode_row_body_dense(&payload, &schema)?;
3771 let t = self
3776 .get_mut(table_name)
3777 .expect("table existed at lookup time");
3778 t.insert(row)?;
3779 let new_hot_idx =
3780 t.rows.len().checked_sub(1).ok_or_else(|| {
3781 StorageError::Corrupt("promote_cold_row: empty after insert".into())
3782 })?;
3783 t.remove_cold_locators_for_key(index_name, key)?;
3787 Ok(Some(new_hot_idx))
3788 }
3789
3790 pub fn shadow_cold_row(
3808 &mut self,
3809 table_name: &str,
3810 index_name: &str,
3811 key: &IndexKey,
3812 ) -> Result<usize, StorageError> {
3813 let t = self.get_mut(table_name).ok_or_else(|| {
3814 StorageError::Corrupt(format!("shadow_cold_row: table {table_name:?} not found"))
3815 })?;
3816 t.remove_cold_locators_for_key(index_name, key)
3817 }
3818
3819 pub fn prepare_freeze_slice(
3837 &self,
3838 table_name: &str,
3839 index_name: &str,
3840 row_range: core::ops::Range<usize>,
3841 ) -> Result<FreezeSlice, StorageError> {
3842 let table = self.get(table_name).ok_or_else(|| {
3843 StorageError::Corrupt(format!(
3844 "prepare_freeze_slice: table {table_name:?} not found"
3845 ))
3846 })?;
3847 let idx = table
3848 .indices
3849 .iter()
3850 .find(|i| i.name == index_name)
3851 .ok_or_else(|| {
3852 StorageError::Corrupt(format!(
3853 "prepare_freeze_slice: index {index_name:?} not found on {table_name:?}"
3854 ))
3855 })?;
3856 if !matches!(idx.kind, IndexKind::BTree(_)) {
3857 return Err(StorageError::Corrupt(format!(
3858 "prepare_freeze_slice: index {index_name:?} is NSW; only BTree indices may freeze"
3859 )));
3860 }
3861 if row_range.end > table.rows.len() {
3862 return Err(StorageError::Corrupt(format!(
3863 "prepare_freeze_slice: row_range end {} > row_count {}",
3864 row_range.end,
3865 table.rows.len()
3866 )));
3867 }
3868 let column_position = idx.column_position;
3869 let schema = table.schema.clone();
3870 let mut rows: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(row_range.len());
3871 for row_idx in row_range.clone() {
3872 let row = table.rows.get(row_idx).expect("bounds-checked above");
3873 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
3874 StorageError::Corrupt(format!(
3875 "prepare_freeze_slice: row {row_idx} has NULL / non-key value in index column"
3876 ))
3877 })?;
3878 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
3879 StorageError::Corrupt(format!(
3880 "prepare_freeze_slice: index {index_name:?} column type is non-integer; \
3881 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
3882 ))
3883 })?;
3884 rows.push((pk_u64, encode_row_body_dense(row, &schema), key));
3885 }
3886 rows.sort_by_key(|(k, _, _)| *k);
3887 Ok(FreezeSlice { row_range, rows })
3888 }
3889
3890 pub fn commit_freeze_slices(
3904 &mut self,
3905 table_name: &str,
3906 index_name: &str,
3907 slices: Vec<FreezeSlice>,
3908 ) -> Result<FreezeReport, StorageError> {
3909 let table = self.get(table_name).ok_or_else(|| {
3911 StorageError::Corrupt(format!(
3912 "commit_freeze_slices: table {table_name:?} not found"
3913 ))
3914 })?;
3915 let idx = table
3916 .indices
3917 .iter()
3918 .find(|i| i.name == index_name)
3919 .ok_or_else(|| {
3920 StorageError::Corrupt(format!(
3921 "commit_freeze_slices: index {index_name:?} not found on {table_name:?}"
3922 ))
3923 })?;
3924 if !matches!(idx.kind, IndexKind::BTree(_)) {
3925 return Err(StorageError::Corrupt(format!(
3926 "commit_freeze_slices: index {index_name:?} is NSW; only BTree indices may freeze"
3927 )));
3928 }
3929 let mut ordered = slices;
3933 ordered.sort_by_key(|s| s.row_range.start);
3934 let mut expected_start = 0usize;
3938 for s in &ordered {
3939 if s.row_range.start != expected_start {
3940 return Err(StorageError::Corrupt(format!(
3941 "commit_freeze_slices: gap/overlap at row {}; expected start {}",
3942 s.row_range.start, expected_start
3943 )));
3944 }
3945 expected_start = s.row_range.end;
3946 }
3947 let max_rows = expected_start;
3948 if max_rows > table.rows.len() {
3949 return Err(StorageError::Corrupt(format!(
3950 "commit_freeze_slices: total row range {} exceeds row_count {}",
3951 max_rows,
3952 table.rows.len()
3953 )));
3954 }
3955 if max_rows == 0 {
3956 return Ok(FreezeReport {
3957 segment_id: u32::MAX,
3958 frozen_rows: 0,
3959 bytes_freed: 0,
3960 segment_bytes: Vec::new(),
3961 });
3962 }
3963
3964 let total_rows: usize = ordered.iter().map(|s| s.rows.len()).sum();
3969 if total_rows != max_rows {
3970 return Err(StorageError::Corrupt(format!(
3971 "commit_freeze_slices: total slice rows {total_rows} ≠ row_range coverage {max_rows}"
3972 )));
3973 }
3974 let mut cursors: Vec<usize> = alloc::vec![0; ordered.len()];
3975 let mut merged: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(total_rows);
3976 loop {
3977 let mut pick: Option<usize> = None;
3980 for (i, c) in cursors.iter().enumerate() {
3981 let slice = &ordered[i];
3982 if *c >= slice.rows.len() {
3983 continue;
3984 }
3985 match pick {
3986 None => pick = Some(i),
3987 Some(j) => {
3988 if slice.rows[*c].0 < ordered[j].rows[cursors[j]].0 {
3989 pick = Some(i);
3990 }
3991 }
3992 }
3993 }
3994 let Some(i) = pick else { break };
3995 let row = ordered[i].rows[cursors[i]].clone();
3996 cursors[i] += 1;
3997 merged.push(row);
3998 }
3999 for w in merged.windows(2) {
4002 if w[0].0 == w[1].0 {
4003 return Err(StorageError::Corrupt(format!(
4004 "commit_freeze_slices: duplicate PK {} across slices",
4005 w[0].0
4006 )));
4007 }
4008 }
4009 let post_swap_keys: Vec<IndexKey> = merged.iter().map(|(_, _, k)| k.clone()).collect();
4010 let seg_rows: Vec<(u64, Vec<u8>)> =
4011 merged.into_iter().map(|(k, body, _)| (k, body)).collect();
4012 let frozen_rows = seg_rows.len();
4013 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
4014 .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: encode: {e}")))?;
4015
4016 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
4018 let positions: Vec<usize> = (0..max_rows).collect();
4019 let t_mut = self
4020 .get_mut(table_name)
4021 .expect("just validated; still present");
4022 let removed = t_mut.delete_rows(&positions);
4023 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
4024 let bytes_after = t_mut.hot_bytes();
4025 let bytes_freed = bytes_before.saturating_sub(bytes_after);
4026
4027 let segment_id = self
4028 .load_segment_bytes(seg_bytes.clone())
4029 .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: load: {e}")))?;
4030 let new_cold = post_swap_keys.into_iter().map(|k| {
4031 (
4032 k,
4033 RowLocator::Cold {
4034 segment_id,
4035 page_offset: 0,
4036 },
4037 )
4038 });
4039 let t_mut = self.get_mut(table_name).expect("still present");
4040 t_mut.register_cold_locators(index_name, new_cold)?;
4041
4042 Ok(FreezeReport {
4043 segment_id,
4044 frozen_rows,
4045 bytes_freed,
4046 segment_bytes: seg_bytes,
4047 })
4048 }
4049
4050 pub fn compact_cold_segments(
4093 &mut self,
4094 table_name: &str,
4095 index_name: &str,
4096 target_segment_bytes: u64,
4097 ) -> Result<CompactReport, StorageError> {
4098 let t = self.get(table_name).ok_or_else(|| {
4100 StorageError::Corrupt(format!(
4101 "compact_cold_segments: table {table_name:?} not found"
4102 ))
4103 })?;
4104 let idx = t
4105 .indices
4106 .iter()
4107 .find(|i| i.name == index_name)
4108 .ok_or_else(|| {
4109 StorageError::Corrupt(format!(
4110 "compact_cold_segments: index {index_name:?} not found on {table_name:?}"
4111 ))
4112 })?;
4113 let map = match &idx.kind {
4114 IndexKind::BTree(m) => m,
4115 IndexKind::Nsw(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
4116 return Err(StorageError::Corrupt(format!(
4117 "compact_cold_segments: index {index_name:?} is not BTree; \
4118 compaction applies only to BTree cold-tier indices"
4119 )));
4120 }
4121 };
4122
4123 let mut referenced_ids: BTreeSet<u32> = BTreeSet::new();
4126 for (_key, locators) in map.iter() {
4127 for loc in locators {
4128 if let RowLocator::Cold { segment_id, .. } = loc {
4129 referenced_ids.insert(*segment_id);
4130 }
4131 }
4132 }
4133 let candidate_set: BTreeSet<u32> = referenced_ids
4135 .into_iter()
4136 .filter(|id| {
4137 self.cold_segments
4138 .get(*id as usize)
4139 .and_then(|s| s.as_deref())
4140 .is_some_and(|s| (s.bytes().len() as u64) < target_segment_bytes)
4141 })
4142 .collect();
4143 if candidate_set.len() < 2 {
4144 return Ok(CompactReport {
4145 sources: Vec::new(),
4146 merged_segment_id: None,
4147 merged_segment_bytes: Vec::new(),
4148 merged_rows: 0,
4149 deleted_rows_pruned: 0,
4150 bytes_reclaimed_estimate: 0,
4151 });
4152 }
4153 let mut source_row_count: usize = 0;
4155 let mut source_byte_total: u64 = 0;
4156 for &id in &candidate_set {
4157 let seg = self.cold_segments[id as usize]
4158 .as_ref()
4159 .expect("candidate selected only when slot is Some");
4160 source_row_count = source_row_count.saturating_add(seg.meta().num_rows as usize);
4161 source_byte_total = source_byte_total.saturating_add(seg.bytes().len() as u64);
4162 }
4163 let mut collected: BTreeMap<u64, (Vec<u8>, IndexKey)> = BTreeMap::new();
4169 for (key, locators) in map.iter() {
4170 for loc in locators {
4171 let RowLocator::Cold { segment_id, .. } = loc else {
4172 continue;
4173 };
4174 if !candidate_set.contains(segment_id) {
4175 continue;
4176 }
4177 let u64_key = index_key_as_u64(key).ok_or_else(|| {
4178 StorageError::Corrupt(format!(
4179 "compact_cold_segments: index {index_name:?} has non-integer Cold key; \
4180 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
4181 ))
4182 })?;
4183 let seg = self.cold_segments[*segment_id as usize]
4184 .as_ref()
4185 .expect("candidate slot guaranteed Some above");
4186 let payload = seg.lookup(u64_key).ok_or_else(|| {
4187 StorageError::Corrupt(format!(
4188 "compact_cold_segments: BTree {index_name:?} points key={u64_key} \
4189 at segment {segment_id} but the segment lookup missed"
4190 ))
4191 })?;
4192 collected.insert(u64_key, (payload, key.clone()));
4193 break;
4194 }
4195 }
4196 let merged_rows = collected.len();
4197 let deleted_rows_pruned = source_row_count.saturating_sub(merged_rows);
4198
4199 let seg_rows: Vec<(u64, Vec<u8>)> = collected
4203 .iter()
4204 .map(|(k, (body, _))| (*k, body.clone()))
4205 .collect();
4206 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
4207 .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: encode: {e}")))?;
4208 let merged_bytes_len = seg_bytes.len() as u64;
4209
4210 let merged_segment_id = self
4212 .load_segment_bytes(seg_bytes.clone())
4213 .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: load: {e}")))?;
4214
4215 let entries: Vec<(IndexKey, Vec<RowLocator>)> = {
4221 let t = self
4222 .get(table_name)
4223 .expect("table existed at the start of this fn");
4224 let idx = t
4225 .indices
4226 .iter()
4227 .find(|i| i.name == index_name)
4228 .expect("index existed at the start of this fn");
4229 let IndexKind::BTree(map) = &idx.kind else {
4230 unreachable!("validated above");
4231 };
4232 map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
4233 };
4234 let t_mut = self
4235 .get_mut(table_name)
4236 .expect("table existed at the start of this fn");
4237 let idx_mut = t_mut
4238 .indices
4239 .iter_mut()
4240 .find(|i| i.name == index_name)
4241 .expect("index existed at the start of this fn");
4242 let IndexKind::BTree(map_mut) = &mut idx_mut.kind else {
4243 unreachable!("validated above");
4244 };
4245 for (key, locators) in entries {
4246 let mut new_locs: Vec<RowLocator> = Vec::with_capacity(locators.len());
4247 let mut changed = false;
4248 for loc in &locators {
4249 match *loc {
4250 RowLocator::Cold {
4251 segment_id,
4252 page_offset: _,
4253 } if candidate_set.contains(&segment_id) => {
4254 let replacement = RowLocator::Cold {
4255 segment_id: merged_segment_id,
4256 page_offset: 0,
4257 };
4258 if !new_locs.contains(&replacement) {
4259 new_locs.push(replacement);
4260 }
4261 changed = true;
4262 }
4263 other => new_locs.push(other),
4264 }
4265 }
4266 if changed {
4267 map_mut.insert_mut(key, new_locs);
4268 }
4269 }
4270
4271 for &id in &candidate_set {
4276 self.tombstone_segment(id)?;
4277 }
4278
4279 let bytes_reclaimed_estimate = source_byte_total.saturating_sub(merged_bytes_len);
4280 Ok(CompactReport {
4281 sources: candidate_set.into_iter().collect(),
4282 merged_segment_id: Some(merged_segment_id),
4283 merged_segment_bytes: seg_bytes,
4284 merged_rows,
4285 deleted_rows_pruned,
4286 bytes_reclaimed_estimate,
4287 })
4288 }
4289
4290 fn find_cold_locator(
4296 &self,
4297 table_name: &str,
4298 index_name: &str,
4299 key: &IndexKey,
4300 ) -> Result<Option<(u32, u32)>, StorageError> {
4301 let t = self.get(table_name).ok_or_else(|| {
4302 StorageError::Corrupt(format!("find_cold_locator: table {table_name:?} not found"))
4303 })?;
4304 let idx = t
4305 .indices
4306 .iter()
4307 .find(|i| i.name == index_name)
4308 .ok_or_else(|| {
4309 StorageError::Corrupt(format!(
4310 "find_cold_locator: index {index_name:?} not found on {table_name:?}"
4311 ))
4312 })?;
4313 if !matches!(idx.kind, IndexKind::BTree(_)) {
4314 return Err(StorageError::Corrupt(format!(
4315 "find_cold_locator: index {index_name:?} is NSW; promote-on-write only applies to BTree indices"
4316 )));
4317 }
4318 for loc in idx.lookup_eq(key) {
4319 if let RowLocator::Cold {
4320 segment_id,
4321 page_offset,
4322 } = *loc
4323 {
4324 return Ok(Some((segment_id, page_offset)));
4325 }
4326 }
4327 Ok(None)
4328 }
4329}
4330
4331fn index_key_as_u64(key: &IndexKey) -> Option<u64> {
4337 match key {
4338 IndexKey::Int(n) => Some(n.cast_unsigned()),
4344 IndexKey::Text(_) | IndexKey::Bool(_) => None,
4345 }
4346}
4347
/// Error type for the storage layer.
///
/// Catalog DDL, row validation, cold-tier segment management and catalog
/// (de)serialization all report through it.
///
/// NOTE(review): `Corrupt` doubles as a catch-all for in-memory validation
/// failures (an unknown trigger function, bad freeze/compaction arguments, …),
/// even though its `Display` text reads "corrupt on-disk format".
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum StorageError {
    /// A table with this name is already registered.
    DuplicateTable {
        name: String,
    },
    /// No table with this name is registered.
    TableNotFound {
        name: String,
    },
    /// A row carried `actual` values for a table with `expected` columns.
    ArityMismatch {
        expected: usize,
        actual: usize,
    },
    /// A value's type does not match the declared type of `column`, which sits
    /// at `position`.
    TypeMismatch {
        column: String,
        expected: DataType,
        actual: DataType,
        position: usize,
    },
    /// A NULL value was supplied for a NOT NULL column.
    NullInNotNull {
        column: String,
    },
    /// An index with this name already exists.
    DuplicateIndex {
        name: String,
    },
    /// A referenced column does not exist.
    ColumnNotFound {
        column: String,
    },
    /// Malformed serialized data, or an internal/argument validation failure.
    /// The string carries the detail.
    Corrupt(String),
    /// No index with this name exists.
    IndexNotFound {
        name: String,
    },
    /// The requested operation or feature is not supported; the string says what.
    Unsupported(String),
}
4391
4392impl fmt::Display for StorageError {
4393 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4394 match self {
4395 Self::DuplicateTable { name } => write!(f, "table already exists: {name}"),
4396 Self::TableNotFound { name } => write!(f, "table not found: {name}"),
4397 Self::ArityMismatch { expected, actual } => write!(
4398 f,
4399 "row arity mismatch: expected {expected} columns, got {actual}"
4400 ),
4401 Self::TypeMismatch {
4402 column,
4403 expected,
4404 actual,
4405 position,
4406 } => write!(
4407 f,
4408 "type mismatch in column {column:?} (position {position}): expected {expected}, got {actual}"
4409 ),
4410 Self::NullInNotNull { column } => {
4411 write!(f, "NULL value in NOT NULL column {column:?}")
4412 }
4413 Self::DuplicateIndex { name } => write!(f, "index already exists: {name}"),
4414 Self::ColumnNotFound { column } => write!(f, "column not found: {column}"),
4415 Self::Corrupt(detail) => write!(f, "corrupt on-disk format: {detail}"),
4416 Self::IndexNotFound { name } => write!(f, "index not found: {name}"),
4417 Self::Unsupported(detail) => write!(f, "unsupported: {detail}"),
4418 }
4419 }
4420}
4421
4422impl ColumnSchema {
4423 pub fn new(name: impl Into<String>, ty: DataType, nullable: bool) -> Self {
4424 Self {
4425 name: name.into(),
4426 ty,
4427 nullable,
4428 default: None,
4429 runtime_default: None,
4430 auto_increment: false,
4431 }
4432 }
4433
4434 #[must_use]
4438 pub fn with_default(mut self, default: Value) -> Self {
4439 self.default = Some(default);
4440 self
4441 }
4442
4443 #[must_use]
4448 pub fn with_runtime_default(mut self, expr: impl Into<String>) -> Self {
4449 self.runtime_default = Some(expr.into());
4450 self
4451 }
4452
4453 #[must_use]
4455 pub const fn with_auto_increment(mut self) -> Self {
4456 self.auto_increment = true;
4457 self
4458 }
4459}
4460
4461impl TableSchema {
4462 pub fn new(name: impl Into<String>, columns: Vec<ColumnSchema>) -> Self {
4463 Self {
4464 name: name.into(),
4465 columns,
4466 hot_tier_bytes: None,
4467 foreign_keys: Vec::new(),
4468 uniqueness_constraints: Vec::new(),
4469 checks: Vec::new(),
4470 }
4471 }
4472}
4473
/// Magic bytes that open every serialized catalog. `Catalog::deserialize`
/// rejects any buffer that does not start with them.
const FILE_MAGIC: &[u8; 8] = b"SPGDB001";
/// Format version written by `Catalog::serialize`. It is also the newest
/// version `Catalog::deserialize` accepts.
const FILE_VERSION: u8 = 23;
/// Oldest format version `Catalog::deserialize` still accepts.
const MIN_SUPPORTED_FILE_VERSION: u8 = 8;

// One-byte type tags for serialized index keys, one per `IndexKey` variant.
// NOTE(review): presumably written/read by `write_index_key` and its reader,
// which are not visible here; confirm.
const INDEX_KEY_TAG_INT: u8 = 0;
const INDEX_KEY_TAG_TEXT: u8 = 1;
const INDEX_KEY_TAG_BOOL: u8 = 2;
4570
4571impl Catalog {
    /// Encodes the whole catalog into the `SPGDB001` binary format.
    ///
    /// Sections are written in this order:
    /// 1. `FILE_MAGIC`, then `FILE_VERSION`.
    /// 2. The table count.
    /// 3. Each table's schema, rows, indices, hot-tier budget, foreign keys,
    ///    uniqueness constraints, runtime defaults and CHECK constraints.
    /// 4. The function section, then the trigger section.
    ///
    /// Counts are written as `u16`/`u32` length prefixes via the `write_u16` /
    /// `write_u32` helpers. Optional fields use a 0/1 presence byte followed by
    /// the value. `Catalog::deserialize` must read the sections back in exactly
    /// this order.
    ///
    /// # Panics
    /// Panics if any count overflows its length prefix, for example more than
    /// `u32::MAX` rows in a table or more than `u16::MAX` columns.
    pub fn serialize(&self) -> Vec<u8> {
        let mut out = Vec::with_capacity(64);
        // Header: magic + format version + table count.
        out.extend_from_slice(FILE_MAGIC);
        out.push(FILE_VERSION);
        write_u32(
            &mut out,
            u32::try_from(self.tables.len()).expect("≤ 4G tables"),
        );
        for t in &self.tables {
            // --- Schema: table name, then per column: name, type, nullability,
            // optional static default, auto-increment flag.
            write_str(&mut out, &t.schema.name);
            write_u16(
                &mut out,
                u16::try_from(t.schema.columns.len()).expect("≤ 65k columns/table"),
            );
            for c in &t.schema.columns {
                write_str(&mut out, &c.name);
                write_data_type(&mut out, c.ty);
                out.push(u8::from(c.nullable));
                match &c.default {
                    None => out.push(0),
                    Some(v) => {
                        out.push(1);
                        write_value(&mut out, v);
                    }
                }
                out.push(u8::from(c.auto_increment));
            }
            // --- Rows: count, then each row's dense body encoding back to back.
            write_u32(
                &mut out,
                u32::try_from(t.rows.len()).expect("≤ 4G rows/table"),
            );
            for row in &t.rows {
                out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
            }
            // --- Indices.
            write_u16(
                &mut out,
                u16::try_from(t.indices.len()).expect("≤ 65k indices/table"),
            );
            for idx in &t.indices {
                write_str(&mut out, &idx.name);
                write_u16(
                    &mut out,
                    u16::try_from(idx.column_position).expect("≤ 65k columns/table"),
                );
                // Kind tag byte: 0 = BTree, 1 = NSW, 2 = BRIN, 3 = GIN; the payload follows.
                match &idx.kind {
                    IndexKind::BTree(map) => {
                        out.push(0);
                        // Entry count, then per key: key, locator count, locators.
                        write_u32(
                            &mut out,
                            u32::try_from(map.len()).expect("≤ 4G index entries/index"),
                        );
                        for (key, locators) in map {
                            write_index_key(&mut out, key);
                            write_u32(
                                &mut out,
                                u32::try_from(locators.len()).expect("≤ 4G locators/key"),
                            );
                            for loc in locators {
                                loc.write_le(&mut out);
                            }
                        }
                    }
                    IndexKind::Nsw(g) => {
                        out.push(1);
                        write_u16(&mut out, u16::try_from(g.m).expect("≤ 65k NSW neighbours"));
                        write_nsw_graph(&mut out, g);
                    }
                    IndexKind::Brin { column_type } => {
                        out.push(2);
                        // Only the column type is persisted for BRIN indices.
                        write_data_type(&mut out, *column_type);
                    }
                    IndexKind::Gin(map) => {
                        out.push(3);
                        // Posting-list count, then per word: word, locator count, locators.
                        write_u32(
                            &mut out,
                            u32::try_from(map.len()).expect("≤ 4G GIN posting lists"),
                        );
                        for (word, locators) in map {
                            write_str(&mut out, word);
                            write_u32(
                                &mut out,
                                u32::try_from(locators.len()).expect("≤ 4G locators/posting list"),
                            );
                            for loc in locators {
                                loc.write_le(&mut out);
                            }
                        }
                    }
                }
                // INCLUDE column positions, count-prefixed.
                write_u16(
                    &mut out,
                    u16::try_from(idx.included_columns.len()).expect("≤ 65k INCLUDE columns/index"),
                );
                for col_pos in &idx.included_columns {
                    write_u16(
                        &mut out,
                        u16::try_from(*col_pos).expect("≤ 65k columns/table"),
                    );
                }
                // Optional partial-index predicate text.
                match &idx.partial_predicate {
                    None => out.push(0),
                    Some(pred) => {
                        out.push(1);
                        write_str(&mut out, pred);
                    }
                }
                // Optional index expression text.
                match &idx.expression {
                    None => out.push(0),
                    Some(expr) => {
                        out.push(1);
                        write_str(&mut out, expr);
                    }
                }
                out.push(u8::from(idx.is_unique));
                // Extra column positions, count-prefixed.
                write_u16(
                    &mut out,
                    u16::try_from(idx.extra_column_positions.len())
                        .expect("≤ 65k extra cols / index"),
                );
                for cp in &idx.extra_column_positions {
                    write_u16(&mut out, u16::try_from(*cp).expect("≤ 65k columns/table"));
                }
            }
            // --- Optional hot-tier budget: presence byte + little-endian value.
            match t.schema.hot_tier_bytes {
                None => out.push(0),
                Some(n) => {
                    out.push(1);
                    out.extend_from_slice(&n.to_le_bytes());
                }
            }
            // --- Foreign keys: optional name, local columns, parent table,
            // parent columns, then ON DELETE / ON UPDATE action tags.
            write_u16(
                &mut out,
                u16::try_from(t.schema.foreign_keys.len()).expect("≤ 65k FKs/table"),
            );
            for fk in &t.schema.foreign_keys {
                match &fk.name {
                    None => out.push(0),
                    Some(n) => {
                        out.push(1);
                        write_str(&mut out, n);
                    }
                }
                write_u16(
                    &mut out,
                    u16::try_from(fk.local_columns.len()).expect("≤ 65k FK columns"),
                );
                for &p in &fk.local_columns {
                    write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
                }
                write_str(&mut out, &fk.parent_table);
                write_u16(
                    &mut out,
                    u16::try_from(fk.parent_columns.len()).expect("≤ 65k FK parent columns"),
                );
                for &p in &fk.parent_columns {
                    write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
                }
                out.push(fk.on_delete.tag());
                out.push(fk.on_update.tag());
            }
            // --- Uniqueness constraints: primary-key flag, column positions,
            // NULLS NOT DISTINCT flag.
            write_u16(
                &mut out,
                u16::try_from(t.schema.uniqueness_constraints.len())
                    .expect("≤ 65k uniqueness constraints/table"),
            );
            for uc in &t.schema.uniqueness_constraints {
                out.push(u8::from(uc.is_primary_key));
                write_u16(
                    &mut out,
                    u16::try_from(uc.columns.len()).expect("≤ 65k cols in uniqueness constraint"),
                );
                for &p in &uc.columns {
                    write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
                }
                out.push(u8::from(uc.nulls_not_distinct));
            }
            // --- Runtime defaults, stored sparsely as (column position, expression)
            // pairs for only the columns that have one.
            let mut rt_defaults: Vec<(usize, &str)> = Vec::new();
            for (i, c) in t.schema.columns.iter().enumerate() {
                if let Some(e) = &c.runtime_default {
                    rt_defaults.push((i, e.as_str()));
                }
            }
            write_u16(
                &mut out,
                u16::try_from(rt_defaults.len()).expect("≤ 65k runtime defaults/table"),
            );
            for (pos, expr) in rt_defaults {
                write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
                write_str(&mut out, expr);
            }
            // --- CHECK constraint expression texts.
            write_u16(
                &mut out,
                u16::try_from(t.schema.checks.len()).expect("≤ 65k CHECK constraints/table"),
            );
            for c in &t.schema.checks {
                write_str(&mut out, c.as_str());
            }
        }
        // --- Functions (`deserialize` reads this section only for version >= 22).
        write_u32(
            &mut out,
            u32::try_from(self.functions.len()).expect("≤ 4G functions"),
        );
        for fd in self.functions.values() {
            write_str(&mut out, &fd.name);
            write_str(&mut out, &fd.args_repr);
            write_str(&mut out, &fd.returns);
            write_str(&mut out, &fd.language);
            // Function bodies use the long-string encoding.
            write_str_long(&mut out, &fd.body);
        }
        // --- Triggers: name, table, timing, events, FOR EACH, function, UPDATE OF columns.
        write_u32(
            &mut out,
            u32::try_from(self.triggers.len()).expect("≤ 4G triggers"),
        );
        for td in &self.triggers {
            write_str(&mut out, &td.name);
            write_str(&mut out, &td.table);
            write_str(&mut out, &td.timing);
            write_u16(
                &mut out,
                u16::try_from(td.events.len()).expect("≤ 65k events / trigger"),
            );
            for ev in &td.events {
                write_str(&mut out, ev);
            }
            write_str(&mut out, &td.for_each);
            write_str(&mut out, &td.function);
            write_u16(
                &mut out,
                u16::try_from(td.update_columns.len()).expect("≤ 65k cols / trigger"),
            );
            for c in &td.update_columns {
                write_str(&mut out, c);
            }
        }
        out
    }
4909
4910 pub fn deserialize(buf: &[u8]) -> Result<Self, StorageError> {
4913 let mut cur = Cursor::new(buf);
4914 let magic = cur.take(8)?;
4915 if magic != FILE_MAGIC {
4916 return Err(StorageError::Corrupt(format!(
4917 "bad magic: expected SPGDB001, got {magic:?}"
4918 )));
4919 }
4920 let version = cur.read_u8()?;
4921 if !(MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION).contains(&version) {
4922 return Err(StorageError::Corrupt(format!(
4923 "unsupported file version: {version} (supported: {MIN_SUPPORTED_FILE_VERSION}..={FILE_VERSION})"
4924 )));
4925 }
4926 let table_count = cur.read_u32()? as usize;
4927 let mut cat = Self::new();
4928 for _ in 0..table_count {
4929 deserialize_table(&mut cur, &mut cat, version)?;
4930 }
4931 if version >= 22 {
4935 let fn_count = cur.read_u32()? as usize;
4936 for _ in 0..fn_count {
4937 let name = cur.read_str()?;
4938 let args_repr = cur.read_str()?;
4939 let returns = cur.read_str()?;
4940 let language = cur.read_str()?;
4941 let body = cur.read_str_long()?;
4942 cat.functions.insert(
4943 name.clone(),
4944 FunctionDef {
4945 name,
4946 args_repr,
4947 returns,
4948 language,
4949 body,
4950 },
4951 );
4952 }
4953 let trg_count = cur.read_u32()? as usize;
4954 for _ in 0..trg_count {
4955 let name = cur.read_str()?;
4956 let table = cur.read_str()?;
4957 let timing = cur.read_str()?;
4958 let ev_count = cur.read_u16()? as usize;
4959 let mut events = Vec::with_capacity(ev_count);
4960 for _ in 0..ev_count {
4961 events.push(cur.read_str()?);
4962 }
4963 let for_each = cur.read_str()?;
4964 let function = cur.read_str()?;
4965 let update_columns = if version >= 23 {
4969 let n = cur.read_u16()? as usize;
4970 let mut cols = Vec::with_capacity(n);
4971 for _ in 0..n {
4972 cols.push(cur.read_str()?);
4973 }
4974 cols
4975 } else {
4976 Vec::new()
4977 };
4978 cat.triggers.push(TriggerDef {
4979 name,
4980 table,
4981 timing,
4982 events,
4983 for_each,
4984 function,
4985 update_columns,
4986 });
4987 }
4988 }
4989 if cur.pos < buf.len() {
4990 return Err(StorageError::Corrupt(format!(
4991 "trailing bytes: {} unread",
4992 buf.len() - cur.pos
4993 )));
4994 }
4995 Ok(cat)
4996 }
4997}
4998
4999fn deserialize_table(
5004 cur: &mut Cursor<'_>,
5005 cat: &mut Catalog,
5006 version: u8,
5007) -> Result<(), StorageError> {
5008 let table_name = cur.read_str()?;
5009 let name = table_name.clone();
5010 let col_count = cur.read_u16()? as usize;
5011 let mut cols = Vec::with_capacity(col_count);
5012 for _ in 0..col_count {
5013 let c_name = cur.read_str()?;
5014 let ty = cur.read_data_type()?;
5015 let nullable = cur.read_u8()? != 0;
5016 let default = match cur.read_u8()? {
5017 0 => None,
5018 1 => Some(cur.read_value()?),
5019 other => {
5020 return Err(StorageError::Corrupt(format!(
5021 "unknown default tag: {other}"
5022 )));
5023 }
5024 };
5025 let auto_increment = cur.read_u8()? != 0;
5026 cols.push(ColumnSchema {
5030 name: c_name,
5031 ty,
5032 nullable,
5033 default,
5034 runtime_default: None,
5035 auto_increment,
5036 });
5037 }
5038 let n_cols = cols.len();
5039 cat.create_table(TableSchema::new(name, cols))?;
5040 let t = cat.tables.last_mut().expect("create_table just pushed");
5044 deserialize_rows(cur, t, n_cols)?;
5045 deserialize_indices(cur, t, version)?;
5046 if version >= 11 {
5052 let has = cur.read_u8()?;
5053 let hot_tier_bytes = match has {
5054 0 => None,
5055 1 => Some(cur.read_u64()?),
5056 other => {
5057 return Err(StorageError::Corrupt(format!(
5058 "hot_tier_bytes appendix: unknown has-value byte {other}"
5059 )));
5060 }
5061 };
5062 t.schema_mut().hot_tier_bytes = hot_tier_bytes;
5063 }
5064 if version >= 13 {
5067 let fk_count = cur.read_u16()? as usize;
5068 let mut fks = Vec::with_capacity(fk_count);
5069 for _ in 0..fk_count {
5070 let name = match cur.read_u8()? {
5071 0 => None,
5072 1 => Some(cur.read_str()?),
5073 other => {
5074 return Err(StorageError::Corrupt(format!(
5075 "FK appendix: unknown has-name byte {other}"
5076 )));
5077 }
5078 };
5079 let local_arity = cur.read_u16()? as usize;
5080 let mut local_columns = Vec::with_capacity(local_arity);
5081 for _ in 0..local_arity {
5082 local_columns.push(cur.read_u16()? as usize);
5083 }
5084 let parent_table = cur.read_str()?;
5085 let parent_arity = cur.read_u16()? as usize;
5086 if parent_arity != local_arity {
5087 return Err(StorageError::Corrupt(format!(
5088 "FK arity mismatch in catalog: local {local_arity} vs parent {parent_arity}"
5089 )));
5090 }
5091 let mut parent_columns = Vec::with_capacity(parent_arity);
5092 for _ in 0..parent_arity {
5093 parent_columns.push(cur.read_u16()? as usize);
5094 }
5095 let on_delete = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
5096 StorageError::Corrupt("FK appendix: unknown on_delete tag".into())
5097 })?;
5098 let on_update = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
5099 StorageError::Corrupt("FK appendix: unknown on_update tag".into())
5100 })?;
5101 fks.push(ForeignKeyConstraint {
5102 name,
5103 local_columns,
5104 parent_table,
5105 parent_columns,
5106 on_delete,
5107 on_update,
5108 });
5109 }
5110 t.schema_mut().foreign_keys = fks;
5111 }
5112 if version >= 15 {
5115 let uc_count = cur.read_u16()? as usize;
5116 let mut ucs = Vec::with_capacity(uc_count);
5117 for _ in 0..uc_count {
5118 let is_pk = cur.read_u8()? != 0;
5119 let arity = cur.read_u16()? as usize;
5120 let mut cols = Vec::with_capacity(arity);
5121 for _ in 0..arity {
5122 cols.push(cur.read_u16()? as usize);
5123 }
5124 let nulls_not_distinct = if version >= 23 {
5128 cur.read_u8()? != 0
5129 } else {
5130 false
5131 };
5132 ucs.push(UniquenessConstraint {
5133 is_primary_key: is_pk,
5134 columns: cols,
5135 nulls_not_distinct,
5136 });
5137 }
5138 t.schema_mut().uniqueness_constraints = ucs;
5139 let rt_count = cur.read_u16()? as usize;
5141 for _ in 0..rt_count {
5142 let pos = cur.read_u16()? as usize;
5143 let expr = cur.read_str()?;
5144 if let Some(col) = t.schema_mut().columns.get_mut(pos) {
5145 col.runtime_default = Some(expr);
5146 }
5147 }
5148 }
5149 if version >= 23 {
5152 let check_count = cur.read_u16()? as usize;
5153 let mut checks = Vec::with_capacity(check_count);
5154 for _ in 0..check_count {
5155 checks.push(cur.read_str()?);
5156 }
5157 t.schema_mut().checks = checks;
5158 }
5159 let _ = table_name;
5160 Ok(())
5161}
5162
5163fn deserialize_rows(
5164 cur: &mut Cursor<'_>,
5165 t: &mut Table,
5166 _n_cols: usize,
5167) -> Result<(), StorageError> {
5168 let row_count = cur.read_u32()? as usize;
5169 let mut hot_bytes: u64 = 0;
5174 for _ in 0..row_count {
5175 let tail = &cur.buf[cur.pos..];
5176 let (row, consumed) = decode_row_body_dense(tail, &t.schema)?;
5177 cur.pos += consumed;
5178 hot_bytes = hot_bytes.saturating_add(row_body_encoded_len(&row, &t.schema) as u64);
5184 t.rows.push_mut(row);
5185 }
5186 t.hot_bytes = hot_bytes;
5187 Ok(())
5188}
5189
/// Decode the `u16`-counted index descriptors of one table and attach them to `t`.
///
/// Per index: name, key column position (`u16`), and a kind tag:
/// `0` B-tree (v9+ carries the persisted map; older files rebuild via
/// `add_index`), `1` NSW/HNSW graph (preceded by a `u16` `m`), `2` BRIN
/// (column type only), `3` GIN (persisted word map). From v12 the descriptor
/// also carries INCLUDE columns, an optional partial predicate and an optional
/// expression. From v16 it adds the unique flag and extra column positions.
///
/// # Errors
/// Returns [`StorageError::Corrupt`] for an out-of-range column position, an
/// unknown kind or flag byte, or truncated input. Errors from the
/// `restore_*` / `add_index` calls are propagated.
fn deserialize_indices(
    cur: &mut Cursor<'_>,
    t: &mut Table,
    version: u8,
) -> Result<(), StorageError> {
    let index_count = cur.read_u16()? as usize;
    for _ in 0..index_count {
        let idx_name = cur.read_str()?;
        let col_pos = cur.read_u16()? as usize;
        // Resolve the stored column position to a name up front; a bad
        // position is reported as corruption instead of panicking.
        let column_name = t
            .schema
            .columns
            .get(col_pos)
            .ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "index {idx_name:?} points at non-existent column position {col_pos}"
                ))
            })?
            .name
            .clone();
        let kind_tag = cur.read_u8()?;
        match kind_tag {
            0 => {
                if version >= 9 {
                    let map = read_btree_map(cur)?;
                    t.restore_btree_index(idx_name, &column_name, map)?;
                } else {
                    // Pre-v9 files did not persist B-tree contents.
                    t.add_index(idx_name, &column_name)?;
                }
            }
            1 => {
                let m = cur.read_u16()? as usize;
                let graph = cur.read_nsw_graph(m)?;
                t.restore_nsw_index(idx_name, &column_name, graph)?;
            }
            2 => {
                let column_type = cur.read_data_type()?;
                t.restore_brin_index(idx_name, &column_name, column_type)?;
            }
            3 => {
                let map = read_gin_map(cur)?;
                t.restore_gin_index(idx_name, &column_name, map)?;
            }
            other => {
                return Err(StorageError::Corrupt(format!(
                    "unknown index kind tag: {other}"
                )));
            }
        }
        // NOTE(review): the optional fields below are applied via
        // `t.indices.last_mut()`, which assumes the restore/add call above
        // appended this index as the last entry of `t.indices` — confirm.
        if version >= 12 {
            // INCLUDE (covering) columns: u16 count, then validated positions.
            let num_included = cur.read_u16()? as usize;
            if num_included > 0 {
                let mut included: Vec<usize> = Vec::with_capacity(num_included);
                for _ in 0..num_included {
                    let cp = cur.read_u16()? as usize;
                    if cp >= t.schema.columns.len() {
                        return Err(StorageError::Corrupt(format!(
                            "INCLUDE column position {cp} out of range \
                             ({} schema columns)",
                            t.schema.columns.len()
                        )));
                    }
                    included.push(cp);
                }
                if let Some(last) = t.indices.last_mut() {
                    last.included_columns = included;
                }
            }
            // Partial-index predicate: presence byte, then its text.
            match cur.read_u8()? {
                0 => {}
                1 => {
                    let pred = cur.read_str()?;
                    if let Some(last) = t.indices.last_mut() {
                        last.partial_predicate = Some(pred);
                    }
                }
                other => {
                    return Err(StorageError::Corrupt(format!(
                        "partial_predicate tag: unknown byte {other}"
                    )));
                }
            }
            // Expression-index source text: presence byte, then its text.
            match cur.read_u8()? {
                0 => {}
                1 => {
                    let expr = cur.read_str()?;
                    if let Some(last) = t.indices.last_mut() {
                        last.expression = Some(expr);
                    }
                }
                other => {
                    return Err(StorageError::Corrupt(format!(
                        "expression tag: unknown byte {other}"
                    )));
                }
            }
            if version >= 16 {
                // UNIQUE flag byte.
                match cur.read_u8()? {
                    0 => {}
                    1 => {
                        if let Some(last) = t.indices.last_mut() {
                            last.is_unique = true;
                        }
                    }
                    other => {
                        return Err(StorageError::Corrupt(format!(
                            "is_unique tag: unknown byte {other}"
                        )));
                    }
                }
                // Extra column positions: u16 count, then validated positions.
                let n = cur.read_u16()? as usize;
                if n > 0 {
                    let mut extras: Vec<usize> = Vec::with_capacity(n);
                    for _ in 0..n {
                        let cp = cur.read_u16()? as usize;
                        if cp >= t.schema.columns.len() {
                            return Err(StorageError::Corrupt(format!(
                                "extra column position {cp} out of range \
                                 ({} schema columns)",
                                t.schema.columns.len()
                            )));
                        }
                        extras.push(cp);
                    }
                    if let Some(last) = t.indices.last_mut() {
                        last.extra_column_positions = extras;
                    }
                }
            }
        }
    }
    Ok(())
}
5345
5346fn read_btree_map(
5350 cur: &mut Cursor<'_>,
5351) -> Result<PersistentBTreeMap<IndexKey, Vec<RowLocator>>, StorageError> {
5352 let entry_count = cur.read_u32()? as usize;
5353 let mut map = PersistentBTreeMap::new();
5354 for _ in 0..entry_count {
5355 let key = cur.read_index_key()?;
5356 let locator_count = cur.read_u32()? as usize;
5357 let mut locators = Vec::with_capacity(locator_count);
5358 for _ in 0..locator_count {
5359 let tail = &cur.buf[cur.pos..];
5360 let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
5361 StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
5362 })?;
5363 cur.pos += consumed;
5364 locators.push(loc);
5365 }
5366 map.insert_mut(key, locators);
5367 }
5368 Ok(map)
5369}
5370
5371fn read_gin_map(
5375 cur: &mut Cursor<'_>,
5376) -> Result<PersistentBTreeMap<String, Vec<RowLocator>>, StorageError> {
5377 let entry_count = cur.read_u32()? as usize;
5378 let mut map = PersistentBTreeMap::new();
5379 for _ in 0..entry_count {
5380 let word = cur.read_str()?;
5381 let locator_count = cur.read_u32()? as usize;
5382 let mut locators = Vec::with_capacity(locator_count);
5383 for _ in 0..locator_count {
5384 let tail = &cur.buf[cur.pos..];
5385 let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
5386 StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
5387 })?;
5388 cur.pos += consumed;
5389 locators.push(loc);
5390 }
5391 map.insert_mut(word, locators);
5392 }
5393 Ok(map)
5394}
5395
5396fn write_nsw_graph(out: &mut Vec<u8>, g: &NswGraph) {
5412 let entry = g.entry.map_or(u32::MAX, |e| {
5413 u32::try_from(e).expect("NSW entry fits in u32")
5414 });
5415 write_u16(
5416 out,
5417 u16::try_from(g.m_max_0).expect("HNSW m_max_0 fits in u16"),
5418 );
5419 out.extend_from_slice(&entry.to_le_bytes());
5420 out.push(g.entry_level);
5421 let node_count = g.levels.len();
5422 write_u32(
5423 out,
5424 u32::try_from(node_count).expect("HNSW node count fits in u32"),
5425 );
5426 for &lvl in &g.levels {
5427 out.push(lvl);
5428 }
5429 let layer_count = u8::try_from(g.layers.len()).expect("HNSW layer count ≤ 255");
5430 out.push(layer_count);
5431 for layer in &g.layers {
5432 write_u32(
5433 out,
5434 u32::try_from(layer.len()).expect("HNSW per-layer node count fits in u32"),
5435 );
5436 for neighbors in layer {
5437 write_u16(
5438 out,
5439 u16::try_from(neighbors.len()).expect("HNSW neighbour list fits in u16"),
5440 );
5441 for &peer in neighbors {
5445 write_u32(out, peer);
5446 }
5447 }
5448 }
5449}
5450
5451fn write_data_type(out: &mut Vec<u8>, t: DataType) {
5452 match t {
5453 DataType::Int => out.push(1),
5454 DataType::BigInt => out.push(2),
5455 DataType::Float => out.push(3),
5456 DataType::Text => out.push(4),
5457 DataType::Bool => out.push(5),
5458 DataType::Vector { dim, encoding } => match encoding {
5459 VecEncoding::F32 => {
5463 out.push(6);
5464 out.extend_from_slice(&dim.to_le_bytes());
5465 }
5466 VecEncoding::F16 => {
5469 out.push(15);
5470 out.extend_from_slice(&dim.to_le_bytes());
5471 }
5472 VecEncoding::Sq8 => {
5478 out.push(14);
5479 out.extend_from_slice(&dim.to_le_bytes());
5480 }
5481 },
5482 DataType::SmallInt => out.push(7),
5483 DataType::Varchar(max) => {
5484 out.push(8);
5485 out.extend_from_slice(&max.to_le_bytes());
5486 }
5487 DataType::Char(size) => {
5488 out.push(9);
5489 out.extend_from_slice(&size.to_le_bytes());
5490 }
5491 DataType::Numeric { precision, scale } => {
5492 out.push(10);
5493 out.push(precision);
5494 out.push(scale);
5495 }
5496 DataType::Date => out.push(11),
5497 DataType::Timestamp => out.push(12),
5498 DataType::Timestamptz => out.push(17),
5502 DataType::Interval => {
5507 unreachable!("DataType::Interval has no on-disk encoding in v2.11")
5508 }
5509 DataType::Json => out.push(13),
5510 DataType::Jsonb => out.push(16),
5513 DataType::Bytes => out.push(18),
5515 DataType::TextArray => out.push(19),
5518 DataType::IntArray => out.push(20),
5521 DataType::BigIntArray => out.push(21),
5524 DataType::TsVector => out.push(22),
5527 DataType::TsQuery => out.push(23),
5530 }
5531}
5532
5533impl Cursor<'_> {
5534 fn read_data_type(&mut self) -> Result<DataType, StorageError> {
5535 let tag = self.read_u8()?;
5536 match tag {
5537 1 => Ok(DataType::Int),
5538 2 => Ok(DataType::BigInt),
5539 3 => Ok(DataType::Float),
5540 4 => Ok(DataType::Text),
5541 5 => Ok(DataType::Bool),
5542 6 => Ok(DataType::Vector {
5543 dim: self.read_u32()?,
5544 encoding: VecEncoding::F32,
5545 }),
5546 7 => Ok(DataType::SmallInt),
5547 8 => Ok(DataType::Varchar(self.read_u32()?)),
5548 9 => Ok(DataType::Char(self.read_u32()?)),
5549 10 => {
5550 let precision = self.read_u8()?;
5551 let scale = self.read_u8()?;
5552 Ok(DataType::Numeric { precision, scale })
5553 }
5554 11 => Ok(DataType::Date),
5555 12 => Ok(DataType::Timestamp),
5556 13 => Ok(DataType::Json),
5557 14 => Ok(DataType::Vector {
5558 dim: self.read_u32()?,
5559 encoding: VecEncoding::Sq8,
5560 }),
5561 15 => Ok(DataType::Vector {
5565 dim: self.read_u32()?,
5566 encoding: VecEncoding::F16,
5567 }),
5568 16 => Ok(DataType::Jsonb),
5572 17 => Ok(DataType::Timestamptz),
5576 18 => Ok(DataType::Bytes),
5578 19 => Ok(DataType::TextArray),
5580 20 => Ok(DataType::IntArray),
5582 21 => Ok(DataType::BigIntArray),
5583 22 => Ok(DataType::TsVector),
5586 23 => Ok(DataType::TsQuery),
5587 other => Err(StorageError::Corrupt(format!(
5588 "unknown data type tag: {other}"
5589 ))),
5590 }
5591 }
5592}
5593
5594pub fn row_body_encoded_len(row: &Row, schema: &TableSchema) -> usize {
5600 debug_assert_eq!(
5601 row.values.len(),
5602 schema.columns.len(),
5603 "row_body_encoded_len: row arity must match schema"
5604 );
5605 let bitmap_bytes = schema.columns.len().div_ceil(8);
5606 let mut n = bitmap_bytes;
5607 for (col_idx, v) in row.values.iter().enumerate() {
5608 if matches!(v, Value::Null) {
5609 continue;
5610 }
5611 n += value_body_encoded_len(v, schema.columns[col_idx].ty);
5612 }
5613 n
5614}
5615
/// Number of bytes `write_value_body` emits for the non-NULL value `v`.
///
/// The size is derived from the value variant alone; `_ty` is accepted but
/// not read. Each arm mirrors the matching arm of the writer, so any change
/// to the on-disk layout must be made in both places.
///
/// # Panics
/// Panics on `Value::Interval`, which has no on-disk encoding.
fn value_body_encoded_len(v: &Value, _ty: DataType) -> usize {
    match v {
        Value::SmallInt(_) => 2,
        Value::Int(_) | Value::Date(_) => 4,
        Value::BigInt(_) | Value::Float(_) | Value::Timestamp(_) => 8,
        Value::Bool(_) => 1,
        // u16 length prefix + UTF-8 bytes.
        Value::Text(s) | Value::Json(s) => 2 + s.len(),
        // u32 dimension + 4 bytes per element.
        Value::Vector(vec) => 4 + 4 * vec.len(),
        // u32 dimension + min + max (4 bytes each) + one byte per element.
        Value::Sq8Vector(q) => 4 + 4 + 4 + q.bytes.len(),
        // u32 dimension + raw half-precision bytes.
        Value::HalfVector(h) => 4 + h.bytes.len(),
        // 16-byte scaled integer + 1 scale byte.
        Value::Numeric { .. } => 16 + 1,
        // u16 length prefix + raw bytes.
        Value::Bytes(b) => 2 + b.len(),
        Value::TextArray(items) => {
            // u16 element count; per element a NULL flag byte, plus a
            // u16-prefixed string when present.
            let mut n = 2;
            for item in items {
                n += 1;
                if let Some(s) = item {
                    n += 2 + s.len();
                }
            }
            n
        }
        Value::IntArray(items) => {
            // u16 count; flag byte per element, plus 4 bytes when present.
            2 + items
                .iter()
                .map(|x| if x.is_some() { 5 } else { 1 })
                .sum::<usize>()
        }
        Value::BigIntArray(items) => {
            // u16 count; flag byte per element, plus 8 bytes when present.
            2 + items
                .iter()
                .map(|x| if x.is_some() { 9 } else { 1 })
                .sum::<usize>()
        }
        Value::TsVector(lexs) => {
            // u16 lexeme count; per lexeme: u16-prefixed word, u16 position
            // count, 2 bytes per position, 1 weight byte.
            let mut n = 2;
            for l in lexs {
                n += 2 + l.word.len() + 2 + 2 * l.positions.len() + 1;
            }
            n
        }
        Value::TsQuery(ast) => tsquery_encoded_len(ast),
        // NULLs live only in the row bitmap and occupy no body bytes.
        Value::Null => 0,
        Value::Interval { .. } => {
            unreachable!("Value::Interval has no on-disk encoding")
        }
    }
}
5698
5699pub fn encode_row_body_dense(row: &Row, schema: &TableSchema) -> Vec<u8> {
5710 debug_assert_eq!(
5711 row.values.len(),
5712 schema.columns.len(),
5713 "dense encode: row arity must match schema"
5714 );
5715 let bitmap_bytes = schema.columns.len().div_ceil(8);
5716 let mut out = Vec::with_capacity(bitmap_bytes + schema.columns.len() * 8);
5719 let bitmap_offset = out.len();
5720 out.resize(bitmap_offset + bitmap_bytes, 0);
5721 for (i, v) in row.values.iter().enumerate() {
5722 if matches!(v, Value::Null) {
5723 out[bitmap_offset + i / 8] |= 1 << (i % 8);
5724 }
5725 }
5726 for (col_idx, v) in row.values.iter().enumerate() {
5727 if matches!(v, Value::Null) {
5728 continue;
5729 }
5730 write_value_body(&mut out, v, schema.columns[col_idx].ty);
5731 }
5732 out
5733}
5734
5735pub fn decode_row_body_dense(
5741 bytes: &[u8],
5742 schema: &TableSchema,
5743) -> Result<(Row, usize), StorageError> {
5744 let mut cur = Cursor::new(bytes);
5745 let bitmap_bytes = schema.columns.len().div_ceil(8);
5746 let mut bitmap_buf = [0u8; 32];
5747 if bitmap_bytes > bitmap_buf.len() {
5748 return Err(StorageError::Corrupt(format!(
5749 "row NULL bitmap {bitmap_bytes} B exceeds 32 B cap"
5750 )));
5751 }
5752 let slice = cur.take(bitmap_bytes)?;
5753 bitmap_buf[..bitmap_bytes].copy_from_slice(slice);
5754 let mut values = Vec::with_capacity(schema.columns.len());
5755 for (col_idx, col) in schema.columns.iter().enumerate() {
5756 if (bitmap_buf[col_idx / 8] >> (col_idx % 8)) & 1 == 1 {
5757 values.push(Value::Null);
5758 } else {
5759 values.push(cur.read_value_body(col.ty)?);
5760 }
5761 }
5762 Ok((Row { values }, cur.pos))
5763}
5764
/// Append the body of the non-NULL value `v`, encoded according to the
/// column type `ty` (no per-value tag; the schema supplies the type).
///
/// The byte counts written here must stay in sync with
/// `value_body_encoded_len`.
///
/// # Panics
/// Panics if `v` does not match `ty` (e.g. a `Value::Int` in a `TEXT`
/// column), or if a length does not fit its prefix width.
fn write_value_body(out: &mut Vec<u8>, v: &Value, ty: DataType) {
    match (v, ty) {
        (Value::SmallInt(n), DataType::SmallInt) => out.extend_from_slice(&n.to_le_bytes()),
        (Value::Int(n), DataType::Int) => out.extend_from_slice(&n.to_le_bytes()),
        (Value::BigInt(n), DataType::BigInt) => out.extend_from_slice(&n.to_le_bytes()),
        (Value::Float(x), DataType::Float) => out.extend_from_slice(&x.to_le_bytes()),
        (Value::Bool(b), DataType::Bool) => out.push(u8::from(*b)),
        // All character types share the u16-prefixed string encoding.
        (Value::Text(s), DataType::Text | DataType::Varchar(_) | DataType::Char(_)) => {
            write_str(out, s);
        }
        (
            Value::Vector(v),
            DataType::Vector {
                encoding: VecEncoding::F32,
                ..
            },
        ) => {
            // u32 dimension, then each element little-endian.
            let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
            out.extend_from_slice(&dim.to_le_bytes());
            for x in v {
                out.extend_from_slice(&x.to_le_bytes());
            }
        }
        (
            Value::Sq8Vector(q),
            DataType::Vector {
                encoding: VecEncoding::Sq8,
                ..
            },
        ) => {
            // u32 dimension, min, max, then one quantized byte per element.
            let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
            out.extend_from_slice(&dim.to_le_bytes());
            out.extend_from_slice(&q.min.to_le_bytes());
            out.extend_from_slice(&q.max.to_le_bytes());
            out.extend_from_slice(&q.bytes);
        }
        (
            Value::HalfVector(h),
            DataType::Vector {
                encoding: VecEncoding::F16,
                ..
            },
        ) => {
            let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
            out.extend_from_slice(&dim.to_le_bytes());
            out.extend_from_slice(&h.bytes);
        }
        // NOTE(review): writes the column's declared `scale`, not the value's
        // own `scale` field; presumably values are rescaled to the column
        // scale before storage — confirm.
        (Value::Numeric { scaled, .. }, DataType::Numeric { scale, .. }) => {
            out.extend_from_slice(&scaled.to_le_bytes());
            out.push(scale);
        }
        (Value::Date(d), DataType::Date) => out.extend_from_slice(&d.to_le_bytes()),
        // Both timestamp flavours store the same 8-byte value.
        (Value::Timestamp(t), DataType::Timestamp | DataType::Timestamptz) => {
            out.extend_from_slice(&t.to_le_bytes())
        }
        (Value::Json(s), DataType::Json | DataType::Jsonb) => write_str(out, s),
        (Value::Bytes(b), DataType::Bytes) => {
            let len = u16::try_from(b.len()).expect("BYTEA cell ≤ 64 KiB");
            out.extend_from_slice(&len.to_le_bytes());
            out.extend_from_slice(b);
        }
        (Value::TextArray(items), DataType::TextArray) => {
            // u16 count; per element a flag byte (1 = NULL, 0 = present),
            // then a u16-prefixed string when present.
            let count = u16::try_from(items.len()).expect("TEXT[] ≤ 65k elements");
            out.extend_from_slice(&count.to_le_bytes());
            for item in items {
                match item {
                    None => out.push(1),
                    Some(s) => {
                        out.push(0);
                        let len = u16::try_from(s.len()).expect("TEXT[] element ≤ 64 KiB");
                        out.extend_from_slice(&len.to_le_bytes());
                        out.extend_from_slice(s.as_bytes());
                    }
                }
            }
        }
        (Value::IntArray(items), DataType::IntArray) => {
            // u16 count; flag byte per element (1 = NULL), then the integer.
            let count = u16::try_from(items.len()).expect("INT[] ≤ 65k elements");
            out.extend_from_slice(&count.to_le_bytes());
            for item in items {
                match item {
                    None => out.push(1),
                    Some(n) => {
                        out.push(0);
                        out.extend_from_slice(&n.to_le_bytes());
                    }
                }
            }
        }
        (Value::BigIntArray(items), DataType::BigIntArray) => {
            // Same shape as INT[] with 8-byte elements.
            let count = u16::try_from(items.len()).expect("BIGINT[] ≤ 65k elements");
            out.extend_from_slice(&count.to_le_bytes());
            for item in items {
                match item {
                    None => out.push(1),
                    Some(n) => {
                        out.push(0);
                        out.extend_from_slice(&n.to_le_bytes());
                    }
                }
            }
        }
        (Value::TsVector(lexs), DataType::TsVector) => write_tsvector_body(out, lexs),
        (Value::TsQuery(ast), DataType::TsQuery) => write_tsquery_body(out, ast),
        (other, ty) => unreachable!(
            "schema-driven encode received mismatched value/type pair: \
             value tag={:?}, column type={:?}",
            other.data_type(),
            ty
        ),
    }
}
5910
/// Append a self-describing value: a one-byte type tag followed by the
/// value's payload. Unlike `write_value_body`, no schema is needed to read it
/// back.
///
/// Tags used here: 0 NULL, 1 Int, 2 BigInt, 3 Float, 4 Text/Json, 5 Bool,
/// 6 F32 vector, 7 SmallInt, 8 Numeric, 9 Date, 10 Timestamp, 11 SQ8 vector,
/// 12 half vector, 14 Bytes, 15 TEXT[], 16 INT[], 17 BIGINT[], 18 tsvector,
/// 19 tsquery.
///
/// # Panics
/// Panics on `Value::Interval` and when a length does not fit its prefix.
fn write_value(out: &mut Vec<u8>, v: &Value) {
    match v {
        Value::Null => out.push(0),
        Value::SmallInt(n) => {
            out.push(7);
            out.extend_from_slice(&n.to_le_bytes());
        }
        Value::Int(n) => {
            out.push(1);
            out.extend_from_slice(&n.to_le_bytes());
        }
        Value::BigInt(n) => {
            out.push(2);
            out.extend_from_slice(&n.to_le_bytes());
        }
        Value::Float(x) => {
            out.push(3);
            out.extend_from_slice(&x.to_le_bytes());
        }
        // NOTE(review): JSON shares tag 4 with TEXT, so the tag alone cannot
        // tell them apart on read; confirm the reader restores the intended
        // variant.
        Value::Text(s) | Value::Json(s) => {
            out.push(4);
            write_str(out, s);
        }
        Value::Bool(b) => {
            out.push(5);
            out.push(u8::from(*b));
        }
        Value::Vector(v) => {
            out.push(6);
            let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
            out.extend_from_slice(&dim.to_le_bytes());
            for x in v {
                out.extend_from_slice(&x.to_le_bytes());
            }
        }
        Value::Sq8Vector(q) => {
            out.push(11);
            let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
            out.extend_from_slice(&dim.to_le_bytes());
            out.extend_from_slice(&q.min.to_le_bytes());
            out.extend_from_slice(&q.max.to_le_bytes());
            out.extend_from_slice(&q.bytes);
        }
        Value::HalfVector(h) => {
            out.push(12);
            let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
            out.extend_from_slice(&dim.to_le_bytes());
            out.extend_from_slice(&h.bytes);
        }
        Value::Numeric { scaled, scale } => {
            // Scaled integer followed by the value's own scale byte.
            out.push(8);
            out.extend_from_slice(&scaled.to_le_bytes());
            out.push(*scale);
        }
        Value::Date(d) => {
            out.push(9);
            out.extend_from_slice(&d.to_le_bytes());
        }
        Value::Timestamp(t) => {
            out.push(10);
            out.extend_from_slice(&t.to_le_bytes());
        }
        Value::Interval { .. } => {
            unreachable!(
                "Value::Interval has no on-disk encoding; engine must reject it before write"
            )
        }
        Value::Bytes(b) => {
            out.push(14);
            let len = u16::try_from(b.len()).expect("BYTEA value ≤ 64 KiB");
            out.extend_from_slice(&len.to_le_bytes());
            out.extend_from_slice(b);
        }
        Value::TextArray(items) => {
            // Same element layout as the dense body: flag byte (1 = NULL),
            // then a u16-prefixed string when present.
            out.push(15);
            let count = u16::try_from(items.len()).expect("TEXT[] ≤ 65k elements");
            out.extend_from_slice(&count.to_le_bytes());
            for item in items {
                match item {
                    None => out.push(1),
                    Some(s) => {
                        out.push(0);
                        let len = u16::try_from(s.len()).expect("TEXT[] element ≤ 64 KiB");
                        out.extend_from_slice(&len.to_le_bytes());
                        out.extend_from_slice(s.as_bytes());
                    }
                }
            }
        }
        Value::IntArray(items) => {
            out.push(16);
            let count = u16::try_from(items.len()).expect("INT[] ≤ 65k elements");
            out.extend_from_slice(&count.to_le_bytes());
            for item in items {
                match item {
                    None => out.push(1),
                    Some(n) => {
                        out.push(0);
                        out.extend_from_slice(&n.to_le_bytes());
                    }
                }
            }
        }
        Value::BigIntArray(items) => {
            out.push(17);
            let count = u16::try_from(items.len()).expect("BIGINT[] ≤ 65k elements");
            out.extend_from_slice(&count.to_le_bytes());
            for item in items {
                match item {
                    None => out.push(1),
                    Some(n) => {
                        out.push(0);
                        out.extend_from_slice(&n.to_le_bytes());
                    }
                }
            }
        }
        Value::TsVector(lexs) => {
            out.push(18);
            write_tsvector_body(out, lexs);
        }
        Value::TsQuery(ast) => {
            out.push(19);
            write_tsquery_body(out, ast);
        }
    }
}
6067
6068fn write_tsvector_body(out: &mut Vec<u8>, lexs: &[TsLexeme]) {
6071 let count = u16::try_from(lexs.len()).expect("tsvector ≤ 65k lexemes");
6072 out.extend_from_slice(&count.to_le_bytes());
6073 for l in lexs {
6074 let wlen = u16::try_from(l.word.len()).expect("tsvector word ≤ 64 KiB");
6075 out.extend_from_slice(&wlen.to_le_bytes());
6076 out.extend_from_slice(l.word.as_bytes());
6077 let plen = u16::try_from(l.positions.len()).expect("tsvector pos count ≤ 65k");
6078 out.extend_from_slice(&plen.to_le_bytes());
6079 for p in &l.positions {
6080 out.extend_from_slice(&p.to_le_bytes());
6081 }
6082 out.push(l.weight);
6083 }
6084}
6085
6086fn write_tsquery_body(out: &mut Vec<u8>, ast: &TsQueryAst) {
6090 match ast {
6091 TsQueryAst::Term { word, weight_mask } => {
6092 out.push(0);
6093 let len = u16::try_from(word.len()).expect("tsquery term ≤ 64 KiB");
6094 out.extend_from_slice(&len.to_le_bytes());
6095 out.extend_from_slice(word.as_bytes());
6096 out.push(*weight_mask);
6097 }
6098 TsQueryAst::And(a, b) => {
6099 out.push(1);
6100 write_tsquery_body(out, a);
6101 write_tsquery_body(out, b);
6102 }
6103 TsQueryAst::Or(a, b) => {
6104 out.push(2);
6105 write_tsquery_body(out, a);
6106 write_tsquery_body(out, b);
6107 }
6108 TsQueryAst::Not(x) => {
6109 out.push(3);
6110 write_tsquery_body(out, x);
6111 }
6112 TsQueryAst::Phrase {
6113 left,
6114 right,
6115 distance,
6116 } => {
6117 out.push(4);
6118 out.extend_from_slice(&distance.to_le_bytes());
6119 write_tsquery_body(out, left);
6120 write_tsquery_body(out, right);
6121 }
6122 }
6123}
6124
6125fn tsquery_encoded_len(ast: &TsQueryAst) -> usize {
6127 match ast {
6128 TsQueryAst::Term { word, .. } => 1 + 2 + word.len() + 1,
6129 TsQueryAst::And(a, b) | TsQueryAst::Or(a, b) => {
6130 1 + tsquery_encoded_len(a) + tsquery_encoded_len(b)
6131 }
6132 TsQueryAst::Not(x) => 1 + tsquery_encoded_len(x),
6133 TsQueryAst::Phrase { left, right, .. } => {
6134 1 + 2 + tsquery_encoded_len(left) + tsquery_encoded_len(right)
6135 }
6136 }
6137}
6138
/// Append `n` as two little-endian bytes.
fn write_u16(out: &mut Vec<u8>, n: u16) {
    out.extend(n.to_le_bytes());
}
/// Append `n` as four little-endian bytes.
fn write_u32(out: &mut Vec<u8>, n: u32) {
    out.extend(n.to_le_bytes());
}
6145fn write_str(out: &mut Vec<u8>, s: &str) {
6146 let len = u16::try_from(s.len()).expect("identifier / text fits in u16");
6147 write_u16(out, len);
6148 out.extend_from_slice(s.as_bytes());
6149}
6150
6151fn write_str_long(out: &mut Vec<u8>, s: &str) {
6156 let len = u32::try_from(s.len()).expect("function body fits in u32");
6157 write_u32(out, len);
6158 out.extend_from_slice(s.as_bytes());
6159}
6160
6161fn write_index_key(out: &mut Vec<u8>, key: &IndexKey) {
6165 match key {
6166 IndexKey::Int(n) => {
6167 out.push(INDEX_KEY_TAG_INT);
6168 out.extend_from_slice(&n.to_le_bytes());
6169 }
6170 IndexKey::Text(s) => {
6171 out.push(INDEX_KEY_TAG_TEXT);
6172 write_str(out, s);
6173 }
6174 IndexKey::Bool(b) => {
6175 out.push(INDEX_KEY_TAG_BOOL);
6176 out.push(u8::from(*b));
6177 }
6178 }
6179}
6180
/// Forward-only reader over a borrowed byte buffer, used by the decoders.
///
/// `pos` is the offset of the next unread byte. It is advanced by the `read_*`
/// helpers and, in places, directly by decoders that parse `buf[pos..]`
/// themselves and then add the consumed length.
struct Cursor<'a> {
    /// The entire input being decoded.
    buf: &'a [u8],
    /// Offset of the next byte to read.
    pos: usize,
}
6185
6186impl<'a> Cursor<'a> {
6187 const fn new(buf: &'a [u8]) -> Self {
6188 Self { buf, pos: 0 }
6189 }
6190
6191 fn take(&mut self, n: usize) -> Result<&'a [u8], StorageError> {
6192 let end = self
6193 .pos
6194 .checked_add(n)
6195 .ok_or_else(|| StorageError::Corrupt(format!("length overflow taking {n} bytes")))?;
6196 if end > self.buf.len() {
6197 return Err(StorageError::Corrupt(format!(
6198 "unexpected EOF at offset {} (wanted {n} more bytes)",
6199 self.pos
6200 )));
6201 }
6202 let s = &self.buf[self.pos..end];
6203 self.pos = end;
6204 Ok(s)
6205 }
6206
6207 fn read_u8(&mut self) -> Result<u8, StorageError> {
6208 Ok(self.take(1)?[0])
6209 }
6210 fn read_u16(&mut self) -> Result<u16, StorageError> {
6211 let s = self.take(2)?;
6212 Ok(u16::from_le_bytes([s[0], s[1]]))
6213 }
6214 fn read_u32(&mut self) -> Result<u32, StorageError> {
6215 let s = self.take(4)?;
6216 Ok(u32::from_le_bytes([s[0], s[1], s[2], s[3]]))
6217 }
6218 fn read_i32(&mut self) -> Result<i32, StorageError> {
6219 let s = self.take(4)?;
6220 Ok(i32::from_le_bytes([s[0], s[1], s[2], s[3]]))
6221 }
6222 fn read_u64(&mut self) -> Result<u64, StorageError> {
6225 let s = self.take(8)?;
6226 Ok(u64::from_le_bytes([
6227 s[0], s[1], s[2], s[3], s[4], s[5], s[6], s[7],
6228 ]))
6229 }
6230 fn read_i64(&mut self) -> Result<i64, StorageError> {
6231 let s = self.take(8)?;
6232 let arr: [u8; 8] = s.try_into().expect("checked");
6233 Ok(i64::from_le_bytes(arr))
6234 }
6235 fn read_f64(&mut self) -> Result<f64, StorageError> {
6236 let s = self.take(8)?;
6237 let arr: [u8; 8] = s.try_into().expect("checked");
6238 Ok(f64::from_le_bytes(arr))
6239 }
6240 fn read_f32(&mut self) -> Result<f32, StorageError> {
6241 let s = self.take(4)?;
6242 Ok(f32::from_le_bytes([s[0], s[1], s[2], s[3]]))
6243 }
6244 fn read_str(&mut self) -> Result<String, StorageError> {
6245 let len = self.read_u16()? as usize;
6246 let bytes = self.take(len)?;
6247 core::str::from_utf8(bytes)
6248 .map(String::from)
6249 .map_err(|_| StorageError::Corrupt("invalid UTF-8 in identifier or text".into()))
6250 }
6251
6252 fn read_str_long(&mut self) -> Result<String, StorageError> {
6256 let len = self.read_u32()? as usize;
6257 let bytes = self.take(len)?;
6258 core::str::from_utf8(bytes)
6259 .map(String::from)
6260 .map_err(|_| StorageError::Corrupt("invalid UTF-8 in long-string payload".into()))
6261 }
6262
6263 fn read_index_key(&mut self) -> Result<IndexKey, StorageError> {
6267 let tag = self.read_u8()?;
6268 match tag {
6269 INDEX_KEY_TAG_INT => Ok(IndexKey::Int(self.read_i64()?)),
6270 INDEX_KEY_TAG_TEXT => Ok(IndexKey::Text(self.read_str()?)),
6271 INDEX_KEY_TAG_BOOL => Ok(IndexKey::Bool(self.read_u8()? != 0)),
6272 other => Err(StorageError::Corrupt(format!(
6273 "unknown index key tag: {other}"
6274 ))),
6275 }
6276 }
6277 fn read_value_body(&mut self, ty: DataType) -> Result<Value, StorageError> {
6283 match ty {
6284 DataType::SmallInt => {
6285 let s = self.take(2)?;
6286 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
6287 }
6288 DataType::Int => Ok(Value::Int(self.read_i32()?)),
6289 DataType::BigInt => Ok(Value::BigInt(self.read_i64()?)),
6290 DataType::Float => Ok(Value::Float(self.read_f64()?)),
6291 DataType::Bool => Ok(Value::Bool(self.read_u8()? != 0)),
6292 DataType::Text | DataType::Varchar(_) | DataType::Char(_) => {
6293 Ok(Value::Text(self.read_str()?))
6294 }
6295 DataType::Vector {
6296 encoding: VecEncoding::F32,
6297 ..
6298 } => {
6299 let dim = self.read_u32()? as usize;
6300 let mut v = Vec::with_capacity(dim);
6301 for _ in 0..dim {
6302 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
6303 v.push(f32::from_le_bytes(bytes));
6304 }
6305 Ok(Value::Vector(v))
6306 }
6307 DataType::Vector {
6308 encoding: VecEncoding::Sq8,
6309 ..
6310 } => {
6311 let dim = self.read_u32()? as usize;
6312 let min = self.read_f32()?;
6313 let max = self.read_f32()?;
6314 let bytes = self.take(dim)?.to_vec();
6315 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
6316 }
6317 DataType::Vector {
6318 encoding: VecEncoding::F16,
6319 ..
6320 } => {
6321 let dim = self.read_u32()? as usize;
6322 let bytes = self.take(dim * 2)?.to_vec();
6323 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
6324 }
6325 DataType::Numeric { .. } => {
6326 let s = self.take(16)?;
6327 let arr: [u8; 16] = s.try_into().expect("checked");
6328 let scaled = i128::from_le_bytes(arr);
6329 let scale = self.read_u8()?;
6330 Ok(Value::Numeric { scaled, scale })
6331 }
6332 DataType::Date => Ok(Value::Date(self.read_i32()?)),
6333 DataType::Timestamp => Ok(Value::Timestamp(self.read_i64()?)),
6334 DataType::Timestamptz => Ok(Value::Timestamp(self.read_i64()?)),
6335 DataType::Jsonb => Ok(Value::Json(self.read_str()?)),
6336 DataType::Interval => {
6337 Err(StorageError::Corrupt(
6342 "INTERVAL column found on disk — runtime-only type, v3.0.2 rejects it".into(),
6343 ))
6344 }
6345 DataType::Json => Ok(Value::Json(self.read_str()?)),
6346 DataType::Bytes => {
6349 let len = self.read_u16()? as usize;
6350 let bytes = self.take(len)?.to_vec();
6351 Ok(Value::Bytes(bytes))
6352 }
6353 DataType::TextArray => {
6355 let count = self.read_u16()? as usize;
6356 let mut items: Vec<Option<String>> = Vec::with_capacity(count);
6357 for _ in 0..count {
6358 match self.read_u8()? {
6359 0 => items.push(Some(self.read_str()?)),
6360 1 => items.push(None),
6361 other => {
6362 return Err(StorageError::Corrupt(format!(
6363 "TEXT[] null flag: unknown byte {other}"
6364 )));
6365 }
6366 }
6367 }
6368 Ok(Value::TextArray(items))
6369 }
6370 DataType::IntArray => {
6372 let count = self.read_u16()? as usize;
6373 let mut items: Vec<Option<i32>> = Vec::with_capacity(count);
6374 for _ in 0..count {
6375 match self.read_u8()? {
6376 0 => items.push(Some(self.read_i32()?)),
6377 1 => items.push(None),
6378 other => {
6379 return Err(StorageError::Corrupt(format!(
6380 "INT[] null flag: unknown byte {other}"
6381 )));
6382 }
6383 }
6384 }
6385 Ok(Value::IntArray(items))
6386 }
6387 DataType::BigIntArray => {
6389 let count = self.read_u16()? as usize;
6390 let mut items: Vec<Option<i64>> = Vec::with_capacity(count);
6391 for _ in 0..count {
6392 match self.read_u8()? {
6393 0 => items.push(Some(self.read_i64()?)),
6394 1 => items.push(None),
6395 other => {
6396 return Err(StorageError::Corrupt(format!(
6397 "BIGINT[] null flag: unknown byte {other}"
6398 )));
6399 }
6400 }
6401 }
6402 Ok(Value::BigIntArray(items))
6403 }
6404 DataType::TsVector => Ok(Value::TsVector(self.read_tsvector_body()?)),
6408 DataType::TsQuery => Ok(Value::TsQuery(self.read_tsquery_body()?)),
6409 }
6410 }
6411
6412 fn read_tsvector_body(&mut self) -> Result<Vec<TsLexeme>, StorageError> {
6414 let count = self.read_u16()? as usize;
6415 let mut out = Vec::with_capacity(count);
6416 for _ in 0..count {
6417 let word = self.read_str()?;
6418 let pos_count = self.read_u16()? as usize;
6419 let mut positions = Vec::with_capacity(pos_count);
6420 for _ in 0..pos_count {
6421 positions.push(self.read_u16()?);
6422 }
6423 let weight = self.read_u8()?;
6424 out.push(TsLexeme {
6425 word,
6426 positions,
6427 weight,
6428 });
6429 }
6430 Ok(out)
6431 }
6432
6433 fn read_tsquery_body(&mut self) -> Result<TsQueryAst, StorageError> {
6435 let tag = self.read_u8()?;
6436 match tag {
6437 0 => {
6438 let word = self.read_str()?;
6439 let weight_mask = self.read_u8()?;
6440 Ok(TsQueryAst::Term { word, weight_mask })
6441 }
6442 1 => {
6443 let a = self.read_tsquery_body()?;
6444 let b = self.read_tsquery_body()?;
6445 Ok(TsQueryAst::And(Box::new(a), Box::new(b)))
6446 }
6447 2 => {
6448 let a = self.read_tsquery_body()?;
6449 let b = self.read_tsquery_body()?;
6450 Ok(TsQueryAst::Or(Box::new(a), Box::new(b)))
6451 }
6452 3 => {
6453 let x = self.read_tsquery_body()?;
6454 Ok(TsQueryAst::Not(Box::new(x)))
6455 }
6456 4 => {
6457 let distance = self.read_u16()?;
6458 let left = self.read_tsquery_body()?;
6459 let right = self.read_tsquery_body()?;
6460 Ok(TsQueryAst::Phrase {
6461 left: Box::new(left),
6462 right: Box::new(right),
6463 distance,
6464 })
6465 }
6466 other => Err(StorageError::Corrupt(format!(
6467 "tsquery: unknown node tag {other}"
6468 ))),
6469 }
6470 }
6471
6472 fn read_value(&mut self) -> Result<Value, StorageError> {
6473 let tag = self.read_u8()?;
6474 match tag {
6475 0 => Ok(Value::Null),
6476 1 => Ok(Value::Int(self.read_i32()?)),
6477 2 => Ok(Value::BigInt(self.read_i64()?)),
6478 3 => Ok(Value::Float(self.read_f64()?)),
6479 4 => Ok(Value::Text(self.read_str()?)),
6480 5 => Ok(Value::Bool(self.read_u8()? != 0)),
6481 6 => {
6482 let dim = self.read_u32()? as usize;
6483 let mut v = Vec::with_capacity(dim);
6484 for _ in 0..dim {
6485 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
6486 v.push(f32::from_le_bytes(bytes));
6487 }
6488 Ok(Value::Vector(v))
6489 }
6490 7 => {
6491 let s = self.take(2)?;
6492 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
6493 }
6494 8 => {
6495 let s = self.take(16)?;
6496 let arr: [u8; 16] = s.try_into().expect("checked");
6497 let scaled = i128::from_le_bytes(arr);
6498 let scale = self.read_u8()?;
6499 Ok(Value::Numeric { scaled, scale })
6500 }
6501 9 => Ok(Value::Date(self.read_i32()?)),
6502 10 => Ok(Value::Timestamp(self.read_i64()?)),
6503 11 => {
6508 let dim = self.read_u32()? as usize;
6509 let min = self.read_f32()?;
6510 let max = self.read_f32()?;
6511 let bytes = self.take(dim)?.to_vec();
6512 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
6513 }
6514 12 => {
6517 let dim = self.read_u32()? as usize;
6518 let bytes = self.take(dim * 2)?.to_vec();
6519 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
6520 }
6521 14 => {
6523 let len = self.read_u16()? as usize;
6524 let bytes = self.take(len)?.to_vec();
6525 Ok(Value::Bytes(bytes))
6526 }
6527 15 => {
6530 let count = self.read_u16()? as usize;
6531 let mut items: Vec<Option<String>> = Vec::with_capacity(count);
6532 for _ in 0..count {
6533 match self.read_u8()? {
6534 0 => items.push(Some(self.read_str()?)),
6535 1 => items.push(None),
6536 other => {
6537 return Err(StorageError::Corrupt(format!(
6538 "TEXT[] null flag in value tag: unknown byte {other}"
6539 )));
6540 }
6541 }
6542 }
6543 Ok(Value::TextArray(items))
6544 }
6545 16 => {
6547 let count = self.read_u16()? as usize;
6548 let mut items: Vec<Option<i32>> = Vec::with_capacity(count);
6549 for _ in 0..count {
6550 match self.read_u8()? {
6551 0 => items.push(Some(self.read_i32()?)),
6552 1 => items.push(None),
6553 other => {
6554 return Err(StorageError::Corrupt(format!(
6555 "INT[] null flag in value tag: unknown byte {other}"
6556 )));
6557 }
6558 }
6559 }
6560 Ok(Value::IntArray(items))
6561 }
6562 17 => {
6563 let count = self.read_u16()? as usize;
6564 let mut items: Vec<Option<i64>> = Vec::with_capacity(count);
6565 for _ in 0..count {
6566 match self.read_u8()? {
6567 0 => items.push(Some(self.read_i64()?)),
6568 1 => items.push(None),
6569 other => {
6570 return Err(StorageError::Corrupt(format!(
6571 "BIGINT[] null flag in value tag: unknown byte {other}"
6572 )));
6573 }
6574 }
6575 }
6576 Ok(Value::BigIntArray(items))
6577 }
6578 18 => Ok(Value::TsVector(self.read_tsvector_body()?)),
6581 19 => Ok(Value::TsQuery(self.read_tsquery_body()?)),
6583 other => Err(StorageError::Corrupt(format!("unknown value tag: {other}"))),
6584 }
6585 }
6586
6587 fn read_nsw_graph(&mut self, m: usize) -> Result<NswGraph, StorageError> {
6591 let m_max_0 = self.read_u16()? as usize;
6592 let entry_raw = self.read_u32()?;
6593 let entry = if entry_raw == u32::MAX {
6594 None
6595 } else {
6596 Some(entry_raw as usize)
6597 };
6598 let entry_level = self.read_u8()?;
6599 let node_count = self.read_u32()? as usize;
6600 let mut levels: PersistentVec<u8> = PersistentVec::new();
6605 for _ in 0..node_count {
6606 levels.push_mut(self.read_u8()?);
6607 }
6608 let layer_count = self.read_u8()? as usize;
6609 let mut layers: Vec<PersistentVec<Vec<u32>>> = Vec::with_capacity(layer_count);
6610 for _ in 0..layer_count {
6611 let n = self.read_u32()? as usize;
6612 let mut per_layer: PersistentVec<Vec<u32>> = PersistentVec::new();
6613 for _ in 0..n {
6614 let cnt = self.read_u16()? as usize;
6615 let mut row: Vec<u32> = Vec::with_capacity(cnt);
6616 for _ in 0..cnt {
6617 row.push(self.read_u32()?);
6618 }
6619 per_layer.push_mut(row);
6620 }
6621 layers.push(per_layer);
6622 }
6623 Ok(NswGraph {
6624 m,
6625 m_max_0,
6626 entry,
6627 entry_level,
6628 levels,
6629 layers,
6630 })
6631 }
6632}
6633
6634#[cfg(test)]
6635mod tests {
6636 use super::*;
6637 use alloc::string::ToString;
6638 use alloc::vec;
6639
6640 #[cfg(target_arch = "aarch64")]
6641 #[test]
6642 fn neon_l2_matches_scalar() {
6643 let dims = [4usize, 8, 12, 16, 64, 128, 256, 384, 512, 768, 1024, 1536];
6648 for &d in &dims {
6649 let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
6650 let mut a = Vec::with_capacity(d);
6651 let mut b = Vec::with_capacity(d);
6652 for _ in 0..d {
6653 state = state
6654 .wrapping_mul(6_364_136_223_846_793_005)
6655 .wrapping_add(1);
6656 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
6657 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
6658 state = state
6659 .wrapping_mul(6_364_136_223_846_793_005)
6660 .wrapping_add(1);
6661 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
6662 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
6663 a.push(x);
6664 b.push(y);
6665 }
6666 let scalar = l2_distance_sq_scalar(&a, &b);
6667 let neon = unsafe { l2_distance_sq_neon(&a, &b) };
6668 let tol = (scalar.abs().max(1e-6)) * 1e-4;
6669 assert!(
6670 (scalar - neon).abs() <= tol,
6671 "dim={d}: scalar={scalar} neon={neon} diff={}",
6672 (scalar - neon).abs()
6673 );
6674 }
6675 }
6676
6677 #[cfg(target_arch = "aarch64")]
6678 #[test]
6679 fn neon_inner_product_matches_scalar() {
6680 let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
6684 for &d in &dims {
6685 let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
6686 let mut a = Vec::with_capacity(d);
6687 let mut b = Vec::with_capacity(d);
6688 for _ in 0..d {
6689 state = state
6690 .wrapping_mul(6_364_136_223_846_793_005)
6691 .wrapping_add(1);
6692 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
6693 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
6694 state = state
6695 .wrapping_mul(6_364_136_223_846_793_005)
6696 .wrapping_add(1);
6697 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
6698 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
6699 a.push(x);
6700 b.push(y);
6701 }
6702 let scalar = inner_product_scalar(&a, &b);
6703 let neon = unsafe { inner_product_neon(&a, &b) };
6704 #[allow(clippy::cast_precision_loss)]
6705 let tol = (scalar.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
6706 assert!(
6707 (scalar - neon).abs() <= tol,
6708 "IP dim={d}: scalar={scalar} neon={neon} diff={}",
6709 (scalar - neon).abs()
6710 );
6711 }
6712 }
6713
6714 #[cfg(target_arch = "aarch64")]
6715 #[allow(clippy::similar_names)]
6716 #[test]
6717 fn neon_cosine_dot_norms_matches_scalar() {
6718 let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
6719 for &d in &dims {
6720 let mut state: u64 = (d as u64).wrapping_mul(0xBF58_476D_1CE4_E5B9);
6721 let mut a = Vec::with_capacity(d);
6722 let mut b = Vec::with_capacity(d);
6723 for _ in 0..d {
6724 state = state
6725 .wrapping_mul(6_364_136_223_846_793_005)
6726 .wrapping_add(1);
6727 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
6728 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
6729 state = state
6730 .wrapping_mul(6_364_136_223_846_793_005)
6731 .wrapping_add(1);
6732 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
6733 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
6734 a.push(x);
6735 b.push(y);
6736 }
6737 let (dot_s, na_s, nb_s) = cosine_dot_norms_scalar(&a, &b);
6738 let (dot_n, na_n, nb_n) = unsafe { cosine_dot_norms_neon(&a, &b) };
6739 #[allow(clippy::cast_precision_loss)]
6740 let tol_d = (dot_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
6741 #[allow(clippy::cast_precision_loss)]
6742 let tol_n = (na_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
6743 assert!(
6744 (dot_s - dot_n).abs() <= tol_d,
6745 "cosine dot dim={d}: scalar={dot_s} neon={dot_n}"
6746 );
6747 assert!(
6748 (na_s - na_n).abs() <= tol_n,
6749 "cosine na dim={d}: scalar={na_s} neon={na_n}"
6750 );
6751 assert!(
6752 (nb_s - nb_n).abs() <= tol_n,
6753 "cosine nb dim={d}: scalar={nb_s} neon={nb_n}"
6754 );
6755 }
6756 }
6757
6758 fn make_users_schema() -> TableSchema {
6759 TableSchema::new(
6760 "users",
6761 vec![
6762 ColumnSchema::new("id", DataType::Int, false),
6763 ColumnSchema::new("name", DataType::Text, false),
6764 ColumnSchema::new("score", DataType::Float, true),
6765 ],
6766 )
6767 }
6768
6769 #[test]
6770 fn value_type_tag_matches_variant() {
6771 assert_eq!(Value::Int(1).data_type(), Some(DataType::Int));
6772 assert_eq!(Value::BigInt(1).data_type(), Some(DataType::BigInt));
6773 assert_eq!(Value::Float(1.0).data_type(), Some(DataType::Float));
6774 assert_eq!(Value::Text("x".into()).data_type(), Some(DataType::Text));
6775 assert_eq!(Value::Bool(true).data_type(), Some(DataType::Bool));
6776 assert_eq!(Value::Null.data_type(), None);
6777 assert!(Value::Null.is_null());
6778 assert!(!Value::Int(0).is_null());
6779 }
6780
6781 #[test]
6782 fn sq8_value_reports_sq8_data_type() {
6783 let q = crate::quantize::quantize(&[0.0, 0.25, 0.5, 0.75, 1.0]);
6788 let v = Value::Sq8Vector(q);
6789 assert_eq!(
6790 v.data_type(),
6791 Some(DataType::Vector {
6792 dim: 5,
6793 encoding: VecEncoding::Sq8,
6794 }),
6795 );
6796 }
6797
6798 #[test]
6799 fn datatype_display_matches_pg_keyword() {
6800 assert_eq!(DataType::Int.to_string(), "INT");
6801 assert_eq!(DataType::BigInt.to_string(), "BIGINT");
6802 assert_eq!(DataType::Float.to_string(), "FLOAT");
6803 assert_eq!(DataType::Text.to_string(), "TEXT");
6804 assert_eq!(DataType::Bool.to_string(), "BOOL");
6805 }
6806
6807 #[test]
6808 fn row_len_and_emptiness() {
6809 let r = Row::new(vec![Value::Int(1), Value::Null]);
6810 assert_eq!(r.len(), 2);
6811 assert!(!r.is_empty());
6812 assert!(Row::new(Vec::new()).is_empty());
6813 }
6814
6815 #[test]
6816 fn table_schema_column_position() {
6817 let s = make_users_schema();
6818 assert_eq!(s.column_position("id"), Some(0));
6819 assert_eq!(s.column_position("score"), Some(2));
6820 assert_eq!(s.column_position("missing"), None);
6821 }
6822
6823 #[test]
6824 fn catalog_create_table_then_lookup() {
6825 let mut cat = Catalog::new();
6826 cat.create_table(make_users_schema()).unwrap();
6827 assert_eq!(cat.table_count(), 1);
6828 assert!(cat.get("users").is_some());
6829 assert!(cat.get("nope").is_none());
6830 }
6831
6832 #[test]
6833 fn catalog_duplicate_table_is_rejected() {
6834 let mut cat = Catalog::new();
6835 cat.create_table(make_users_schema()).unwrap();
6836 let err = cat.create_table(make_users_schema()).unwrap_err();
6837 assert!(matches!(err, StorageError::DuplicateTable { ref name } if name == "users"));
6838 }
6839
6840 #[test]
6841 fn table_insert_happy_path_appends_row() {
6842 let mut cat = Catalog::new();
6843 cat.create_table(make_users_schema()).unwrap();
6844 let t = cat.get_mut("users").unwrap();
6845 t.insert(Row::new(vec![
6846 Value::Int(1),
6847 Value::Text("alice".into()),
6848 Value::Float(99.5),
6849 ]))
6850 .unwrap();
6851 assert_eq!(t.row_count(), 1);
6852 assert_eq!(t.rows()[0].values[1], Value::Text("alice".into()));
6853 }
6854
6855 #[test]
6856 fn table_insert_arity_mismatch() {
6857 let mut cat = Catalog::new();
6858 cat.create_table(make_users_schema()).unwrap();
6859 let t = cat.get_mut("users").unwrap();
6860 let err = t.insert(Row::new(vec![Value::Int(1)])).unwrap_err();
6861 assert!(matches!(
6862 err,
6863 StorageError::ArityMismatch {
6864 expected: 3,
6865 actual: 1
6866 }
6867 ));
6868 assert_eq!(t.row_count(), 0);
6869 }
6870
6871 #[test]
6872 fn table_insert_type_mismatch_reports_column() {
6873 let mut cat = Catalog::new();
6874 cat.create_table(make_users_schema()).unwrap();
6875 let t = cat.get_mut("users").unwrap();
6876 let err = t
6877 .insert(Row::new(vec![
6878 Value::Int(1),
6879 Value::Int(42), Value::Float(0.0),
6881 ]))
6882 .unwrap_err();
6883 match err {
6884 StorageError::TypeMismatch {
6885 ref column,
6886 expected,
6887 actual,
6888 position,
6889 } => {
6890 assert_eq!(column, "name");
6891 assert_eq!(expected, DataType::Text);
6892 assert_eq!(actual, DataType::Int);
6893 assert_eq!(position, 1);
6894 }
6895 other => panic!("unexpected: {other:?}"),
6896 }
6897 assert_eq!(t.row_count(), 0);
6898 }
6899
6900 #[test]
6901 fn table_insert_null_into_not_null_rejected() {
6902 let mut cat = Catalog::new();
6903 cat.create_table(make_users_schema()).unwrap();
6904 let t = cat.get_mut("users").unwrap();
6905 let err = t
6906 .insert(Row::new(vec![
6907 Value::Int(1),
6908 Value::Null, Value::Float(1.0),
6910 ]))
6911 .unwrap_err();
6912 assert!(matches!(err, StorageError::NullInNotNull { ref column } if column == "name"));
6913 }
6914
6915 #[test]
6916 fn table_insert_null_into_nullable_ok() {
6917 let mut cat = Catalog::new();
6918 cat.create_table(make_users_schema()).unwrap();
6919 let t = cat.get_mut("users").unwrap();
6920 t.insert(Row::new(vec![
6921 Value::Int(1),
6922 Value::Text("bob".into()),
6923 Value::Null,
6924 ]))
6925 .unwrap();
6926 assert_eq!(t.row_count(), 1);
6927 }
6928
6929 #[test]
6930 fn catalog_get_mut_independent_per_table() {
6931 let mut cat = Catalog::new();
6932 cat.create_table(TableSchema::new(
6933 "a",
6934 vec![ColumnSchema::new("v", DataType::Int, false)],
6935 ))
6936 .unwrap();
6937 cat.create_table(TableSchema::new(
6938 "b",
6939 vec![ColumnSchema::new("v", DataType::Int, false)],
6940 ))
6941 .unwrap();
6942 cat.get_mut("a")
6943 .unwrap()
6944 .insert(Row::new(vec![Value::Int(1)]))
6945 .unwrap();
6946 assert_eq!(cat.get("a").unwrap().row_count(), 1);
6947 assert_eq!(cat.get("b").unwrap().row_count(), 0);
6948 }
6949
6950 fn assert_round_trip(cat: &Catalog) {
6953 let bytes = cat.serialize();
6954 let restored = Catalog::deserialize(&bytes).expect("deserialize");
6955 assert_eq!(restored.table_count(), cat.table_count());
6958 for (a, b) in cat.tables.iter().zip(restored.tables.iter()) {
6959 assert_eq!(a.schema, b.schema);
6960 assert_eq!(a.rows, b.rows);
6961 }
6962 }
6963
6964 #[test]
6965 fn serialize_empty_catalog_round_trips() {
6966 assert_round_trip(&Catalog::new());
6967 }
6968
6969 #[test]
6970 fn serialize_single_empty_table_round_trips() {
6971 let mut cat = Catalog::new();
6972 cat.create_table(make_users_schema()).unwrap();
6973 assert_round_trip(&cat);
6974 }
6975
6976 #[test]
6977 fn nsw_clone_is_o1() {
6978 let mut cat = Catalog::new();
6987 cat.create_table(TableSchema::new(
6988 "docs",
6989 alloc::vec![
6990 ColumnSchema::new("id", DataType::Int, false),
6991 ColumnSchema::new(
6992 "v",
6993 DataType::Vector {
6994 dim: 3,
6995 encoding: VecEncoding::F32
6996 },
6997 true
6998 ),
6999 ],
7000 ))
7001 .unwrap();
7002 let t = cat.get_mut("docs").unwrap();
7003 for i in 0..1500_i32 {
7004 #[allow(clippy::cast_precision_loss)] let base = (i as f32) * 0.01;
7006 t.insert(Row::new(alloc::vec![
7007 Value::Int(i),
7008 Value::Vector(alloc::vec![base, base + 0.05, base + 0.1]),
7009 ]))
7010 .unwrap();
7011 }
7012 t.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
7013 .unwrap();
7014 let g = match &cat.get("docs").unwrap().indices()[0].kind {
7015 IndexKind::Nsw(g) => g,
7016 IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
7017 panic!("expected NSW")
7018 }
7019 };
7020 assert_eq!(g.levels.len(), 1500, "one level slot per inserted row");
7023 assert!(
7024 g.layers.len() >= 2,
7025 "1500 nodes should populate at least two HNSW layers, got {}",
7026 g.layers.len()
7027 );
7028
7029 let cloned = g.clone();
7030
7031 assert!(
7032 g.levels.shares_storage_with(&cloned.levels),
7033 "levels PV not shared after clone — clone copied elements (O(N))"
7034 );
7035 assert_eq!(g.layers.len(), cloned.layers.len());
7036 for (l, (orig, cl)) in g.layers.iter().zip(cloned.layers.iter()).enumerate() {
7037 assert!(
7038 orig.shares_storage_with(cl),
7039 "layer {l} PV not shared after clone — clone copied elements (O(N))"
7040 );
7041 }
7042 }
7043
7044 #[test]
7045 fn sq8_catalog_serialise_roundtrip_preserves_cells_and_index() {
7046 let mut cat = Catalog::new();
7053 cat.create_table(TableSchema::new(
7054 "vecs",
7055 alloc::vec![
7056 ColumnSchema::new("id", DataType::Int, false),
7057 ColumnSchema::new(
7058 "v",
7059 DataType::Vector {
7060 dim: 8,
7061 encoding: VecEncoding::Sq8,
7062 },
7063 false,
7064 ),
7065 ],
7066 ))
7067 .unwrap();
7068 let t = cat.get_mut("vecs").unwrap();
7069 for i in 0..32_i32 {
7070 #[allow(clippy::cast_precision_loss)]
7071 let base = (i as f32) * 0.03;
7072 let v: Vec<f32> = (0..8_i32)
7073 .map(|j| {
7074 #[allow(clippy::cast_precision_loss)]
7075 let off = (j as f32) * 0.01;
7076 base + off
7077 })
7078 .collect();
7079 t.insert(Row::new(alloc::vec![
7080 Value::Int(i),
7081 Value::Sq8Vector(quantize::quantize(&v)),
7082 ]))
7083 .unwrap();
7084 }
7085 t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
7086 let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];
7089 let (before_cell, before_ty, before_hits) = {
7090 let t_ref = cat.get("vecs").unwrap();
7091 (
7092 t_ref.rows()[5].values[1].clone(),
7093 t_ref.schema().columns[1].ty,
7094 nsw_query(t_ref, "v_idx", &query, 5, NswMetric::L2),
7095 )
7096 };
7097
7098 let bytes = cat.serialize();
7099 let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
7100 let rt = restored.get("vecs").unwrap();
7101 assert_eq!(rt.schema().columns[1].ty, before_ty);
7102 assert_eq!(rt.rows()[5].values[1], before_cell);
7103 let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
7104 assert_eq!(before_hits, after_hits);
7105 }
7106
7107 #[test]
7108 fn half_catalog_serialise_roundtrip_preserves_cells_and_index() {
7109 use crate::halfvec;
7116 let mut cat = Catalog::new();
7117 cat.create_table(TableSchema::new(
7118 "vecs",
7119 alloc::vec![
7120 ColumnSchema::new("id", DataType::Int, false),
7121 ColumnSchema::new(
7122 "v",
7123 DataType::Vector {
7124 dim: 8,
7125 encoding: VecEncoding::F16,
7126 },
7127 false,
7128 ),
7129 ],
7130 ))
7131 .unwrap();
7132 let t = cat.get_mut("vecs").unwrap();
7133 for i in 0..32_i32 {
7134 #[allow(clippy::cast_precision_loss)]
7135 let base = (i as f32) * 0.03;
7136 let v: Vec<f32> = (0..8_i32)
7137 .map(|j| {
7138 #[allow(clippy::cast_precision_loss)]
7139 let off = (j as f32) * 0.01;
7140 base + off
7141 })
7142 .collect();
7143 t.insert(Row::new(alloc::vec![
7144 Value::Int(i),
7145 Value::HalfVector(halfvec::HalfVector::from_f32_slice(&v)),
7146 ]))
7147 .unwrap();
7148 }
7149 t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
7150 let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];
7151 let (before_cell, before_ty, before_hits) = {
7152 let t_ref = cat.get("vecs").unwrap();
7153 (
7154 t_ref.rows()[5].values[1].clone(),
7155 t_ref.schema().columns[1].ty,
7156 nsw_query(t_ref, "v_idx", &query, 5, NswMetric::L2),
7157 )
7158 };
7159 let bytes = cat.serialize();
7160 let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
7161 let rt = restored.get("vecs").unwrap();
7162 assert_eq!(rt.schema().columns[1].ty, before_ty);
7163 assert_eq!(rt.rows()[5].values[1], before_cell);
7164 let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
7165 assert_eq!(before_hits, after_hits);
7166 }
7167
7168 #[test]
7169 #[allow(clippy::similar_names)]
7170 fn hnsw_half_recall_at_10_matches_f32_groundtruth() {
7171 use crate::halfvec;
7178 fn next(state: &mut u64) -> f32 {
7179 *state = state
7180 .wrapping_add(0x9E37_79B9_7F4A_7C15)
7181 .wrapping_mul(0xBF58_476D_1CE4_E5B9);
7182 #[allow(clippy::cast_precision_loss)]
7183 let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
7184 2.0 * u - 1.0
7185 }
7186 let dim: u32 = 32;
7187 let n: usize = 512;
7188 let dim_us = dim as usize;
7189 let mut seed: u64 = 0xF16_F16_F16_F16_u64;
7190 let corpus: Vec<Vec<f32>> = (0..n)
7191 .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
7192 .collect();
7193 let queries: Vec<Vec<f32>> = (0..32)
7194 .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
7195 .collect();
7196 let exact_top10: Vec<Vec<usize>> = queries
7197 .iter()
7198 .map(|q| {
7199 let mut scored: Vec<(f32, usize)> = corpus
7200 .iter()
7201 .enumerate()
7202 .map(|(i, v)| (l2_distance_sq(v, q), i))
7203 .collect();
7204 scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
7205 scored.into_iter().take(10).map(|(_, i)| i).collect()
7206 })
7207 .collect();
7208 let mut cat = Catalog::new();
7209 cat.create_table(TableSchema::new(
7210 "vecs",
7211 alloc::vec![
7212 ColumnSchema::new("id", DataType::Int, false),
7213 ColumnSchema::new(
7214 "v",
7215 DataType::Vector {
7216 dim,
7217 encoding: VecEncoding::F16,
7218 },
7219 false,
7220 ),
7221 ],
7222 ))
7223 .unwrap();
7224 let t = cat.get_mut("vecs").unwrap();
7225 for (i, v) in corpus.iter().enumerate() {
7226 t.insert(Row::new(alloc::vec![
7227 Value::Int(i32::try_from(i).unwrap()),
7228 Value::HalfVector(halfvec::HalfVector::from_f32_slice(v)),
7229 ]))
7230 .unwrap();
7231 }
7232 t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
7233 let table = cat.get("vecs").unwrap();
7234 let mut total_overlap = 0_usize;
7235 for (q, exact) in queries.iter().zip(exact_top10.iter()) {
7236 let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
7237 for h in &hits {
7238 if exact.contains(h) {
7239 total_overlap += 1;
7240 }
7241 }
7242 }
7243 #[allow(clippy::cast_precision_loss)]
7244 let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
7245 assert!(
7246 recall >= 0.95,
7247 "HALF HNSW recall@10 = {recall:.3}, below floor 0.95 — \
7248 check halfvec dispatch in `cell_to_query_metric_distance`"
7249 );
7250 }
7251
7252 #[test]
7253 fn hnsw_sq8_recall_at_10_above_0_95_vs_f32_groundtruth() {
7254 use crate::quantize;
7261 fn next(state: &mut u64) -> f32 {
7265 *state = state
7266 .wrapping_add(0x9E37_79B9_7F4A_7C15)
7267 .wrapping_mul(0xBF58_476D_1CE4_E5B9);
7268 #[allow(clippy::cast_precision_loss)]
7269 let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
7270 2.0 * u - 1.0
7271 }
7272 let dim: u32 = 32;
7273 let n: usize = 512;
7274 let dim_us = dim as usize;
7275 let mut seed: u64 = 0xCAFE_BABE_DEAD_BEEFu64;
7276 let corpus: Vec<Vec<f32>> = (0..n)
7277 .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
7278 .collect();
7279 let queries: Vec<Vec<f32>> = (0..32)
7280 .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
7281 .collect();
7282 let exact_top10: Vec<Vec<usize>> = queries
7284 .iter()
7285 .map(|q| {
7286 let mut scored: Vec<(f32, usize)> = corpus
7287 .iter()
7288 .enumerate()
7289 .map(|(i, v)| (l2_distance_sq(v, q), i))
7290 .collect();
7291 scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
7292 scored.into_iter().take(10).map(|(_, i)| i).collect()
7293 })
7294 .collect();
7295 let mut cat = Catalog::new();
7298 cat.create_table(TableSchema::new(
7299 "vecs",
7300 alloc::vec![
7301 ColumnSchema::new("id", DataType::Int, false),
7302 ColumnSchema::new(
7303 "v",
7304 DataType::Vector {
7305 dim,
7306 encoding: VecEncoding::Sq8,
7307 },
7308 false,
7309 ),
7310 ],
7311 ))
7312 .unwrap();
7313 let t = cat.get_mut("vecs").unwrap();
7314 for (i, v) in corpus.iter().enumerate() {
7315 t.insert(Row::new(alloc::vec![
7316 Value::Int(i32::try_from(i).unwrap()),
7317 Value::Sq8Vector(quantize::quantize(v)),
7318 ]))
7319 .unwrap();
7320 }
7321 t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
7322 let table = cat.get("vecs").unwrap();
7323 let mut total_overlap = 0_usize;
7324 for (q, exact) in queries.iter().zip(exact_top10.iter()) {
7325 let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
7326 for h in &hits {
7327 if exact.contains(h) {
7328 total_overlap += 1;
7329 }
7330 }
7331 }
7332 #[allow(clippy::cast_precision_loss)]
7333 let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
7334 assert!(
7335 recall >= 0.95,
7336 "SQ8 HNSW recall@10 = {recall:.3}, below floor 0.95 — \
7337 check `sq8_rerank` is wired in `nsw_search` for SQ8 columns"
7338 );
7339 }
7340
/// Serialising and deserialising a catalog must reproduce an NSW index's
/// graph exactly: `m`, `m_max_0`, the entry point and its level, every
/// node's level, and the adjacency of every layer.
#[test]
fn nsw_index_topology_persists_through_round_trip() {
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "docs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 3,
                    encoding: VecEncoding::F32,
                },
                true,
            ),
        ],
    ))
    .unwrap();
    let docs = cat.get_mut("docs").unwrap();
    // Iterating over `u8` makes both conversions lossless (`f32::from`,
    // `i32::from`), so no precision-loss cast needs silencing.
    for step in 0..6_u8 {
        let base = f32::from(step) * 0.1;
        docs.insert(Row::new(alloc::vec![
            Value::Int(i32::from(step)),
            Value::Vector(alloc::vec![base, base + 0.05, base + 0.1]),
        ]))
        .unwrap();
    }
    docs.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    // Pull an owned copy of the first index's NSW graph out of a catalog.
    // The variants are listed explicitly so adding a new `IndexKind` forces
    // this test to be revisited.
    let nsw_of = |catalog: &Catalog| match &catalog.get("docs").unwrap().indices()[0].kind {
        IndexKind::Nsw(graph) => graph.clone(),
        IndexKind::BTree(_) | IndexKind::Brin { .. } | IndexKind::Gin(_) => {
            panic!("expected NSW")
        }
    };

    let original = nsw_of(&cat);
    let bytes = cat.serialize();
    let restored = Catalog::deserialize(&bytes).expect("deserialize");
    let restored_graph = nsw_of(&restored);

    assert_eq!(restored_graph.m, original.m);
    assert_eq!(restored_graph.m_max_0, original.m_max_0);
    assert_eq!(restored_graph.entry, original.entry);
    assert_eq!(restored_graph.entry_level, original.entry_level);
    assert_eq!(restored_graph.levels, original.levels);
    assert_eq!(restored_graph.layers, original.layers);
}
7397
/// `nsw_assign_level` must depend only on the node index: two calls with
/// the same index return the same level.
#[test]
fn hnsw_level_assignment_is_deterministic() {
    for node in 0..32usize {
        let first = nsw_assign_level(node);
        let second = nsw_assign_level(node);
        assert_eq!(first, second);
    }
}
7406
/// Out of the first 200 node indices, more than 150 must be assigned to
/// level 0.
#[test]
fn hnsw_layer_0_dominates_population() {
    let mut on_zero = 0usize;
    for node in 0..200usize {
        if nsw_assign_level(node) == 0 {
            on_zero += 1;
        }
    }
    assert!(on_zero > 150, "level-0 nodes too few: {on_zero}");
}
7416
/// For a small hand-picked 3-D dataset (the eight unit-cube corners plus
/// two interior points), the HNSW top-1 under L2 must match an exhaustive
/// linear scan for each query.
#[test]
fn hnsw_search_matches_brute_force_for_l2_top1() {
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 3,
                    encoding: VecEncoding::F32
                },
                true
            ),
        ],
    ))
    .unwrap();
    let t = cat.get_mut("vecs").unwrap();
    let dataset: alloc::vec::Vec<(i32, [f32; 3])> = alloc::vec![
        (1, [0.0, 0.0, 0.0]),
        (2, [1.0, 0.0, 0.0]),
        (3, [0.0, 1.0, 0.0]),
        (4, [0.0, 0.0, 1.0]),
        (5, [1.0, 1.0, 0.0]),
        (6, [1.0, 0.0, 1.0]),
        (7, [0.0, 1.0, 1.0]),
        (8, [1.0, 1.0, 1.0]),
        (9, [0.5, 0.5, 0.5]),
        (10, [0.2, 0.8, 0.5]),
    ];
    for &(id, v) in &dataset {
        t.insert(Row::new(alloc::vec![
            Value::Int(id),
            Value::Vector(alloc::vec![v[0], v[1], v[2]]),
        ]))
        .unwrap();
    }
    t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
    let idx_pos = cat
        .get("vecs")
        .unwrap()
        .indices()
        .iter()
        .position(|i| i.name == "v_idx")
        .unwrap();
    for query in [[0.4, 0.4, 0.4], [0.9, 0.1, 0.0], [0.0, 0.9, 0.9]] {
        let table = cat.get("vecs").unwrap();
        let hnsw_top = nsw_search(table, idx_pos, &query, 1, 16, NswMetric::L2);
        // Exhaustive nearest neighbour. `total_cmp` is a genuine total order
        // on f32; `partial_cmp(..).unwrap_or(Equal)` treats NaN as equal to
        // everything and breaks the transitivity `sort_by` relies on.
        // `min_by` returns the first of several equal minima, which is the
        // same row a stable sort would have put first, without allocating
        // and sorting the whole scan.
        let brute_top1 = (0..table.rows.len())
            .map(|i| {
                let Value::Vector(v) = &table.rows[i].values[1] else {
                    return (f32::INFINITY, i);
                };
                (l2_distance_sq(v, &query), i)
            })
            .min_by(|a, b| a.0.total_cmp(&b.0))
            .map(|(_, i)| i)
            .expect("dataset is non-empty");
        assert!(!hnsw_top.is_empty(), "HNSW returned no results");
        assert_eq!(
            hnsw_top[0].1, brute_top1,
            "HNSW top-1 != brute-force top-1 for {query:?}"
        );
    }
}
7485
/// A `users` table holding one fully populated row and one row with a NULL
/// score must survive a serialize/deserialize round trip.
#[test]
fn serialize_table_with_rows_round_trips() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    let fixtures = [
        Row::new(vec![
            Value::Int(1),
            Value::Text("alice".into()),
            Value::Float(95.5),
        ]),
        Row::new(vec![
            Value::Int(2),
            Value::Text("bob".into()),
            Value::Null,
        ]),
    ];
    for row in fixtures {
        users.insert(row).unwrap();
    }
    assert_round_trip(&catalog);
}
7505
/// A catalog with two tables (`users`, empty, and `flags`, with one row)
/// must survive a serialize/deserialize round trip.
#[test]
fn serialize_multiple_tables_round_trips() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let flags_schema = TableSchema::new(
        "flags",
        vec![
            ColumnSchema::new("id", DataType::BigInt, false),
            ColumnSchema::new("active", DataType::Bool, false),
        ],
    );
    catalog.create_table(flags_schema).unwrap();
    let flags = catalog.get_mut("flags").unwrap();
    flags
        .insert(Row::new(vec![Value::BigInt(7), Value::Bool(true)]))
        .unwrap();
    assert_round_trip(&catalog);
}
7524
/// A stream whose magic prefix is wrong must be reported as `Corrupt`, even
/// if the version byte that follows is valid.
#[test]
fn deserialize_rejects_bad_magic() {
    let mut buf = Vec::new();
    buf.extend_from_slice(b"BADMAGIC");
    buf.push(FILE_VERSION);
    buf.extend(0u32.to_le_bytes());
    let err = Catalog::deserialize(&buf).unwrap_err();
    assert!(matches!(err, StorageError::Corrupt(_)));
}
7533
/// The correct magic followed by an unknown version byte must yield a
/// `Corrupt` error whose message mentions "version".
#[test]
fn deserialize_rejects_unsupported_version() {
    const UNSUPPORTED_VERSION: u8 = 99;
    let mut buf = FILE_MAGIC.to_vec();
    buf.push(UNSUPPORTED_VERSION);
    buf.extend(0u32.to_le_bytes());
    let err = Catalog::deserialize(&buf).unwrap_err();
    let names_version = matches!(&err, StorageError::Corrupt(msg) if msg.contains("version"));
    assert!(names_version);
}
7542
/// Dropping the final byte of a valid serialized catalog must make
/// deserialization fail with `Corrupt`.
#[test]
fn deserialize_rejects_truncated_file() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let full = catalog.serialize();
    let (truncated, _last_byte) = full.split_at(full.len() - 1);
    let result = Catalog::deserialize(truncated);
    assert!(matches!(result, Err(StorageError::Corrupt(_))));
}
7555
/// Appending a stray byte after a valid (empty) catalog must be rejected as
/// `Corrupt`, with a message that mentions "trailing".
#[test]
fn deserialize_rejects_trailing_garbage() {
    let mut bytes = Catalog::new().serialize();
    bytes.extend_from_slice(&[0xFF]);
    let result = Catalog::deserialize(&bytes);
    assert!(matches!(
        result,
        Err(StorageError::Corrupt(ref s)) if s.contains("trailing")
    ));
}
7566
7567 fn populated_users() -> Catalog {
7570 let mut cat = Catalog::new();
7571 cat.create_table(make_users_schema()).unwrap();
7572 let t = cat.get_mut("users").unwrap();
7573 for (id, name, score) in [
7574 (1, "alice", Some(90.0)),
7575 (2, "bob", None),
7576 (3, "alice", Some(70.0)), ] {
7578 t.insert(Row::new(vec![
7579 Value::Int(id),
7580 Value::Text(name.into()),
7581 score.map_or(Value::Null, Value::Float),
7582 ]))
7583 .unwrap();
7584 }
7585 cat
7586 }
7587
/// An index created after rows already exist must be backfilled from those
/// rows: id 2 resolves to hot slot 1, and an absent id resolves to nothing.
#[test]
fn add_index_builds_from_existing_rows() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_id".into(), "id").unwrap();
    let idx = cat
        .get("users")
        .unwrap()
        .index_on(0)
        .expect("index_on(0)");
    let nothing: &[RowLocator] = &[];
    assert_eq!(idx.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
    assert_eq!(idx.lookup_eq(&IndexKey::Int(99)), nothing);
}
7600
/// Reusing an index name must fail with `DuplicateIndex`, even when the
/// second index targets a different column.
#[test]
fn add_index_dup_name_rejected() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("ix".into(), "id").unwrap();
    let err = users.add_index("ix".into(), "name").unwrap_err();
    let is_dup = matches!(&err, StorageError::DuplicateIndex { name } if name == "ix");
    assert!(is_dup);
}
7609
/// Indexing a column the table does not have must fail with
/// `ColumnNotFound`, naming the missing column.
#[test]
fn add_index_unknown_column_rejected() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    let err = users.add_index("ix".into(), "ghost").unwrap_err();
    let names_ghost =
        matches!(&err, StorageError::ColumnNotFound { column } if column == "ghost");
    assert!(names_ghost);
}
7620
/// After `add_index`, later inserts must be reflected in the index, and the
/// backfilled entries for existing rows must stay intact.
#[test]
fn insert_after_create_index_updates_it() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();
    let dave = Row::new(vec![
        Value::Int(4),
        Value::Text("dave".into()),
        Value::Null,
    ]);
    users.insert(dave).unwrap();

    let by_name = users.index_on(1).unwrap();
    let dave_key = IndexKey::Text("dave".into());
    let alice_key = IndexKey::Text("alice".into());
    assert_eq!(by_name.lookup_eq(&dave_key), &[RowLocator::Hot(3)]);
    // Both pre-existing "alice" rows are still listed, in slot order.
    assert_eq!(
        by_name.lookup_eq(&alice_key),
        &[RowLocator::Hot(0), RowLocator::Hot(2)]
    );
}
7643
/// The `score` column holds only Float or NULL values. Probing the index
/// built over it with `IndexKey::Int(90)` must return nothing, even though
/// one row's score is 90.0.
#[test]
fn null_or_float_values_are_not_indexed() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_score".into(), "score").unwrap();
    let by_score = users.index_on(2).unwrap();
    let none: &[RowLocator] = &[];
    assert_eq!(by_score.lookup_eq(&IndexKey::Int(90)), none);
}
7656
/// A `Value::Vector` reports a `DataType::Vector` whose `dim` equals its
/// component count, with F32 encoding.
#[test]
fn vector_value_data_type_carries_dim() {
    let expected = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let three_d = Value::Vector(vec![1.0, 2.0, 3.0]);
    assert_eq!(three_d.data_type(), Some(expected));
}
7670
/// Inserting a 3-component vector into a `Vector { dim: 3 }` column succeeds.
#[test]
fn vector_column_insert_matching_dim_ok() {
    let mut cat = Catalog::new();
    let three_d = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let schema = TableSchema::new("emb", vec![ColumnSchema::new("v", three_d, false)]);
    cat.create_table(schema).unwrap();
    let emb = cat.get_mut("emb").unwrap();
    emb.insert(Row::new(vec![Value::Vector(vec![1.0, 2.0, 3.0])]))
        .unwrap();
}
7691
/// Inserting a 2-component vector into a `Vector { dim: 3 }` column must
/// fail with `TypeMismatch`.
#[test]
fn vector_column_insert_dim_mismatch_rejected() {
    let mut cat = Catalog::new();
    let three_d = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let schema = TableSchema::new("emb", vec![ColumnSchema::new("v", three_d, false)]);
    cat.create_table(schema).unwrap();
    let too_short = Row::new(vec![Value::Vector(vec![1.0, 2.0])]);
    let err = cat.get_mut("emb").unwrap().insert(too_short).unwrap_err();
    assert!(matches!(err, StorageError::TypeMismatch { .. }));
}
7714
/// A vector column's declared type and a stored vector value (including
/// negative and fractional components) must both survive a catalog round
/// trip unchanged.
#[test]
fn vector_value_survives_catalog_round_trip() {
    let vec4 = DataType::Vector {
        dim: 4,
        encoding: VecEncoding::F32,
    };
    let payload = vec![0.5, -1.25, 3.0, 7.0];

    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "emb",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vec4, false),
        ],
    ))
    .unwrap();
    cat.get_mut("emb")
        .unwrap()
        .insert(Row::new(vec![Value::Int(1), Value::Vector(payload.clone())]))
        .unwrap();

    let bytes = cat.serialize();
    let restored = Catalog::deserialize(&bytes).expect("round-trip");
    let emb = restored.get("emb").unwrap();
    assert_eq!(emb.schema().columns[1].ty, vec4);
    assert_eq!(emb.rows()[0].values[1], Value::Vector(payload));
}
7754
/// A secondary text index, its name, and its duplicate-key entries must all
/// be present after a serialize/deserialize round trip.
#[test]
fn index_survives_serialize_deserialize_round_trip() {
    let mut cat = populated_users();
    cat.get_mut("users")
        .unwrap()
        .add_index("by_name".into(), "name")
        .unwrap();
    let bytes = cat.serialize();
    let restored = Catalog::deserialize(&bytes).unwrap();
    let users = restored.get("users").unwrap();
    let idx = users.index_on(1).expect("index_on(1) after restore");
    assert_eq!(idx.name, "by_name");
    let expected = [RowLocator::Hot(0), RowLocator::Hot(2)];
    assert_eq!(idx.lookup_eq(&IndexKey::Text("alice".into())), &expected);
}
7775
7776 fn bigint_pk_users_schema() -> TableSchema {
7781 TableSchema::new(
7782 "users",
7783 vec![
7784 ColumnSchema::new("id", DataType::BigInt, false),
7785 ColumnSchema::new("name", DataType::Text, false),
7786 ],
7787 )
7788 }
7789
7790 fn make_user_row(id: i64, name: &str) -> Row {
7791 Row::new(vec![Value::BigInt(id), Value::Text(name.into())])
7792 }
7793
/// With only hot rows, `lookup_by_pk` must return the full row for a key,
/// and no cold segment should exist.
#[test]
fn lookup_by_pk_finds_row_via_hot_index() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    let seed: [(i64, &str); 3] = [(1, "alice"), (2, "bob"), (3, "carol")];
    for &(pk, name) in &seed {
        users.insert(make_user_row(pk, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let bob = cat
        .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
        .unwrap();
    assert_eq!(bob, make_user_row(2, "bob"));
    assert_eq!(cat.cold_segment_count(), 0);
}
7810
/// `lookup_by_pk` must return `None`, not panic, for a missing key, an
/// unknown table, or an unknown index.
#[test]
fn lookup_by_pk_returns_none_when_key_missing() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    let misses = [
        ("users", "by_id", 999),
        ("other_table", "by_id", 1),
        ("users", "no_such_index", 1),
    ];
    for (table, index, key) in misses {
        assert!(
            cat.lookup_by_pk(table, index, &IndexKey::Int(key))
                .is_none()
        );
    }
}
7832
/// Rows that exist only in a loaded cold segment must be reachable through
/// `lookup_by_pk` once `Cold` locators for their keys are registered on the
/// PK index. Keys that were never registered still return `None`.
#[test]
fn lookup_by_pk_resolves_cold_locator_via_loaded_segment() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_id".into(), "id").unwrap();
    // Cloned so the mutable table borrow can end before `cat` is used again.
    let schema = users.schema.clone();

    let cold_rows: Vec<(i64, &str)> =
        vec![(100, "ivy"), (200, "joe"), (300, "kim"), (400, "lin")];
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(cold_rows.len());
    for &(id, name) in &cold_rows {
        let body = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), body));
    }
    let (seg_bytes, _meta) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();
    assert_eq!(seg_id, 0);
    assert_eq!(cat.cold_segment_count(), 1);

    let mut pairs: Vec<(IndexKey, RowLocator)> = Vec::with_capacity(cold_rows.len());
    for &(id, _) in &cold_rows {
        let locator = RowLocator::Cold {
            segment_id: seg_id,
            page_offset: 0,
        };
        pairs.push((IndexKey::Int(id), locator));
    }
    let registered = cat
        .get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();
    assert_eq!(registered, 4);

    for &(id, name) in &cold_rows {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("cold key {id} not found"));
        assert_eq!(got, make_user_row(id, name));
    }
    assert!(
        cat.lookup_by_pk("users", "by_id", &IndexKey::Int(999))
            .is_none()
    );
}
7890
/// A PK index holding both `Hot` locators (from inserted rows) and `Cold`
/// locators (into a loaded segment) must resolve keys from either tier. A
/// key present in neither tier returns `None`.
#[test]
fn lookup_by_pk_mixes_hot_and_cold_tiers() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.insert(make_user_row(2, "bob")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe")];
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::new();
    for &(id, name) in &cold_rows {
        let encoded = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), encoded));
    }
    let (seg_bytes, _) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();

    let mut pairs: Vec<(IndexKey, RowLocator)> = Vec::new();
    for &(id, _) in &cold_rows {
        pairs.push((
            IndexKey::Int(id),
            RowLocator::Cold {
                segment_id: seg_id,
                page_offset: 0,
            },
        ));
    }
    cat.get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();

    // Two hot keys first, then the two cold keys.
    let expectations = [(1, "alice"), (2, "bob"), (100, "ivy"), (200, "joe")];
    for (id, name) in expectations {
        assert_eq!(
            cat.lookup_by_pk("users", "by_id", &IndexKey::Int(id))
                .unwrap(),
            make_user_row(id, name)
        );
    }
    assert!(
        cat.lookup_by_pk("users", "by_id", &IndexKey::Int(50))
            .is_none()
    );
}
7961
/// `register_cold_locators` only accepts BTree indices. Targeting an NSW
/// index must fail with a `Corrupt` error that mentions "not BTree".
#[test]
fn register_cold_locators_rejects_nsw_index() {
    let mut cat = Catalog::new();
    let vector_column = ColumnSchema::new(
        "v",
        DataType::Vector {
            dim: 4,
            encoding: VecEncoding::F32,
        },
        false,
    );
    let schema = TableSchema::new(
        "vecs",
        vec![ColumnSchema::new("id", DataType::Int, false), vector_column],
    );
    cat.create_table(schema).unwrap();

    let vecs = cat.get_mut("vecs").unwrap();
    vecs.insert(Row::new(vec![
        Value::Int(1),
        Value::Vector(vec![1.0, 0.0, 0.0, 0.0]),
    ]))
    .unwrap();
    vecs.add_nsw_index("by_v".into(), "v", NSW_DEFAULT_M).unwrap();

    let bogus = vec![(
        IndexKey::Int(1),
        RowLocator::Cold {
            segment_id: 0,
            page_offset: 0,
        },
    )];
    let err = vecs.register_cold_locators("by_v", bogus).unwrap_err();
    let refused = matches!(&err, StorageError::Corrupt(msg) if msg.contains("not BTree"));
    assert!(refused);
}
8003
/// Ten zero bytes are not a valid segment. The load must fail with a
/// `Corrupt` error mentioning "segment" and must not register a segment.
#[test]
fn load_segment_bytes_rejects_garbage() {
    let mut cat = Catalog::new();
    let garbage = vec![0u8; 10];
    let err = cat.load_segment_bytes(garbage).unwrap_err();
    let mentions_segment = matches!(&err, StorageError::Corrupt(msg) if msg.contains("segment"));
    assert!(mentions_segment);
    assert_eq!(cat.cold_segment_count(), 0);
}
8012
/// Successive segment loads are assigned ids 0, 1, 2, … in load order.
/// Each batch holds four rows with ids `batch*100 .. batch*100 + 4`.
#[test]
fn load_segment_bytes_returns_sequential_ids() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let schema = cat.get("users").unwrap().schema.clone();
    for expected_id in 0u32..3 {
        let first = u64::from(expected_id) * 100;
        let rows: Vec<(u64, Vec<u8>)> = (first..first + 4)
            .map(|id| {
                let body = encode_row_body_dense(&make_user_row(id.cast_signed(), "x"), &schema);
                (id, body)
            })
            .collect();
        let (bytes, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
        let assigned = cat.load_segment_bytes(bytes).unwrap();
        assert_eq!(assigned, expected_id);
    }
    assert_eq!(cat.cold_segment_count(), 3);
}
8031
/// A catalog written in the legacy v8 layout (via `encode_as_v8`) must be
/// accepted by the current reader, and its index entries must come back as
/// `Hot` locators only.
#[test]
fn v8_catalog_decodes_as_all_hot_under_v9_reader() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let v8_bytes = encode_as_v8(&cat);
    let version = v8_bytes[FILE_MAGIC.len()];
    assert_eq!(version, 8, "version byte must be 8");

    let restored = Catalog::deserialize(&v8_bytes).expect("v9 reader accepts v8 stream");
    let by_name = restored
        .get("users")
        .unwrap()
        .index_on(1)
        .expect("index_on(1) after restore");
    let alice = IndexKey::Text("alice".into());
    let hits = by_name.lookup_eq(&alice);
    assert_eq!(hits, &[RowLocator::Hot(0), RowLocator::Hot(2)]);
    for hit in hits {
        assert!(hit.is_hot(), "v8 → v9 read must yield Hot only");
    }
}
8076
/// Serialise `cat` in the legacy version-8 catalog layout (test-only).
///
/// Lets tests feed the current reader a v8 stream. The bytes written are:
/// `FILE_MAGIC`, a version byte of `8`, a u32 table count, and then for each
/// table:
/// - its name;
/// - a u16 column count, and per column: name, data type, a nullable byte,
///   a default-present byte (followed by the default value when present),
///   and an auto-increment byte;
/// - a u32 row count followed by each row's dense body;
/// - a u16 index count, and per index: name, u16 column position, and a
///   kind tag.
///
/// A BTree index is written as tag `0` with no key/locator entries after
/// it, so presumably the reader rebuilds it from the rows (TODO confirm
/// against the v8 decode path). An NSW index is tag `1`, followed by a u16
/// `m` and the serialised graph.
///
/// # Panics
///
/// - On a BRIN or GIN index, which this writer does not support.
/// - If any table, column, row, or index count, a column position, or an
///   NSW `m` does not fit the u32/u16 width it is written with.
fn encode_as_v8(cat: &Catalog) -> Vec<u8> {
    let mut out = Vec::with_capacity(64);
    out.extend_from_slice(FILE_MAGIC);
    // Hard-coded legacy version byte, deliberately not FILE_VERSION.
    out.push(8u8);
    write_u32(&mut out, u32::try_from(cat.tables.len()).unwrap());
    for t in &cat.tables {
        write_str(&mut out, &t.schema.name);
        write_u16(&mut out, u16::try_from(t.schema.columns.len()).unwrap());
        for c in &t.schema.columns {
            write_str(&mut out, &c.name);
            write_data_type(&mut out, c.ty);
            out.push(u8::from(c.nullable));
            // Presence flag, then the default value only when present.
            match &c.default {
                None => out.push(0),
                Some(v) => {
                    out.push(1);
                    write_value(&mut out, v);
                }
            }
            out.push(u8::from(c.auto_increment));
        }
        write_u32(&mut out, u32::try_from(t.rows.len()).unwrap());
        for row in &t.rows {
            out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
        }
        write_u16(&mut out, u16::try_from(t.indices.len()).unwrap());
        for idx in &t.indices {
            write_str(&mut out, &idx.name);
            write_u16(&mut out, u16::try_from(idx.column_position).unwrap());
            // Kind tag: 0 = BTree (no entries written), 1 = NSW (+ m + graph).
            match &idx.kind {
                IndexKind::BTree(_) => out.push(0),
                IndexKind::Nsw(g) => {
                    out.push(1);
                    write_u16(&mut out, u16::try_from(g.m).unwrap());
                    write_nsw_graph(&mut out, g);
                }
                IndexKind::Brin { .. } => panic!(
                    "v8 catalog writer cannot serialise BRIN — \
                    tests with BRIN indices must use the current writer"
                ),
                IndexKind::Gin(_) => panic!(
                    "v8 catalog writer cannot serialise GIN — \
                    tests with GIN indices must use the current writer"
                ),
            }
        }
    }
    out
}
8134
/// A catalog written with the current format must round-trip both `Hot` and
/// `Cold` PK locators. After the original segment bytes are re-loaded into
/// the restored catalog (getting the same segment id), every key, hot or
/// cold, resolves to its row again.
#[test]
fn v9_catalog_round_trip_preserves_cold_locators() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for (id, name) in [(1i64, "alice"), (2, "bob")] {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe"), (300, "kim")];
    let seg_rows: Vec<(u64, Vec<u8>)> = cold_rows
        .iter()
        .map(|&(id, name)| {
            let body = encode_row_body_dense(&make_user_row(id, name), &schema);
            (id.cast_unsigned(), body)
        })
        .collect();
    let (seg_bytes, _) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    // A copy is kept so the same segment can be re-attached to the restored catalog.
    let seg_id = cat.load_segment_bytes(seg_bytes.clone()).unwrap();
    let cold_pairs: Vec<(IndexKey, RowLocator)> = cold_rows
        .iter()
        .map(|&(id, _)| {
            let locator = RowLocator::Cold {
                segment_id: seg_id,
                page_offset: 0,
            };
            (IndexKey::Int(id), locator)
        })
        .collect();
    cat.get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", cold_pairs)
        .unwrap();

    let bytes = cat.serialize();
    assert_eq!(bytes[FILE_MAGIC.len()], FILE_VERSION);
    let mut restored = Catalog::deserialize(&bytes).expect("v9 round-trip parses");

    let restored_seg_id = restored.load_segment_bytes(seg_bytes).unwrap();
    assert_eq!(restored_seg_id, seg_id);

    let by_id = restored.get("users").unwrap().index_on(0).unwrap();
    for (key, slot) in [(1, 0), (2, 1)] {
        assert_eq!(by_id.lookup_eq(&IndexKey::Int(key)), &[RowLocator::Hot(slot)]);
    }
    for &(id, _) in &cold_rows {
        assert_eq!(
            by_id.lookup_eq(&IndexKey::Int(id)),
            &[RowLocator::Cold {
                segment_id: seg_id,
                page_offset: 0,
            }]
        );
    }

    // Hot key first, then every cold key.
    for (id, name) in core::iter::once((2_i64, "bob")).chain(cold_rows.iter().copied()) {
        assert_eq!(
            restored
                .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
                .unwrap(),
            make_user_row(id, name)
        );
    }
}
8225
/// `row_body_encoded_len` must agree byte-for-byte with the length of
/// `encode_row_body_dense` across a wide schema. The cases cover typical
/// values, zero/empty values (including a NULL in the nullable column), and
/// negative values.
#[test]
fn row_body_encoded_len_matches_actual_encode_for_all_types() {
    let columns = [
        ("a", DataType::SmallInt, true),
        ("b", DataType::Int, false),
        ("c", DataType::BigInt, false),
        ("d", DataType::Float, false),
        ("e", DataType::Bool, false),
        ("f", DataType::Text, false),
        (
            "g",
            DataType::Vector {
                dim: 3,
                encoding: VecEncoding::F32,
            },
            false,
        ),
        (
            "h",
            DataType::Numeric {
                precision: 18,
                scale: 2,
            },
            false,
        ),
        ("i", DataType::Date, false),
        ("j", DataType::Timestamp, false),
    ]
    .into_iter()
    .map(|(name, ty, nullable)| ColumnSchema::new(name, ty, nullable))
    .collect::<Vec<_>>();
    let schema = TableSchema::new("wide", columns);

    let typical = Row::new(vec![
        Value::SmallInt(7),
        Value::Int(42),
        Value::BigInt(1_000_000),
        Value::Float(1.5),
        Value::Bool(true),
        Value::Text("hello".into()),
        Value::Vector(vec![1.0, 2.0, 3.0]),
        Value::Numeric {
            scaled: 12345,
            scale: 2,
        },
        Value::Date(20_000),
        Value::Timestamp(1_700_000_000_000_000),
    ]);
    let zeroish = Row::new(vec![
        Value::Null,
        Value::Int(0),
        Value::BigInt(0),
        Value::Float(0.0),
        Value::Bool(false),
        Value::Text(String::new()),
        Value::Vector(vec![]),
        Value::Numeric {
            scaled: 0,
            scale: 2,
        },
        Value::Date(0),
        Value::Timestamp(0),
    ]);
    let negative = Row::new(vec![
        Value::SmallInt(-1),
        Value::Int(-1),
        Value::BigInt(-1),
        Value::Float(-0.5),
        Value::Bool(true),
        Value::Text("a much longer payload here".into()),
        Value::Vector(vec![0.1, 0.2, 0.3]),
        Value::Numeric {
            scaled: -999_999_999,
            scale: 2,
        },
        Value::Date(-1),
        Value::Timestamp(-1),
    ]);

    for row in &[typical, zeroish, negative] {
        let actual = encode_row_body_dense(row, &schema).len();
        let fast = row_body_encoded_len(row, &schema);
        assert_eq!(actual, fast, "row {row:?}");
    }
}
8317
/// `hot_bytes` starts at zero. After inserts it must equal the sum of the
/// rows' dense-encoded lengths, both per table and catalog-wide.
#[test]
fn hot_bytes_grows_on_insert_and_matches_encoded_sum() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    assert_eq!(users.hot_bytes(), 0);

    let rows = [(1i64, "alice"), (2, "bob"), (3, "carol")].map(|(id, name)| make_user_row(id, name));
    let mut expected: u64 = 0;
    for row in rows {
        expected += encode_row_body_dense(&row, &users.schema).len() as u64;
        users.insert(row).unwrap();
    }
    assert_eq!(users.hot_bytes(), expected);
    assert_eq!(cat.hot_tier_bytes(), expected);
}
8333
/// Deleting a row must lower `hot_bytes` by exactly that row's
/// dense-encoded length.
#[test]
fn hot_bytes_shrinks_on_delete() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for (id, name) in [(1i64, "alice"), (2, "bob"), (3, "carol")] {
        users.insert(make_user_row(id, name)).unwrap();
    }
    let before = users.hot_bytes();
    // Slot 1 holds bob's row.
    let bob_bytes = encode_row_body_dense(&make_user_row(2, "bob"), &users.schema).len() as u64;
    assert_eq!(users.delete_rows(&[1]), 1);
    assert_eq!(users.hot_bytes(), before - bob_bytes);
}
8350
/// Updating a row to a longer text value must adjust `hot_bytes` by the
/// difference between the new and old encoded lengths.
#[test]
fn hot_bytes_diffs_on_update_for_variable_width_columns() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    let after_insert = users.hot_bytes();

    let longer = make_user_row(1, "alice-the-longer-name");
    let old_len = encode_row_body_dense(&make_user_row(1, "alice"), &users.schema).len() as u64;
    let new_len = encode_row_body_dense(&longer, &users.schema).len() as u64;
    users.update_row(0, longer.values).unwrap();

    let expected = after_insert - old_len + new_len;
    assert_eq!(users.hot_bytes(), expected);
    assert!(users.hot_bytes() > after_insert, "longer text grew the counter");
}
8367
/// `hot_bytes` accounting must be identical after a serialize/deserialize
/// round trip, both catalog-wide and for the table itself.
#[test]
fn hot_bytes_round_trips_through_serialize_deserialize() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    let rows = (0..10).map(|i| make_user_row(i, &alloc::format!("name-{i}")));
    for row in rows {
        users.insert(row).unwrap();
    }
    let pre = cat.hot_tier_bytes();
    let serialized = cat.serialize();
    let restored = Catalog::deserialize(&serialized).unwrap();
    assert_eq!(restored.hot_tier_bytes(), pre);
    assert_eq!(restored.get("users").unwrap().hot_bytes(), pre);
}
8382
/// Freezing 6 of 10 rows must move them into a new cold segment (id 0),
/// leave 4 hot rows, reduce `hot_bytes` by the reported `bytes_freed`, and
/// keep every PK, frozen or not, resolvable via `lookup_by_pk`.
#[test]
fn freeze_oldest_to_cold_moves_rows_and_keeps_lookups_working() {
    let label = |id: i64| alloc::format!("u-{id}");
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    (0..10i64).for_each(|id| {
        users.insert(make_user_row(id, &label(id))).unwrap();
    });
    users.add_index("by_id".into(), "id").unwrap();
    let total_bytes_before = users.hot_bytes();

    let report = cat
        .freeze_oldest_to_cold("users", "by_id", 6)
        .expect("freeze succeeds");
    assert_eq!(report.frozen_rows, 6);
    assert_eq!(report.segment_id, 0);
    assert!(report.bytes_freed > 0);
    assert!(!report.segment_bytes.is_empty());

    let users = cat.get("users").unwrap();
    assert_eq!(users.row_count(), 4, "4 hot rows remain (10 - 6 frozen)");
    assert_eq!(cat.cold_segment_count(), 1);
    assert_eq!(
        users.hot_bytes(),
        total_bytes_before - report.bytes_freed,
        "hot_bytes accounting matches FreezeReport"
    );

    for id in 0..10i64 {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("PK {id} disappeared after freeze"));
        assert_eq!(got, make_user_row(id, &label(id)));
    }
}
8428
/// Two consecutive freezes of 4 rows each must create two segments, leave
/// 4 hot rows, and keep every PK resolvable. In particular, the second
/// freeze must not drop the cold locators written by the first.
#[test]
fn freeze_twice_preserves_prior_cold_locators() {
    let label = |id: i64| alloc::format!("u-{id}");
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..12i64 {
        users.insert(make_user_row(id, &label(id))).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    for round in ["first freeze ok", "second freeze ok"] {
        cat.freeze_oldest_to_cold("users", "by_id", 4).expect(round);
    }

    assert_eq!(cat.get("users").unwrap().row_count(), 4);
    assert_eq!(cat.cold_segment_count(), 2);
    for id in 0..12i64 {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("PK {id} not resolvable after two freezes"));
        assert_eq!(got, make_user_row(id, &label(id)));
    }
}
8460
/// `freeze_oldest_to_cold` must reject a zero count, an unknown table, an
/// unknown index, and a count larger than the hot tier, each with
/// `Corrupt`. None of these requests may move rows or create a segment.
#[test]
fn freeze_oldest_to_cold_rejects_invalid_input() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..3i64 {
        users.insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let bad_requests = [
        ("users", "by_id", 0),
        ("missing", "by_id", 1),
        ("users", "no_such_index", 1),
        ("users", "by_id", 999),
    ];
    for (table, index, count) in bad_requests {
        let outcome = cat.freeze_oldest_to_cold(table, index, count);
        assert!(matches!(outcome, Err(StorageError::Corrupt(_))));
    }
    assert_eq!(cat.get("users").unwrap().row_count(), 3);
    assert_eq!(cat.cold_segment_count(), 0);
}
8498
/// Freezing through an index on a Text column must fail with a `Corrupt`
/// error that mentions "non-integer". The table must keep its row and no
/// segment may be created.
#[test]
fn freeze_oldest_to_cold_rejects_non_integer_pk() {
    let mut cat = Catalog::new();
    let schema = TableSchema::new(
        "by_name",
        vec![
            ColumnSchema::new("name", DataType::Text, false),
            ColumnSchema::new("payload", DataType::BigInt, false),
        ],
    );
    cat.create_table(schema).unwrap();
    let table = cat.get_mut("by_name").unwrap();
    table
        .insert(Row::new(vec![Value::Text("a".into()), Value::BigInt(1)]))
        .unwrap();
    table.add_index("by_n".into(), "name").unwrap();

    let err = cat
        .freeze_oldest_to_cold("by_name", "by_n", 1)
        .expect_err("non-integer PK rejected");
    match err {
        StorageError::Corrupt(message) => {
            let names_constraint = message.contains("non-integer");
            assert!(
                names_constraint,
                "error message names the constraint: {message}"
            );
        }
        other => panic!("expected Corrupt, got {other:?}"),
    }
    assert_eq!(cat.get("by_name").unwrap().row_count(), 1);
    assert_eq!(cat.cold_segment_count(), 0);
}
8530
/// After freezing 3 of 6 rows through the PK index, a secondary index on
/// `name` must still locate a surviving row as `Hot`. The test expects
/// "u-4" at hot slot 1.
#[test]
fn freeze_keeps_remaining_hot_rows_addressable_via_secondary_index() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..6i64 {
        users.insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    let by_name = cat.get("users").unwrap().index_on(1).unwrap();
    let hits = by_name.lookup_eq(&IndexKey::Text("u-4".into()));
    assert_eq!(hits.len(), 1);
    assert!(hits[0].is_hot(), "kept-hot rows still surface as Hot");
    let RowLocator::Hot(slot) = hits[0] else {
        unreachable!()
    };
    assert_eq!(slot, 1);
}
8565
/// Promoting a frozen PK must append the row to the hot tier (after the 2
/// surviving hot rows) and grow `hot_bytes` by its encoded length. The PK
/// index must then hold exactly one `Hot` locator for that key, and lookups
/// of both the promoted key and a still-frozen key must keep working.
#[test]
fn promote_cold_row_pulls_frozen_row_back_to_hot_tier() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..6i64 {
        users.insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    cat.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
    let hot_bytes_before = cat.get("users").unwrap().hot_bytes();

    let promoted_slot = cat
        .promote_cold_row("users", "by_id", &IndexKey::Int(2))
        .expect("promote ok")
        .expect("PK 2 was cold");
    assert_eq!(
        promoted_slot, 2,
        "promoted row appended after the 2 surviving hot rows"
    );

    let users = cat.get("users").unwrap();
    assert_eq!(users.row_count(), 3, "hot tier grew from 2 to 3");
    let promoted = make_user_row(2, "u-2");
    let promoted_len = encode_row_body_dense(&promoted, &users.schema).len() as u64;
    assert_eq!(users.hot_bytes(), hot_bytes_before + promoted_len);

    let locators = users.index_on(0).unwrap().lookup_eq(&IndexKey::Int(2));
    assert_eq!(locators.len(), 1, "exactly one locator per key");
    assert!(locators[0].is_hot(), "promote retired the Cold locator");

    assert_eq!(
        cat.lookup_by_pk("users", "by_id", &IndexKey::Int(2))
            .unwrap(),
        promoted
    );
    assert_eq!(
        cat.lookup_by_pk("users", "by_id", &IndexKey::Int(0))
            .unwrap(),
        make_user_row(0, "u-0")
    );
}
8624
8625 #[test]
8629 fn promote_cold_row_returns_none_when_key_is_not_cold() {
8630 let mut cat = Catalog::new();
8631 cat.create_table(bigint_pk_users_schema()).unwrap();
8632 let t = cat.get_mut("users").unwrap();
8633 t.insert(make_user_row(7, "alice")).unwrap();
8634 t.add_index("by_id".into(), "id").unwrap();
8635
8636 assert!(
8638 cat.promote_cold_row("users", "by_id", &IndexKey::Int(7))
8639 .unwrap()
8640 .is_none()
8641 );
8642 assert!(
8644 cat.promote_cold_row("users", "by_id", &IndexKey::Int(99))
8645 .unwrap()
8646 .is_none()
8647 );
8648 assert_eq!(cat.get("users").unwrap().row_count(), 1);
8650 assert_eq!(cat.cold_segment_count(), 0);
8651 }
8652
8653 #[test]
8658 fn shadow_cold_row_removes_cold_locators_and_drops_lookup() {
8659 let mut cat = Catalog::new();
8660 cat.create_table(bigint_pk_users_schema()).unwrap();
8661 let t = cat.get_mut("users").unwrap();
8662 for id in 0..5i64 {
8663 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8664 .unwrap();
8665 }
8666 t.add_index("by_id".into(), "id").unwrap();
8667 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
8668
8669 assert!(
8671 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(1))
8672 .is_some(),
8673 "frozen PK resolves before shadow"
8674 );
8675 let removed = cat
8676 .shadow_cold_row("users", "by_id", &IndexKey::Int(1))
8677 .unwrap();
8678 assert_eq!(removed, 1, "exactly one cold locator retired");
8679
8680 assert!(
8683 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(1))
8684 .is_none(),
8685 "shadowed key no longer resolves"
8686 );
8687 assert_eq!(
8689 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(0))
8690 .unwrap(),
8691 make_user_row(0, "u-0")
8692 );
8693 assert_eq!(
8694 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(2))
8695 .unwrap(),
8696 make_user_row(2, "u-2")
8697 );
8698 }
8699
8700 #[test]
8705 fn shadow_cold_row_returns_zero_when_key_is_not_cold() {
8706 let mut cat = Catalog::new();
8707 cat.create_table(bigint_pk_users_schema()).unwrap();
8708 let t = cat.get_mut("users").unwrap();
8709 t.insert(make_user_row(1, "alice")).unwrap();
8710 t.add_index("by_id".into(), "id").unwrap();
8711 assert_eq!(
8712 cat.shadow_cold_row("users", "by_id", &IndexKey::Int(1))
8713 .unwrap(),
8714 0,
8715 "hot-only key drops no cold locators"
8716 );
8717 assert_eq!(
8718 cat.shadow_cold_row("users", "by_id", &IndexKey::Int(999))
8719 .unwrap(),
8720 0,
8721 "absent key drops no cold locators"
8722 );
8723 assert_eq!(cat.get("users").unwrap().row_count(), 1);
8724 }
8725
8726 #[test]
8728 fn promote_and_shadow_reject_invalid_inputs() {
8729 let mut cat = Catalog::new();
8730 cat.create_table(bigint_pk_users_schema()).unwrap();
8731 let t = cat.get_mut("users").unwrap();
8732 t.insert(make_user_row(1, "alice")).unwrap();
8733 t.add_index("by_id".into(), "id").unwrap();
8734
8735 assert!(matches!(
8737 cat.promote_cold_row("missing", "by_id", &IndexKey::Int(1)),
8738 Err(StorageError::Corrupt(_))
8739 ));
8740 assert!(matches!(
8741 cat.shadow_cold_row("missing", "by_id", &IndexKey::Int(1)),
8742 Err(StorageError::Corrupt(_))
8743 ));
8744 assert!(matches!(
8746 cat.promote_cold_row("users", "no_such_index", &IndexKey::Int(1)),
8747 Err(StorageError::Corrupt(_))
8748 ));
8749 assert!(matches!(
8750 cat.shadow_cold_row("users", "no_such_index", &IndexKey::Int(1)),
8751 Err(StorageError::Corrupt(_))
8752 ));
8753 }
8754
8755 #[test]
8762 fn commit_freeze_slices_single_slice_matches_freeze_oldest() {
8763 let mut a = Catalog::new();
8764 let mut b = Catalog::new();
8765 for cat in [&mut a, &mut b] {
8766 cat.create_table(bigint_pk_users_schema()).unwrap();
8767 let t = cat.get_mut("users").unwrap();
8768 for id in 0..10i64 {
8769 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8770 .unwrap();
8771 }
8772 t.add_index("by_id".into(), "id").unwrap();
8773 }
8774 let single = a.freeze_oldest_to_cold("users", "by_id", 6).unwrap();
8775 let slice = b
8776 .prepare_freeze_slice("users", "by_id", 0..6)
8777 .expect("prepare");
8778 let parallel = b
8779 .commit_freeze_slices("users", "by_id", alloc::vec![slice])
8780 .expect("commit");
8781 assert_eq!(single.segment_id, parallel.segment_id);
8782 assert_eq!(single.frozen_rows, parallel.frozen_rows);
8783 assert_eq!(single.bytes_freed, parallel.bytes_freed);
8784 assert_eq!(single.segment_bytes, parallel.segment_bytes);
8785 for id in 0..10i64 {
8787 assert_eq!(
8788 a.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
8789 b.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
8790 "PK {id} differs after single vs slice freeze"
8791 );
8792 }
8793 }
8794
8795 #[test]
8800 fn commit_freeze_slices_two_slices_match_single_slice() {
8801 let mut a = Catalog::new();
8802 let mut b = Catalog::new();
8803 for cat in [&mut a, &mut b] {
8804 cat.create_table(bigint_pk_users_schema()).unwrap();
8805 let t = cat.get_mut("users").unwrap();
8806 for id in [3, 7, 1, 9, 5, 0, 8, 4, 2, 6].iter().copied() {
8809 t.insert(make_user_row(id as i64, &alloc::format!("u-{id}")))
8810 .unwrap();
8811 }
8812 t.add_index("by_id".into(), "id").unwrap();
8813 }
8814 let single = a
8815 .prepare_freeze_slice("users", "by_id", 0..8)
8816 .expect("prepare");
8817 let one = a
8818 .commit_freeze_slices("users", "by_id", alloc::vec![single])
8819 .expect("commit one");
8820 let s1 = b
8821 .prepare_freeze_slice("users", "by_id", 0..4)
8822 .expect("prepare s1");
8823 let s2 = b
8824 .prepare_freeze_slice("users", "by_id", 4..8)
8825 .expect("prepare s2");
8826 let two = b
8827 .commit_freeze_slices("users", "by_id", alloc::vec![s1, s2])
8828 .expect("commit two");
8829 assert_eq!(one.segment_bytes, two.segment_bytes);
8830 assert_eq!(one.frozen_rows, two.frozen_rows);
8831 for id in 0..10i64 {
8834 assert_eq!(
8835 a.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
8836 b.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
8837 "PK {id} differs after one-slice vs two-slice freeze"
8838 );
8839 }
8840 }
8841
8842 #[test]
8844 fn commit_freeze_slices_rejects_gap() {
8845 let mut cat = Catalog::new();
8846 cat.create_table(bigint_pk_users_schema()).unwrap();
8847 let t = cat.get_mut("users").unwrap();
8848 for id in 0..6i64 {
8849 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8850 .unwrap();
8851 }
8852 t.add_index("by_id".into(), "id").unwrap();
8853 let s1 = cat.prepare_freeze_slice("users", "by_id", 0..2).unwrap();
8854 let s2 = cat.prepare_freeze_slice("users", "by_id", 3..5).unwrap();
8855 assert!(matches!(
8856 cat.commit_freeze_slices("users", "by_id", alloc::vec![s1, s2]),
8857 Err(StorageError::Corrupt(_))
8858 ));
8859 assert_eq!(cat.cold_segment_count(), 0);
8861 assert_eq!(cat.get("users").unwrap().row_count(), 6);
8862 }
8863
8864 #[test]
8866 fn commit_freeze_slices_empty_is_noop() {
8867 let mut cat = Catalog::new();
8868 cat.create_table(bigint_pk_users_schema()).unwrap();
8869 let t = cat.get_mut("users").unwrap();
8870 for id in 0..3i64 {
8871 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8872 .unwrap();
8873 }
8874 t.add_index("by_id".into(), "id").unwrap();
8875 let report = cat
8876 .commit_freeze_slices("users", "by_id", Vec::new())
8877 .unwrap();
8878 assert_eq!(report.frozen_rows, 0);
8879 assert_eq!(cat.cold_segment_count(), 0);
8880 assert_eq!(cat.get("users").unwrap().row_count(), 3);
8881 }
8882
8883 #[test]
8890 fn compact_merges_small_segments_storage_unit() {
8891 let mut cat = Catalog::new();
8892 cat.create_table(bigint_pk_users_schema()).unwrap();
8893 let t = cat.get_mut("users").unwrap();
8894 for id in 0..8i64 {
8895 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8896 .unwrap();
8897 }
8898 t.add_index("by_id".into(), "id").unwrap();
8899 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
8901 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
8902 assert_eq!(cat.cold_segment_count(), 2);
8903 assert_eq!(cat.cold_segment_slot_count(), 2);
8904
8905 let max_seg_bytes = cat
8908 .cold_segment_ids_global()
8909 .iter()
8910 .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
8911 .max()
8912 .unwrap();
8913 let target = max_seg_bytes + 1;
8914
8915 let report = cat
8916 .compact_cold_segments("users", "by_id", target)
8917 .expect("compact succeeds");
8918 assert_eq!(report.sources.len(), 2);
8919 let merged_id = report.merged_segment_id.expect("merge happened");
8920 assert_eq!(report.merged_rows, 6);
8921 assert_eq!(report.deleted_rows_pruned, 0);
8922 assert!(!report.merged_segment_bytes.is_empty());
8923
8924 assert_eq!(cat.cold_segment_count(), 1);
8927 assert_eq!(cat.cold_segment_slot_count(), 3);
8928 assert_eq!(cat.cold_segment_ids_global(), alloc::vec![merged_id]);
8929
8930 for id in 0..8i64 {
8933 let got = cat
8934 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
8935 .unwrap_or_else(|| panic!("PK {id} lost after compaction"));
8936 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
8937 }
8938 }
8939
8940 #[test]
8944 fn compact_drops_shadowed_cold_rows() {
8945 let mut cat = Catalog::new();
8946 cat.create_table(bigint_pk_users_schema()).unwrap();
8947 let t = cat.get_mut("users").unwrap();
8948 for id in 0..6i64 {
8949 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8950 .unwrap();
8951 }
8952 t.add_index("by_id".into(), "id").unwrap();
8953 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
8954 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
8955 assert_eq!(
8957 cat.shadow_cold_row("users", "by_id", &IndexKey::Int(1))
8958 .unwrap(),
8959 1
8960 );
8961 assert_eq!(
8962 cat.shadow_cold_row("users", "by_id", &IndexKey::Int(4))
8963 .unwrap(),
8964 1
8965 );
8966
8967 let max_seg_bytes = cat
8968 .cold_segment_ids_global()
8969 .iter()
8970 .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
8971 .max()
8972 .unwrap();
8973 let report = cat
8974 .compact_cold_segments("users", "by_id", max_seg_bytes + 1)
8975 .expect("compact succeeds");
8976 assert_eq!(report.sources.len(), 2);
8977 assert_eq!(report.merged_rows, 4, "6 frozen − 2 shadowed = 4 live");
8978 assert_eq!(report.deleted_rows_pruned, 2);
8979
8980 for shadowed in [1i64, 4i64] {
8982 assert!(
8983 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(shadowed))
8984 .is_none(),
8985 "shadowed PK {shadowed} must remain invisible after compact"
8986 );
8987 }
8988 for live in [0i64, 2, 3, 5] {
8990 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(live))
8991 .unwrap_or_else(|| panic!("live PK {live} lost after compact"));
8992 }
8993 }
8994
8995 #[test]
8998 fn compact_is_noop_below_two_candidates() {
8999 let mut cat = Catalog::new();
9000 cat.create_table(bigint_pk_users_schema()).unwrap();
9001 let t = cat.get_mut("users").unwrap();
9002 for id in 0..6i64 {
9003 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
9004 .unwrap();
9005 }
9006 t.add_index("by_id".into(), "id").unwrap();
9007 let report = cat
9009 .compact_cold_segments("users", "by_id", 1 << 30)
9010 .expect("noop ok");
9011 assert!(report.merged_segment_id.is_none());
9012 assert!(report.sources.is_empty());
9013
9014 cat.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
9016 let report = cat
9017 .compact_cold_segments("users", "by_id", 1 << 30)
9018 .expect("noop ok");
9019 assert!(report.merged_segment_id.is_none());
9020 assert_eq!(cat.cold_segment_count(), 1);
9021
9022 let report = cat
9025 .compact_cold_segments("users", "by_id", 1)
9026 .expect("noop ok");
9027 assert!(report.merged_segment_id.is_none());
9028 assert_eq!(cat.cold_segment_count(), 1);
9029 }
9030
9031 #[test]
9039 fn compact_swap_survives_catalog_roundtrip_via_load_at() {
9040 let mut cat = Catalog::new();
9041 cat.create_table(bigint_pk_users_schema()).unwrap();
9042 let t = cat.get_mut("users").unwrap();
9043 for id in 0..6i64 {
9044 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
9045 .unwrap();
9046 }
9047 t.add_index("by_id".into(), "id").unwrap();
9048 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
9049 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
9050 let max_seg_bytes = cat
9051 .cold_segment_ids_global()
9052 .iter()
9053 .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
9054 .max()
9055 .unwrap();
9056 let report = cat
9057 .compact_cold_segments("users", "by_id", max_seg_bytes + 1)
9058 .expect("compact ok");
9059 let merged_id = report.merged_segment_id.unwrap();
9060
9061 let cat_bytes = cat.serialize();
9066 let merged_bytes = report.merged_segment_bytes.clone();
9067
9068 let mut restored = Catalog::deserialize(&cat_bytes).expect("deserialize ok");
9069 restored
9070 .load_segment_bytes_at(merged_id, merged_bytes)
9071 .expect("reload merged ok");
9072
9073 for id in 0..6i64 {
9075 let got = restored
9076 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
9077 .unwrap_or_else(|| panic!("PK {id} lost across roundtrip"));
9078 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
9079 }
9080 assert_eq!(restored.cold_segment_count(), 1);
9083 }
9084
9085 #[test]
9088 fn load_segment_bytes_at_pads_and_rejects_collision() {
9089 let mut cat = Catalog::new();
9090 cat.create_table(bigint_pk_users_schema()).unwrap();
9091 let t = cat.get_mut("users").unwrap();
9092 for id in 0..4i64 {
9093 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
9094 .unwrap();
9095 }
9096 t.add_index("by_id".into(), "id").unwrap();
9097 let report = cat.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
9098 let bytes_seg0 = report.segment_bytes.clone();
9099
9100 cat.load_segment_bytes_at(5, bytes_seg0.clone())
9104 .expect("pad + load ok");
9105 assert_eq!(cat.cold_segment_slot_count(), 6);
9106 assert_eq!(cat.cold_segment_count(), 2);
9107
9108 assert!(matches!(
9110 cat.load_segment_bytes_at(5, bytes_seg0.clone()),
9111 Err(StorageError::Corrupt(_))
9112 ));
9113 assert!(matches!(
9115 cat.load_segment_bytes_at(0, bytes_seg0),
9116 Err(StorageError::Corrupt(_))
9117 ));
9118 }
9119
9120 #[test]
9124 fn promote_then_refreeze_does_not_leave_orphan_locators() {
9125 let mut cat = Catalog::new();
9126 cat.create_table(bigint_pk_users_schema()).unwrap();
9127 let t = cat.get_mut("users").unwrap();
9128 for id in 0..4i64 {
9129 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
9130 .unwrap();
9131 }
9132 t.add_index("by_id".into(), "id").unwrap();
9133
9134 cat.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
9136 let promoted = cat
9137 .promote_cold_row("users", "by_id", &IndexKey::Int(0))
9138 .unwrap();
9139 assert!(promoted.is_some());
9140 let entries_after_promote = cat
9141 .get("users")
9142 .unwrap()
9143 .index_on(0)
9144 .unwrap()
9145 .lookup_eq(&IndexKey::Int(0))
9146 .to_vec();
9147 assert_eq!(entries_after_promote.len(), 1);
9148 assert!(entries_after_promote[0].is_hot());
9149
9150 for id in [2i64, 3] {
9157 assert_eq!(
9158 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(id))
9159 .unwrap(),
9160 make_user_row(id, &alloc::format!("u-{id}"))
9161 );
9162 }
9163 }
9164}