1#![no_std]
7#![cfg_attr(target_arch = "aarch64", allow(unsafe_code))]
11
12extern crate alloc;
13
14pub mod bloom;
15pub mod halfvec;
16pub mod persistent;
17pub mod persistent_btree;
18pub mod quantize;
19pub mod row_locator;
20pub mod segment;
21
22pub use self::bloom::{BloomError, BloomFilter};
23pub use self::row_locator::{RowLocator, RowLocatorError};
24pub use self::segment::{
25 BRIN_SIDECAR_MAGIC, BrinSummary, OwnedSegment, SEGMENT_COMPRESS_ALGO_LZSS,
26 SEGMENT_COMPRESS_ALGO_NONE, SEGMENT_MAGIC, SEGMENT_MAGIC_V2, SEGMENT_PAGE_BYTES, SegmentError,
27 SegmentMeta, SegmentReader, derive_brin_summaries, encode_segment, wrap_v2_envelope,
28 wrap_v2_envelope_with_brin,
29};
30
31use alloc::collections::{BTreeMap, BTreeSet};
32use alloc::format;
33use alloc::string::String;
34use alloc::sync::Arc;
35use alloc::vec::Vec;
36use core::fmt;
37
38use self::persistent::PersistentVec;
39use self::persistent_btree::PersistentBTreeMap;
40
/// Physical storage encoding of the elements of a `VECTOR` column.
///
/// `F32` is the default. `Display` renders `F32`, `SQ8` or `HALF`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum VecEncoding {
    /// Full-precision `f32` elements (stored as `Value::Vector`).
    #[default]
    F32,
    /// 8-bit scalar-quantised elements (stored as `Value::Sq8Vector`).
    Sq8,
    /// Half-precision elements (stored as `Value::HalfVector`).
    F16,
}

impl fmt::Display for VecEncoding {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // F16 is spelled `HALF`, matching the `VECTOR(n) USING HALF` syntax.
        let label = match *self {
            Self::F32 => "F32",
            Self::Sq8 => "SQ8",
            Self::F16 => "HALF",
        };
        f.write_str(label)
    }
}
69
70#[derive(Debug, Clone, Copy, PartialEq, Eq)]
74pub enum DataType {
75 SmallInt,
78 Int, BigInt, Float, Text,
82 Varchar(u32),
85 Char(u32),
89 Bool,
90 Vector {
96 dim: u32,
97 encoding: VecEncoding,
98 },
99 Numeric {
105 precision: u8,
106 scale: u8,
107 },
108 Date,
111 Timestamp,
114 Timestamptz,
122 Interval,
127 Json,
132 Jsonb,
138 Bytes,
146 TextArray,
155 IntArray,
159 BigIntArray,
162}
163
164impl fmt::Display for DataType {
165 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
166 match self {
167 Self::SmallInt => f.write_str("SMALLINT"),
168 Self::Int => f.write_str("INT"),
169 Self::BigInt => f.write_str("BIGINT"),
170 Self::Float => f.write_str("FLOAT"),
171 Self::Text => f.write_str("TEXT"),
172 Self::Varchar(n) => write!(f, "VARCHAR({n})"),
173 Self::Char(n) => write!(f, "CHAR({n})"),
174 Self::Bool => f.write_str("BOOL"),
175 Self::Vector { dim, encoding } => match encoding {
176 VecEncoding::F32 => write!(f, "VECTOR({dim})"),
177 VecEncoding::Sq8 => write!(f, "VECTOR({dim}) USING SQ8"),
178 VecEncoding::F16 => write!(f, "VECTOR({dim}) USING HALF"),
179 },
180 Self::Numeric { precision, scale } => {
181 if *scale == 0 {
182 write!(f, "NUMERIC({precision})")
183 } else {
184 write!(f, "NUMERIC({precision}, {scale})")
185 }
186 }
187 Self::Date => f.write_str("DATE"),
188 Self::Timestamp => f.write_str("TIMESTAMP"),
189 Self::Timestamptz => f.write_str("TIMESTAMPTZ"),
190 Self::Interval => f.write_str("INTERVAL"),
191 Self::Json => f.write_str("JSON"),
192 Self::Jsonb => f.write_str("JSONB"),
193 Self::Bytes => f.write_str("BYTEA"),
194 Self::TextArray => f.write_str("TEXT[]"),
195 Self::IntArray => f.write_str("INT[]"),
196 Self::BigIntArray => f.write_str("BIGINT[]"),
197 }
198 }
199}
200
201#[derive(Debug, Clone, PartialEq)]
205#[non_exhaustive]
206pub enum Value {
207 SmallInt(i16),
208 Int(i32),
209 BigInt(i64),
210 Float(f64),
211 Text(String),
212 Bool(bool),
213 Vector(Vec<f32>),
214 Sq8Vector(crate::quantize::Sq8Vector),
221 HalfVector(crate::halfvec::HalfVector),
227 Numeric {
231 scaled: i128,
232 scale: u8,
233 },
234 Date(i32),
236 Timestamp(i64),
238 Interval {
241 months: i32,
242 micros: i64,
243 },
244 Json(String),
248 Bytes(Vec<u8>),
254 TextArray(Vec<Option<String>>),
260 IntArray(Vec<Option<i32>>),
264 BigIntArray(Vec<Option<i64>>),
267 Null,
268}
269
270impl Value {
271 pub fn data_type(&self) -> Option<DataType> {
273 match self {
274 Self::SmallInt(_) => Some(DataType::SmallInt),
275 Self::Int(_) => Some(DataType::Int),
276 Self::BigInt(_) => Some(DataType::BigInt),
277 Self::Float(_) => Some(DataType::Float),
278 Self::Text(_) => Some(DataType::Text),
281 Self::Bool(_) => Some(DataType::Bool),
282 Self::Vector(v) => Some(DataType::Vector {
283 dim: u32::try_from(v.len()).expect("vector dim ≤ u32"),
284 encoding: VecEncoding::F32,
285 }),
286 Self::Sq8Vector(q) => Some(DataType::Vector {
287 dim: u32::try_from(q.bytes.len()).expect("vector dim ≤ u32"),
288 encoding: VecEncoding::Sq8,
289 }),
290 Self::HalfVector(h) => Some(DataType::Vector {
291 dim: u32::try_from(h.dim()).expect("vector dim ≤ u32"),
292 encoding: VecEncoding::F16,
293 }),
294 Self::Numeric { scale, .. } => Some(DataType::Numeric {
299 precision: 0,
300 scale: *scale,
301 }),
302 Self::Date(_) => Some(DataType::Date),
303 Self::Timestamp(_) => Some(DataType::Timestamp),
304 Self::Interval { .. } => Some(DataType::Interval),
305 Self::Json(_) => Some(DataType::Json),
306 Self::Bytes(_) => Some(DataType::Bytes),
307 Self::TextArray(_) => Some(DataType::TextArray),
308 Self::IntArray(_) => Some(DataType::IntArray),
309 Self::BigIntArray(_) => Some(DataType::BigIntArray),
310 Self::Null => None,
311 }
312 }
313
314 pub const fn is_null(&self) -> bool {
315 matches!(self, Self::Null)
316 }
317}
318
319#[derive(Debug, Clone, PartialEq)]
322pub struct Row {
323 pub values: Vec<Value>,
324}
325
326impl Row {
327 pub const fn new(values: Vec<Value>) -> Self {
328 Self { values }
329 }
330
331 pub fn len(&self) -> usize {
332 self.values.len()
333 }
334
335 pub fn is_empty(&self) -> bool {
336 self.values.is_empty()
337 }
338}
339
340#[derive(Debug, Clone, PartialEq)]
341pub struct ColumnSchema {
342 pub name: String,
343 pub ty: DataType,
344 pub nullable: bool,
345 pub default: Option<Value>,
350 pub runtime_default: Option<String>,
358 pub auto_increment: bool,
362}
363
364#[derive(Debug, Clone, PartialEq)]
365pub struct TableSchema {
366 pub name: String,
367 pub columns: Vec<ColumnSchema>,
368 pub hot_tier_bytes: Option<u64>,
374 pub foreign_keys: Vec<ForeignKeyConstraint>,
381 pub uniqueness_constraints: Vec<UniquenessConstraint>,
388}
389
390#[derive(Debug, Clone, PartialEq, Eq)]
395pub struct UniquenessConstraint {
396 pub is_primary_key: bool,
401 pub columns: Vec<usize>,
405}
406
407#[derive(Debug, Clone, PartialEq, Eq)]
412pub struct ForeignKeyConstraint {
413 pub name: Option<String>,
417 pub local_columns: Vec<usize>,
420 pub parent_table: String,
422 pub parent_columns: Vec<usize>,
427 pub on_delete: FkAction,
429 pub on_update: FkAction,
432}
433
/// Referential action of a foreign key (`ON DELETE` / `ON UPDATE`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FkAction {
    /// `RESTRICT`.
    Restrict,
    /// `CASCADE`.
    Cascade,
    /// `SET NULL`.
    SetNull,
    /// `SET DEFAULT`.
    SetDefault,
    /// `NO ACTION`.
    NoAction,
}

impl FkAction {
    /// Stable one-byte tag (0..=4) for serialisation; inverse of [`Self::from_tag`].
    pub const fn tag(self) -> u8 {
        match self {
            Self::Restrict => 0,
            Self::Cascade => 1,
            Self::SetNull => 2,
            Self::SetDefault => 3,
            Self::NoAction => 4,
        }
    }

    /// Decodes a tag written by [`Self::tag`]; `None` for any other byte.
    pub const fn from_tag(b: u8) -> Option<Self> {
        match b {
            0 => Some(Self::Restrict),
            1 => Some(Self::Cascade),
            2 => Some(Self::SetNull),
            3 => Some(Self::SetDefault),
            4 => Some(Self::NoAction),
            _ => None,
        }
    }
}
466
467impl TableSchema {
468 pub fn column_position(&self, name: &str) -> Option<usize> {
469 self.columns.iter().position(|c| c.name == name)
470 }
471}
472
473#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
478pub enum IndexKey {
479 Int(i64),
480 Text(String),
481 Bool(bool),
482}
483
484impl IndexKey {
485 pub fn from_value(v: &Value) -> Option<Self> {
486 match v {
487 Value::SmallInt(n) => Some(Self::Int(i64::from(*n))),
488 Value::Int(n) => Some(Self::Int(i64::from(*n))),
489 Value::BigInt(n) => Some(Self::Int(*n)),
490 Value::Text(s) => Some(Self::Text(s.clone())),
491 Value::Bool(b) => Some(Self::Bool(*b)),
492 Value::Date(d) => Some(Self::Int(i64::from(*d))),
495 Value::Timestamp(t) => Some(Self::Int(*t)),
496 Value::Null
501 | Value::Float(_)
502 | Value::Vector(_)
503 | Value::Sq8Vector(_)
504 | Value::HalfVector(_)
505 | Value::Numeric { .. }
506 | Value::Interval { .. }
507 | Value::Json(_)
508 | Value::Bytes(_)
509 | Value::TextArray(_)
510 | Value::IntArray(_)
511 | Value::BigIntArray(_) => None,
512 }
513 }
514}
515
516#[derive(Debug, Clone)]
521pub struct Index {
522 pub name: String,
523 pub column_position: usize,
524 pub kind: IndexKind,
525 pub included_columns: Vec<usize>,
535 pub partial_predicate: Option<String>,
542 pub expression: Option<String>,
547 pub is_unique: bool,
554 pub extra_column_positions: Vec<usize>,
563}
564
565pub const NSW_DEFAULT_M: usize = 16;
568
569#[derive(Debug, Clone)]
577pub struct FreezeReport {
578 pub segment_id: u32,
581 pub frozen_rows: usize,
584 pub bytes_freed: u64,
588 pub segment_bytes: Vec<u8>,
593}
594
595#[derive(Debug, Clone)]
604pub struct FreezeSlice {
605 pub row_range: core::ops::Range<usize>,
610 pub rows: Vec<(u64, Vec<u8>, IndexKey)>,
616}
617
618#[derive(Debug, Clone)]
634pub struct CompactReport {
635 pub sources: Vec<u32>,
637 pub merged_segment_id: Option<u32>,
639 pub merged_segment_bytes: Vec<u8>,
641 pub merged_rows: usize,
643 pub deleted_rows_pruned: usize,
648 pub bytes_reclaimed_estimate: u64,
652}
653
654#[derive(Debug, Clone)]
655pub enum IndexKind {
656 BTree(PersistentBTreeMap<IndexKey, Vec<RowLocator>>),
673 Nsw(NswGraph),
675 Brin {
682 column_type: DataType,
686 },
687}
688
689#[derive(Debug, Clone)]
698pub struct NswGraph {
699 pub m: usize,
701 pub m_max_0: usize,
704 pub entry: Option<usize>,
707 pub entry_level: u8,
709 pub levels: PersistentVec<u8>,
716 pub layers: Vec<PersistentVec<Vec<u32>>>,
732}
733
734impl NswGraph {
735 fn new(m: usize) -> Self {
736 Self {
737 m,
738 m_max_0: m.saturating_mul(2),
739 entry: None,
740 entry_level: 0,
741 levels: PersistentVec::new(),
742 layers: alloc::vec![PersistentVec::new()],
743 }
744 }
745
746 pub const fn cap_for_layer(&self, layer: u8) -> usize {
748 if layer == 0 { self.m_max_0 } else { self.m }
749 }
750}
751
/// Deterministically picks the top NSW layer (0..=7) for a row.
///
/// The row index is multiplied by the 64-bit golden-ratio constant and then
/// mixed with the SplitMix64 finaliser constants. The level is the number of
/// trailing all-zero nibbles of the hash, capped at 7. With well-mixed bits,
/// each further level is therefore 16× less likely than the one below.
/// Row 0 hashes to 0 and so always receives the maximum level.
pub fn nsw_assign_level(row_idx: usize) -> u8 {
    const MAX_LEVEL: u8 = 7;
    // usize → u64 is lossless on every supported target.
    let mut h = (row_idx as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
    h = (h ^ (h >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    h = (h ^ (h >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    h ^= h >> 31;
    // trailing_zeros() ≤ 64, so at most 16 zero nibbles; clamp to MAX_LEVEL.
    let zero_nibbles = h.trailing_zeros() / 4;
    u8::try_from(zero_nibbles).map_or(MAX_LEVEL, |n| n.min(MAX_LEVEL))
}
778
779impl Index {
780 fn new_btree(name: String, column_position: usize) -> Self {
781 Self {
782 name,
783 column_position,
784 kind: IndexKind::BTree(PersistentBTreeMap::new()),
785 included_columns: Vec::new(),
786 partial_predicate: None,
787 expression: None,
788 is_unique: false,
789 extra_column_positions: Vec::new(),
790 }
791 }
792
793 fn new_nsw(name: String, column_position: usize, m: usize) -> Self {
794 Self {
795 name,
796 column_position,
797 kind: IndexKind::Nsw(NswGraph::new(m)),
798 included_columns: Vec::new(),
799 partial_predicate: None,
800 expression: None,
801 is_unique: false,
802 extra_column_positions: Vec::new(),
803 }
804 }
805
806 fn new_brin(name: String, column_position: usize, column_type: DataType) -> Self {
810 Self {
811 name,
812 column_position,
813 kind: IndexKind::Brin { column_type },
814 included_columns: Vec::new(),
815 partial_predicate: None,
816 expression: None,
817 is_unique: false,
818 extra_column_positions: Vec::new(),
819 }
820 }
821
822 pub fn lookup_eq(&self, key: &IndexKey) -> &[RowLocator] {
831 match &self.kind {
832 IndexKind::BTree(m) => m.get(key).map_or(&[][..], Vec::as_slice),
833 IndexKind::Nsw(_) | IndexKind::Brin { .. } => &[][..],
835 }
836 }
837
838 pub const fn nsw(&self) -> Option<&NswGraph> {
841 match &self.kind {
842 IndexKind::Nsw(g) => Some(g),
843 IndexKind::BTree(_) | IndexKind::Brin { .. } => None,
844 }
845 }
846
847 pub const fn is_brin(&self) -> bool {
852 matches!(self.kind, IndexKind::Brin { .. })
853 }
854}
855
856#[derive(Debug, Clone)]
872pub struct Table {
873 schema: TableSchema,
874 rows: PersistentVec<Row>,
875 indices: Vec<Index>,
876 hot_bytes: u64,
877 cold_row_count: u64,
891 cold_row_count_stale: bool,
896}
897
898impl Table {
899 pub fn new(schema: TableSchema) -> Self {
900 Self {
901 schema,
902 rows: PersistentVec::new(),
903 indices: Vec::new(),
904 hot_bytes: 0,
905 cold_row_count: 0,
906 cold_row_count_stale: false,
907 }
908 }
909
910 #[must_use]
914 pub const fn hot_bytes(&self) -> u64 {
915 self.hot_bytes
916 }
917
918 #[must_use]
921 pub const fn cold_row_count(&self) -> u64 {
922 self.cold_row_count
923 }
924
925 pub fn set_cold_row_count(&mut self, n: u64) {
928 self.cold_row_count = n;
929 self.cold_row_count_stale = false;
930 }
931
932 pub fn mark_cold_row_count_stale(&mut self) {
937 self.cold_row_count_stale = true;
938 }
939
940 #[must_use]
944 pub const fn cold_row_count_stale(&self) -> bool {
945 self.cold_row_count_stale
946 }
947
948 #[must_use]
959 pub fn count_cold_locators(&self) -> u64 {
960 let mut best: u64 = 0;
961 for idx in &self.indices {
962 if let IndexKind::BTree(map) = &idx.kind {
963 let n: u64 = map
964 .iter()
965 .map(|(_, locs)| locs.iter().filter(|l| l.is_cold()).count() as u64)
966 .sum();
967 if n > best {
968 best = n;
969 }
970 }
971 }
972 best
973 }
974
975 pub const fn schema(&self) -> &TableSchema {
976 &self.schema
977 }
978
979 pub const fn schema_mut(&mut self) -> &mut TableSchema {
983 &mut self.schema
984 }
985
986 pub const fn rows(&self) -> &PersistentVec<Row> {
990 &self.rows
991 }
992
993 pub const fn row_count(&self) -> usize {
994 self.rows.len()
995 }
996
997 pub fn indices_mut(&mut self) -> &mut [Index] {
1002 &mut self.indices
1003 }
1004
1005 pub fn indices(&self) -> &[Index] {
1006 &self.indices
1007 }
1008
1009 pub fn next_auto_value(&self, col_pos: usize) -> Option<i64> {
1015 let ty = self.schema.columns.get(col_pos)?.ty;
1016 if !matches!(ty, DataType::SmallInt | DataType::Int | DataType::BigInt) {
1017 return None;
1018 }
1019 let mut max: Option<i64> = None;
1020 for row in &self.rows {
1021 match row.values.get(col_pos) {
1022 Some(Value::SmallInt(n)) => {
1023 let v = i64::from(*n);
1024 max = Some(max.map_or(v, |m| m.max(v)));
1025 }
1026 Some(Value::Int(n)) => {
1027 let v = i64::from(*n);
1028 max = Some(max.map_or(v, |m| m.max(v)));
1029 }
1030 Some(Value::BigInt(n)) => {
1031 max = Some(max.map_or(*n, |m| m.max(*n)));
1032 }
1033 _ => {}
1034 }
1035 }
1036 Some(max.map_or(1, |m| m + 1))
1037 }
1038
1039 pub fn index_on(&self, column_position: usize) -> Option<&Index> {
1043 self.indices
1050 .iter()
1051 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::BTree(_)))
1052 .or_else(|| {
1053 self.indices.iter().find(|i| {
1054 i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_))
1055 })
1056 })
1057 }
1058
1059 pub fn insert(&mut self, row: Row) -> Result<(), StorageError> {
1063 if row.len() != self.schema.columns.len() {
1064 return Err(StorageError::ArityMismatch {
1065 expected: self.schema.columns.len(),
1066 actual: row.len(),
1067 });
1068 }
1069 for (i, (val, col)) in row.values.iter().zip(&self.schema.columns).enumerate() {
1070 if val.is_null() {
1071 if !col.nullable {
1072 return Err(StorageError::NullInNotNull {
1073 column: col.name.clone(),
1074 });
1075 }
1076 continue;
1077 }
1078 let actual = val.data_type().expect("non-null");
1079 let compatible = actual == col.ty
1093 || matches!(
1094 (actual, col.ty),
1095 (
1096 DataType::Text,
1097 DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
1098 ) | (DataType::Json | DataType::Jsonb, DataType::Text)
1099 | (DataType::Json, DataType::Jsonb)
1100 | (DataType::Jsonb, DataType::Json)
1101 | (DataType::Timestamp, DataType::Timestamptz)
1102 | (DataType::Timestamptz, DataType::Timestamp)
1103 )
1104 || matches!(
1105 (actual, col.ty),
1106 (
1107 DataType::Numeric { scale: a, .. },
1108 DataType::Numeric { scale: b, .. },
1109 ) if a == b
1110 );
1111 if !compatible {
1112 return Err(StorageError::TypeMismatch {
1113 column: col.name.clone(),
1114 expected: col.ty,
1115 actual,
1116 position: i,
1117 });
1118 }
1119 }
1120 let new_row_idx = self.rows.len();
1121 for idx in &mut self.indices {
1125 if let IndexKind::BTree(map) = &mut idx.kind
1126 && let Some(key) = IndexKey::from_value(&row.values[idx.column_position])
1127 {
1128 let mut entries = map.get(&key).cloned().unwrap_or_default();
1134 entries.push(RowLocator::Hot(new_row_idx));
1135 map.insert_mut(key, entries);
1136 }
1137 }
1138 self.hot_bytes = self
1141 .hot_bytes
1142 .saturating_add(row_body_encoded_len(&row, &self.schema) as u64);
1143 self.rows.push_mut(row);
1148 let new_row_idx = self.rows.len() - 1;
1151 let nsw_targets: Vec<usize> = self
1152 .indices
1153 .iter()
1154 .enumerate()
1155 .filter_map(|(i, idx)| {
1156 if matches!(idx.kind, IndexKind::Nsw(_)) {
1157 Some(i)
1158 } else {
1159 None
1160 }
1161 })
1162 .collect();
1163 for idx_pos in nsw_targets {
1164 nsw_insert_at(self, idx_pos, new_row_idx);
1165 }
1166 Ok(())
1167 }
1168
1169 pub fn add_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1173 if self.indices.iter().any(|i| i.name == name) {
1174 return Err(StorageError::DuplicateIndex { name });
1175 }
1176 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1177 StorageError::ColumnNotFound {
1178 column: column_name.into(),
1179 }
1180 })?;
1181 let mut idx = Index::new_btree(name, column_position);
1182 if let IndexKind::BTree(map) = &mut idx.kind {
1183 for (i, row) in self.rows.iter().enumerate() {
1184 if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
1185 let mut entries = map.get(&key).cloned().unwrap_or_default();
1186 entries.push(RowLocator::Hot(i));
1187 map.insert_mut(key, entries);
1188 }
1189 }
1190 }
1191 self.indices.push(idx);
1192 Ok(())
1193 }
1194
1195 pub fn add_nsw_index(
1200 &mut self,
1201 name: String,
1202 column_name: &str,
1203 m: usize,
1204 ) -> Result<(), StorageError> {
1205 self.add_nsw_index_inner(name, column_name, m, None)
1206 }
1207
1208 pub fn rebuild_nsw_index(
1220 &mut self,
1221 name: &str,
1222 new_encoding: Option<VecEncoding>,
1223 ) -> Result<(), StorageError> {
1224 let idx_pos = self
1225 .indices
1226 .iter()
1227 .position(|i| i.name == name)
1228 .ok_or_else(|| StorageError::IndexNotFound {
1229 name: String::from(name),
1230 })?;
1231 let col_pos = self.indices[idx_pos].column_position;
1232 let m = match &self.indices[idx_pos].kind {
1233 IndexKind::Nsw(g) => g.m,
1234 IndexKind::BTree(_) | IndexKind::Brin { .. } => {
1235 return Err(StorageError::Unsupported(format!(
1236 "ALTER INDEX REBUILD on non-NSW index {name:?} — only NSW indexes can rebuild"
1237 )));
1238 }
1239 };
1240 let col_name = self.schema.columns[col_pos].name.clone();
1241 if let Some(target) = new_encoding {
1244 let current = match self.schema.columns[col_pos].ty {
1245 DataType::Vector { encoding, .. } => encoding,
1246 ref other => {
1247 return Err(StorageError::Unsupported(format!(
1248 "ALTER INDEX REBUILD WITH (encoding=…) on non-vector column type {other:?}"
1249 )));
1250 }
1251 };
1252 if target != current {
1253 let DataType::Vector { dim, .. } = self.schema.columns[col_pos].ty else {
1254 unreachable!("checked above")
1255 };
1256 let n = self.rows.len();
1257 for i in 0..n {
1258 let row = self
1259 .rows
1260 .get_mut(i)
1261 .expect("row index in bounds (we iterated up to len())");
1262 let cell = core::mem::replace(&mut row.values[col_pos], Value::Null);
1263 let recoded = recode_vector_cell(cell, target)?;
1264 row.values[col_pos] = recoded;
1265 }
1266 self.schema.columns[col_pos].ty = DataType::Vector {
1267 dim,
1268 encoding: target,
1269 };
1270 }
1271 }
1272 self.indices.remove(idx_pos);
1274 self.add_nsw_index_inner(String::from(name), &col_name, m, None)?;
1275 Ok(())
1276 }
1277
1278 pub fn restore_nsw_index(
1283 &mut self,
1284 name: String,
1285 column_name: &str,
1286 graph: NswGraph,
1287 ) -> Result<(), StorageError> {
1288 self.add_nsw_index_inner(name, column_name, graph.m, Some(graph))
1289 }
1290
1291 pub fn restore_btree_index(
1298 &mut self,
1299 name: String,
1300 column_name: &str,
1301 map: PersistentBTreeMap<IndexKey, Vec<RowLocator>>,
1302 ) -> Result<(), StorageError> {
1303 if self.indices.iter().any(|i| i.name == name) {
1304 return Err(StorageError::DuplicateIndex { name });
1305 }
1306 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1307 StorageError::ColumnNotFound {
1308 column: column_name.into(),
1309 }
1310 })?;
1311 self.indices.push(Index {
1312 name,
1313 column_position,
1314 kind: IndexKind::BTree(map),
1315 included_columns: Vec::new(),
1316 partial_predicate: None,
1317 expression: None,
1318 is_unique: false,
1319 extra_column_positions: Vec::new(),
1320 });
1321 Ok(())
1322 }
1323
1324 pub fn restore_brin_index(
1329 &mut self,
1330 name: String,
1331 column_name: &str,
1332 column_type: DataType,
1333 ) -> Result<(), StorageError> {
1334 if self.indices.iter().any(|i| i.name == name) {
1335 return Err(StorageError::DuplicateIndex { name });
1336 }
1337 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1338 StorageError::ColumnNotFound {
1339 column: column_name.into(),
1340 }
1341 })?;
1342 self.indices
1343 .push(Index::new_brin(name, column_position, column_type));
1344 Ok(())
1345 }
1346
1347 pub fn add_brin_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1351 if self.indices.iter().any(|i| i.name == name) {
1352 return Err(StorageError::DuplicateIndex { name });
1353 }
1354 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1355 StorageError::ColumnNotFound {
1356 column: column_name.into(),
1357 }
1358 })?;
1359 let column_type = self.schema.columns[column_position].ty;
1360 self.indices
1361 .push(Index::new_brin(name, column_position, column_type));
1362 Ok(())
1363 }
1364
1365 pub fn register_cold_locators<I>(
1382 &mut self,
1383 index_name: &str,
1384 locators: I,
1385 ) -> Result<usize, StorageError>
1386 where
1387 I: IntoIterator<Item = (IndexKey, RowLocator)>,
1388 {
1389 let idx = self
1390 .indices
1391 .iter_mut()
1392 .find(|i| i.name == index_name)
1393 .ok_or_else(|| StorageError::Corrupt(format!("index {index_name:?} not found")))?;
1394 let map = match &mut idx.kind {
1395 IndexKind::BTree(map) => map,
1396 IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
1397 return Err(StorageError::Corrupt(format!(
1398 "index {index_name:?} is not BTree; cold locators apply only to BTree indices"
1399 )));
1400 }
1401 };
1402 let mut count = 0usize;
1403 for (key, locator) in locators {
1404 let mut entries = map.get(&key).cloned().unwrap_or_default();
1405 entries.push(locator);
1406 map.insert_mut(key, entries);
1407 count += 1;
1408 }
1409 Ok(count)
1410 }
1411
1412 pub fn remove_cold_locators_for_key(
1422 &mut self,
1423 index_name: &str,
1424 key: &IndexKey,
1425 ) -> Result<usize, StorageError> {
1426 let idx = self
1427 .indices
1428 .iter_mut()
1429 .find(|i| i.name == index_name)
1430 .ok_or_else(|| {
1431 StorageError::Corrupt(format!(
1432 "remove_cold_locators_for_key: index {index_name:?} not found"
1433 ))
1434 })?;
1435 let map = match &mut idx.kind {
1436 IndexKind::BTree(map) => map,
1437 IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
1438 return Err(StorageError::Corrupt(format!(
1439 "remove_cold_locators_for_key: index {index_name:?} is not BTree; \
1440 cold locators apply only to BTree indices"
1441 )));
1442 }
1443 };
1444 let Some(entries) = map.get(key) else {
1445 return Ok(0);
1446 };
1447 let mut kept: Vec<RowLocator> =
1448 entries.iter().copied().filter(RowLocator::is_hot).collect();
1449 let removed = entries.len() - kept.len();
1450 if removed == 0 {
1451 return Ok(0);
1452 }
1453 kept.shrink_to_fit();
1454 map.insert_mut(key.clone(), kept);
1462 Ok(removed)
1463 }
1464
1465 pub fn delete_rows(&mut self, positions: &[usize]) -> usize {
1471 if positions.is_empty() {
1472 return 0;
1473 }
1474 let mut to_remove = alloc::vec![false; self.rows.len()];
1478 let mut removed = 0;
1479 for &p in positions {
1480 if p < to_remove.len() && !to_remove[p] {
1481 to_remove[p] = true;
1482 removed += 1;
1483 }
1484 }
1485 let mut new_rows: PersistentVec<Row> = PersistentVec::new();
1486 let mut removed_bytes: u64 = 0;
1487 for (i, row) in self.rows.iter().enumerate() {
1488 if to_remove[i] {
1489 removed_bytes =
1490 removed_bytes.saturating_add(row_body_encoded_len(row, &self.schema) as u64);
1491 } else {
1492 new_rows.push_mut(row.clone());
1493 }
1494 }
1495 self.rows = new_rows;
1496 self.hot_bytes = self.hot_bytes.saturating_sub(removed_bytes);
1497 self.rebuild_indices();
1498 removed
1499 }
1500
1501 pub fn update_row(
1507 &mut self,
1508 position: usize,
1509 new_values: Vec<Value>,
1510 ) -> Result<(), StorageError> {
1511 if position >= self.rows.len() {
1512 return Err(StorageError::Corrupt(alloc::format!(
1513 "update_row: position {position} out of bounds (rows={})",
1514 self.rows.len()
1515 )));
1516 }
1517 if new_values.len() != self.schema.columns.len() {
1518 return Err(StorageError::ArityMismatch {
1519 expected: self.schema.columns.len(),
1520 actual: new_values.len(),
1521 });
1522 }
1523 for (i, (val, col)) in new_values.iter().zip(&self.schema.columns).enumerate() {
1527 if val.is_null() {
1528 if !col.nullable {
1529 return Err(StorageError::NullInNotNull {
1530 column: col.name.clone(),
1531 });
1532 }
1533 continue;
1534 }
1535 let actual = val.data_type().expect("non-null");
1536 let compatible = actual == col.ty
1537 || matches!(
1538 (actual, col.ty),
1539 (
1540 DataType::Text,
1541 DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
1542 ) | (DataType::Json | DataType::Jsonb, DataType::Text)
1543 | (DataType::Json, DataType::Jsonb)
1544 | (DataType::Jsonb, DataType::Json)
1545 | (DataType::Timestamp, DataType::Timestamptz)
1546 | (DataType::Timestamptz, DataType::Timestamp)
1547 )
1548 || matches!(
1549 (actual, col.ty),
1550 (
1551 DataType::Numeric { scale: a, .. },
1552 DataType::Numeric { scale: b, .. },
1553 ) if a == b
1554 );
1555 if !compatible {
1556 return Err(StorageError::TypeMismatch {
1557 column: col.name.clone(),
1558 expected: col.ty,
1559 actual,
1560 position: i,
1561 });
1562 }
1563 }
1564 let old_row = self
1565 .rows
1566 .get(position)
1567 .expect("position bounds-checked above");
1568 let old_bytes = row_body_encoded_len(old_row, &self.schema) as u64;
1569 let new_row = Row::new(new_values);
1570 let new_bytes = row_body_encoded_len(&new_row, &self.schema) as u64;
1571 self.rows = self
1572 .rows
1573 .set(position, new_row)
1574 .expect("position bounds-checked above");
1575 self.hot_bytes = self
1576 .hot_bytes
1577 .saturating_sub(old_bytes)
1578 .saturating_add(new_bytes);
1579 self.rebuild_indices();
1580 Ok(())
1581 }
1582
1583 fn rebuild_indices(&mut self) {
1590 let preserved_cold: Vec<(String, Vec<(IndexKey, RowLocator)>)> = self
1599 .indices
1600 .iter()
1601 .filter_map(|idx| match &idx.kind {
1602 IndexKind::BTree(map) => {
1603 let cold: Vec<(IndexKey, RowLocator)> = map
1604 .iter()
1605 .flat_map(|(k, locs)| {
1606 locs.iter()
1607 .filter(|l| l.is_cold())
1608 .copied()
1609 .map(move |l| (k.clone(), l))
1610 })
1611 .collect();
1612 if cold.is_empty() {
1613 None
1614 } else {
1615 Some((idx.name.clone(), cold))
1616 }
1617 }
1618 IndexKind::Nsw(_) | IndexKind::Brin { .. } => None,
1620 })
1621 .collect();
1622
1623 #[derive(Clone)]
1628 enum RebuildKind {
1629 BTree,
1630 Nsw(usize),
1631 Brin(DataType),
1632 }
1633 let descriptors: Vec<(String, usize, RebuildKind)> = self
1634 .indices
1635 .iter()
1636 .map(|idx| {
1637 let kind = match &idx.kind {
1638 IndexKind::Nsw(g) => RebuildKind::Nsw(g.m),
1639 IndexKind::Brin { column_type } => RebuildKind::Brin(*column_type),
1640 IndexKind::BTree(_) => RebuildKind::BTree,
1641 };
1642 (idx.name.clone(), idx.column_position, kind)
1643 })
1644 .collect();
1645 self.indices.clear();
1646 for (name, column_position, rebuild_kind) in descriptors {
1647 match rebuild_kind {
1648 RebuildKind::Nsw(m) => {
1649 let idx = Index::new_nsw(name, column_position, m);
1650 self.indices.push(idx);
1651 let idx_pos = self.indices.len() - 1;
1652 let row_indices: Vec<usize> = (0..self.rows.len()).collect();
1653 for row_idx in row_indices {
1654 nsw_insert_at(self, idx_pos, row_idx);
1655 }
1656 }
1657 RebuildKind::Brin(column_type) => {
1658 self.indices
1661 .push(Index::new_brin(name, column_position, column_type));
1662 }
1663 RebuildKind::BTree => {
1664 let mut idx = Index::new_btree(name, column_position);
1665 if let IndexKind::BTree(map) = &mut idx.kind {
1666 for (i, row) in self.rows.iter().enumerate() {
1667 if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
1668 let mut entries = map.get(&key).cloned().unwrap_or_default();
1669 entries.push(RowLocator::Hot(i));
1670 map.insert_mut(key, entries);
1671 }
1672 }
1673 }
1674 self.indices.push(idx);
1675 }
1676 }
1677 }
1678
1679 for (idx_name, locators) in preserved_cold {
1684 let _ = self.register_cold_locators(&idx_name, locators);
1688 }
1689 }
1690
1691 fn add_nsw_index_inner(
1692 &mut self,
1693 name: String,
1694 column_name: &str,
1695 m: usize,
1696 restore: Option<NswGraph>,
1697 ) -> Result<(), StorageError> {
1698 if self.indices.iter().any(|i| i.name == name) {
1699 return Err(StorageError::DuplicateIndex { name });
1700 }
1701 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1702 StorageError::ColumnNotFound {
1703 column: column_name.into(),
1704 }
1705 })?;
1706 if !matches!(
1707 self.schema.columns[column_position].ty,
1708 DataType::Vector { .. }
1709 ) {
1710 return Err(StorageError::TypeMismatch {
1711 column: column_name.into(),
1712 expected: DataType::Vector {
1713 dim: 0,
1714 encoding: VecEncoding::F32,
1715 },
1716 actual: self.schema.columns[column_position].ty,
1717 position: column_position,
1718 });
1719 }
1720 if let Some(graph) = restore {
1721 self.indices.push(Index {
1722 name,
1723 column_position,
1724 kind: IndexKind::Nsw(graph),
1725 included_columns: Vec::new(),
1726 partial_predicate: None,
1727 expression: None,
1728 is_unique: false,
1729 extra_column_positions: Vec::new(),
1730 });
1731 return Ok(());
1732 }
1733 let idx = Index::new_nsw(name, column_position, m);
1734 self.indices.push(idx);
1735 let idx_pos = self.indices.len() - 1;
1736 let row_indices: Vec<usize> = (0..self.rows.len()).collect();
1739 for row_idx in row_indices {
1740 nsw_insert_at(self, idx_pos, row_idx);
1741 }
1742 Ok(())
1743 }
1744}
1745
1746fn recode_vector_cell(cell: Value, target: VecEncoding) -> Result<Value, StorageError> {
1753 if matches!(cell, Value::Null) {
1754 return Ok(cell);
1755 }
1756 let as_f32: Vec<f32> = match &cell {
1758 Value::Vector(v) => v.clone(),
1759 Value::Sq8Vector(q) => quantize::dequantize(q),
1760 Value::HalfVector(h) => h.to_f32_vec(),
1761 other => {
1762 return Err(StorageError::Unsupported(format!(
1763 "ALTER INDEX REBUILD: cannot recode non-vector cell {:?}",
1764 other.data_type()
1765 )));
1766 }
1767 };
1768 Ok(match target {
1773 VecEncoding::F32 => Value::Vector(as_f32),
1774 VecEncoding::Sq8 => Value::Sq8Vector(quantize::quantize(&as_f32)),
1775 VecEncoding::F16 => Value::HalfVector(halfvec::HalfVector::from_f32_slice(&as_f32)),
1776 })
1777}
1778
/// Inserts hot row `new_row_idx` into the NSW graph of `table.indices[idx_pos]`.
///
/// Cells that are not vectors (including NULL) and zero-dimension vectors only
/// get a level-0 node slot with no edges. Otherwise the node gets a level from
/// `nsw_assign_level`. The first node becomes the entry point. Later nodes
/// descend from the entry point through the layers above their own level
/// (`greedy_layer_walk`), then search and connect on each layer from
/// `min(level, entry_level)` down to 0. A node whose level exceeds the current
/// entry level becomes the new entry point.
///
/// # Panics
/// If `table.indices[idx_pos]` is not an NSW index (`unreachable!`).
fn nsw_insert_at(table: &mut Table, idx_pos: usize, new_row_idx: usize) {
    let col_pos = table.indices[idx_pos].column_position;
    // Dimension of the cell, or None when it is not a vector (e.g. NULL).
    let cell_dim: Option<usize> = match &table.rows[new_row_idx].values[col_pos] {
        Value::Vector(v) => Some(v.len()),
        Value::Sq8Vector(q) => Some(q.bytes.len()),
        Value::HalfVector(h) => Some(h.dim()),
        _ => None,
    };
    let Some(dim) = cell_dim else {
        // Keep node ids aligned with row positions even for non-vector cells.
        ensure_node_slot(table, idx_pos, new_row_idx, 0);
        return;
    };
    if dim == 0 {
        ensure_node_slot(table, idx_pos, new_row_idx, 0);
        return;
    }
    let level = nsw_assign_level(new_row_idx);
    ensure_node_slot(table, idx_pos, new_row_idx, level);
    let (entry, entry_level, m) = match &table.indices[idx_pos].kind {
        IndexKind::Nsw(g) => (g.entry, g.entry_level, g.m),
        IndexKind::BTree(_) | IndexKind::Brin { .. } => {
            unreachable!("nsw_insert_at on a non-NSW index")
        }
    };
    if entry.is_none() {
        // First vector node: it becomes the entry point and has no edges yet.
        if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
            g.entry = Some(new_row_idx);
            g.entry_level = level;
            *g.levels
                .get_mut(new_row_idx)
                .expect("levels slot padded by ensure_node_slot") = level;
        }
        return;
    }
    if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
        *g.levels
            .get_mut(new_row_idx)
            .expect("levels slot padded by ensure_node_slot") = level;
    }
    // Search is done in f32 space regardless of the stored encoding.
    let query = match &table.rows[new_row_idx].values[col_pos] {
        Value::Vector(v) => v.clone(),
        Value::Sq8Vector(q) => quantize::dequantize(q),
        Value::HalfVector(h) => h.to_f32_vec(),
        _ => return,
    };
    let mut current = entry.expect("entry was Some above");
    let mut current_d = vec_l2_sq(table, col_pos, current, &query);
    // Descend through layers above this node's level without linking
    // (presumably a greedy closest-neighbour walk — see greedy_layer_walk).
    if entry_level > level {
        for layer in (level + 1..=entry_level).rev() {
            (current, current_d) =
                greedy_layer_walk(table, idx_pos, layer, current, current_d, &query);
        }
    }
    let top = level.min(entry_level);
    // Beam width: twice the neighbour budget, but never below 8.
    let ef = (m * 2).max(8);
    for layer in (0..=top).rev() {
        // NOTE(review): hard-codes the layer-0 cap as `m * 2` instead of the
        // graph's `m_max_0` / `NswGraph::cap_for_layer`. The two agree for
        // graphs built by `NswGraph::new`, but may differ for restored graphs —
        // confirm which is intended.
        let cap = if layer == 0 { m * 2 } else { m };
        let mut candidates = layer_beam_search(
            table,
            idx_pos,
            layer,
            current,
            current_d,
            &query,
            ef,
            NswMetric::L2,
        );
        // The node must not link to itself.
        candidates.retain(|&(_, n)| n != new_row_idx);
        // The first candidate seeds the search on the next layer down
        // (presumably the closest — assumes layer_beam_search returns sorted results).
        if let Some(&(d, n)) = candidates.first() {
            current = n;
            current_d = d;
        }
        let peers = select_neighbours_heuristic(&candidates, cap, table, col_pos);
        connect_at_layer(table, idx_pos, layer, new_row_idx, &peers);
    }
    // A node reaching above the current top layer becomes the new entry point.
    if level > entry_level
        && let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
    {
        g.entry = Some(new_row_idx);
        g.entry_level = level;
    }
}
1887
1888fn ensure_node_slot(table: &mut Table, idx_pos: usize, new_row_idx: usize, level: u8) {
1892 let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind else {
1893 unreachable!("ensure_node_slot on a BTree index");
1894 };
1895 while g.layers.len() <= level as usize {
1896 g.layers.push(PersistentVec::new());
1897 }
1898 while g.levels.len() <= new_row_idx {
1899 g.levels.push_mut(0);
1900 }
1901 for layer_vec in &mut g.layers {
1902 while layer_vec.len() <= new_row_idx {
1903 layer_vec.push_mut(Vec::new());
1904 }
1905 }
1906}
1907
1908fn greedy_layer_walk(
1914 table: &Table,
1915 idx_pos: usize,
1916 layer: u8,
1917 mut current: usize,
1918 mut current_d: f32,
1919 query: &[f32],
1920) -> (usize, f32) {
1921 let g = match &table.indices[idx_pos].kind {
1922 IndexKind::Nsw(g) => g,
1923 IndexKind::BTree(_) | IndexKind::Brin { .. } => return (current, current_d),
1924 };
1925 let col_pos = table.indices[idx_pos].column_position;
1926 loop {
1927 let neighbours: &[u32] = g
1928 .layers
1929 .get(layer as usize)
1930 .and_then(|layer_v| layer_v.get(current))
1931 .map_or(&[][..], Vec::as_slice);
1932 let mut best = current;
1933 let mut best_d = current_d;
1934 for &n in neighbours {
1935 let n = n as usize;
1936 let d = vec_l2_sq(table, col_pos, n, query);
1937 if d < best_d {
1938 best = n;
1939 best_d = d;
1940 }
1941 }
1942 if best == current {
1943 return (current, current_d);
1944 }
1945 current = best;
1946 current_d = best_d;
1947 }
1948}
1949
/// Beam search of width `ef` on one NSW layer, scoring nodes with `metric`.
///
/// The search is seeded with `entry_node`. Its distance is `entry_d` when
/// `metric` is L2 (the caller already computed it); otherwise it is recomputed
/// with `metric`. The entry is always seeded, even if that distance is not
/// finite. The search expands the closest unexpanded candidate first. It stops
/// once the closest remaining candidate is further than the worst kept result
/// and `ef` results are held.
///
/// Neighbour ids `>= table.rows.len()` are ignored, and neighbours with a
/// non-finite distance (non-vector cells, dimension mismatch) are skipped. A
/// `visited` bitmap sized to the row count is allocated per call.
///
/// Returns `(distance, row)` pairs sorted by ascending distance. Pairs are
/// evicted whenever more than `ef` are held, so the result has at most `ef`
/// entries (for `ef >= 1`). Non-NSW indices return an empty vector.
// Eight inputs are needed by the search; bundling them would only add noise.
#[allow(clippy::too_many_arguments)]
fn layer_beam_search(
    table: &Table,
    idx_pos: usize,
    layer: u8,
    entry_node: usize,
    entry_d: f32,
    query: &[f32],
    ef: usize,
    metric: NswMetric,
) -> Vec<(f32, usize)> {
    let g = match &table.indices[idx_pos].kind {
        IndexKind::Nsw(g) => g,
        IndexKind::BTree(_) | IndexKind::Brin { .. } => return Vec::new(),
    };
    let col_pos = table.indices[idx_pos].column_position;
    // `entry_d` is only meaningful for L2; other metrics re-score the entry.
    let d0 = if matches!(metric, NswMetric::L2) {
        entry_d
    } else {
        cell_to_query_metric_distance(table, col_pos, entry_node, query, metric)
    };
    let row_count = table.rows.len();
    let mut visited: Vec<bool> = alloc::vec![false; row_count];
    if entry_node < row_count {
        visited[entry_node] = true;
    }
    // Frontier ordered closest-first, and a bounded result set ordered
    // furthest-first so the current worst result is always at the top.
    let mut candidates: alloc::collections::BinaryHeap<NodeClosest> =
        alloc::collections::BinaryHeap::with_capacity(ef);
    let mut results: alloc::collections::BinaryHeap<NodeFurthest> =
        alloc::collections::BinaryHeap::with_capacity(ef);
    candidates.push(NodeClosest {
        dist: d0,
        node: entry_node,
    });
    results.push(NodeFurthest {
        dist: d0,
        node: entry_node,
    });
    while let Some(cur) = candidates.pop() {
        let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
        // Nothing left on the frontier can improve a full result set.
        if cur.dist > worst && results.len() >= ef {
            break;
        }
        let neighbours: &[u32] = g
            .layers
            .get(layer as usize)
            .and_then(|layer_v| layer_v.get(cur.node))
            .map_or(&[][..], Vec::as_slice);
        for &n in neighbours {
            let n = n as usize;
            if n >= row_count || visited[n] {
                continue;
            }
            visited[n] = true;
            let dn = cell_to_query_metric_distance(table, col_pos, n, query, metric);
            // Non-vector / mismatched cells score +inf; never keep them.
            if !dn.is_finite() {
                continue;
            }
            let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
            if results.len() < ef || dn < worst {
                results.push(NodeFurthest { dist: dn, node: n });
                // Keep the result set bounded by evicting the current worst.
                if results.len() > ef {
                    results.pop();
                }
                candidates.push(NodeClosest { dist: dn, node: n });
            }
        }
    }
    let mut out: Vec<(f32, usize)> = results.into_iter().map(|c| (c.dist, c.node)).collect();
    out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
    out
}
2040
/// Beam-search frontier entry, ordered so a max-`BinaryHeap` pops the
/// *closest* node first.
///
/// Distances are compared with [`f32::total_cmp`], and ties are broken by node
/// id (smaller id pops first). This makes `Ord` a genuine total order that
/// agrees with `PartialEq`/`Eq`, as `BinaryHeap` requires, even for equal or
/// NaN distances. A positive NaN distance sorts as the furthest, so it is
/// popped last.
#[derive(Debug, Clone, Copy)]
struct NodeClosest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeClosest {
    fn eq(&self, other: &Self) -> bool {
        // Defined through `cmp` so equality and ordering can never disagree.
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeClosest {}
impl PartialOrd for NodeClosest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeClosest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        // Both keys reversed: a smaller distance (then a smaller id) is "greater"
        // and therefore sits at the top of the max-heap.
        other
            .dist
            .total_cmp(&self.dist)
            .then_with(|| other.node.cmp(&self.node))
    }
}
2069
/// Beam-search result entry, ordered so a max-`BinaryHeap` keeps the
/// *furthest* kept node on top (the one evicted first).
///
/// Distances are compared with [`f32::total_cmp`], and ties are broken by node
/// id (larger id on top). This makes `Ord` a genuine total order that agrees
/// with `PartialEq`/`Eq`, as `BinaryHeap` requires, even for equal or NaN
/// distances. A positive NaN distance ranks as the worst result.
#[derive(Debug, Clone, Copy)]
struct NodeFurthest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeFurthest {
    fn eq(&self, other: &Self) -> bool {
        // Defined through `cmp` so equality and ordering can never disagree.
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeFurthest {}
impl PartialOrd for NodeFurthest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeFurthest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        self.dist
            .total_cmp(&other.dist)
            .then_with(|| self.node.cmp(&other.node))
    }
}
2095
2096fn select_neighbours_heuristic(
2105 candidates: &[(f32, usize)],
2106 m: usize,
2107 table: &Table,
2108 col_pos: usize,
2109) -> Vec<usize> {
2110 let mut chosen: Vec<usize> = Vec::with_capacity(m);
2111 for &(d_q, e) in candidates {
2112 if chosen.len() >= m {
2113 break;
2114 }
2115 if !matches!(
2120 table.rows.get(e).and_then(|r| r.values.get(col_pos)),
2121 Some(Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_))
2122 ) {
2123 continue;
2124 }
2125 let mut covered = false;
2126 for &r in &chosen {
2127 if cell_l2_sq(table, col_pos, e, r) < d_q {
2131 covered = true;
2132 break;
2133 }
2134 }
2135 if !covered {
2136 chosen.push(e);
2137 }
2138 }
2139 chosen
2140}
2141
/// Links `new_row_idx` to `peers` on `layer` of the NSW graph at `idx_pos`.
///
/// The new node's adjacency list on this layer is *replaced* by `peers`. Each
/// peer whose cell is a vector then gets a back-link to the new node (if not
/// already present). A peer whose list now exceeds `g.cap_for_layer(layer)` is
/// re-pruned: its neighbours are re-scored by squared L2 distance to the peer,
/// sorted, and filtered with `select_neighbours_heuristic`. Pruning may drop
/// the back-link just added. Non-NSW indices are left untouched.
///
/// # Panics
///
/// Panics if a row index does not fit in `u32` (adjacency lists store `u32`),
/// or if `peer`/`layer` slots are missing. The caller is expected to have
/// reserved them via `ensure_node_slot`.
fn connect_at_layer(
    table: &mut Table,
    idx_pos: usize,
    layer: u8,
    new_row_idx: usize,
    peers: &[usize],
) {
    let col_pos = table.indices[idx_pos].column_position;
    let cap = match &table.indices[idx_pos].kind {
        IndexKind::Nsw(g) => g.cap_for_layer(layer),
        IndexKind::BTree(_) | IndexKind::Brin { .. } => return,
    };
    let new_row_u32 = u32::try_from(new_row_idx).expect("row index fits in u32");
    // Forward edges: the new node's list becomes exactly `peers`.
    if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
        let layer_v = &mut g.layers[layer as usize];
        if let Some(slot) = layer_v.get_mut(new_row_idx) {
            *slot = peers
                .iter()
                .map(|&p| u32::try_from(p).expect("row index fits in u32"))
                .collect();
        }
    }
    for &peer in peers {
        // Non-vector rows never carry edges.
        if !matches!(
            &table.rows[peer].values[col_pos],
            Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_)
        ) {
            continue;
        }
        // Back edge peer -> new node, without duplicating it.
        if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
            let layer_v = &mut g.layers[layer as usize];
            if let Some(slot) = layer_v.get_mut(peer)
                && !slot.contains(&new_row_u32)
            {
                slot.push(new_row_u32);
            }
        }
        // The mutable borrow above has ended; re-borrow immutably to check size.
        let needs_trim = match &table.indices[idx_pos].kind {
            IndexKind::Nsw(g) => g.layers[layer as usize][peer].len() > cap,
            IndexKind::BTree(_) | IndexKind::Brin { .. } => false,
        };
        if needs_trim {
            // Snapshot the peer's list so distances can be computed while the
            // table is only borrowed immutably.
            let current_peers: Vec<usize> = match &table.indices[idx_pos].kind {
                IndexKind::Nsw(g) => g.layers[layer as usize][peer]
                    .iter()
                    .map(|&n| n as usize)
                    .collect(),
                IndexKind::BTree(_) | IndexKind::Brin { .. } => continue,
            };
            let mut tagged: Vec<(f32, usize)> = current_peers
                .iter()
                .map(|&p| (cell_l2_sq(table, col_pos, peer, p), p))
                .collect();
            // The heuristic expects candidates closest-first.
            tagged.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
            let kept = select_neighbours_heuristic(&tagged, cap, table, col_pos);
            if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
                && let Some(slot) = g.layers[layer as usize].get_mut(peer)
            {
                *slot = kept
                    .into_iter()
                    .map(|p| u32::try_from(p).expect("row index fits in u32"))
                    .collect();
            }
        }
    }
}
2226
2227fn vec_l2_sq(table: &Table, col_pos: usize, row: usize, query: &[f32]) -> f32 {
2234 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2235 Some(Value::Vector(v)) if v.len() == query.len() => l2_distance_sq(v, query),
2236 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => {
2237 quantize::sq8_l2_distance_sq_asymmetric(q, query)
2238 }
2239 Some(Value::HalfVector(h)) if h.dim() == query.len() => {
2243 halfvec::half_l2_distance_sq_asymmetric(h, query)
2244 }
2245 _ => f32::INFINITY,
2246 }
2247}
2248
2249fn cell_l2_sq(table: &Table, col_pos: usize, row_a: usize, row_b: usize) -> f32 {
2256 let Some(cell_a) = table.rows.get(row_a).and_then(|r| r.values.get(col_pos)) else {
2257 return f32::INFINITY;
2258 };
2259 let Some(cell_b) = table.rows.get(row_b).and_then(|r| r.values.get(col_pos)) else {
2260 return f32::INFINITY;
2261 };
2262 match (cell_a, cell_b) {
2263 (Value::Vector(a), Value::Vector(b)) if a.len() == b.len() => l2_distance_sq(a, b),
2264 (Value::Sq8Vector(a), Value::Sq8Vector(b)) if a.bytes.len() == b.bytes.len() => {
2265 quantize::sq8_l2_distance_sq(a, b)
2266 }
2267 (Value::HalfVector(a), Value::HalfVector(b)) if a.dim() == b.dim() => {
2272 halfvec::half_l2_distance_sq(a, b)
2273 }
2274 _ => f32::INFINITY,
2275 }
2276}
2277
2278fn cell_to_query_metric_distance(
2283 table: &Table,
2284 col_pos: usize,
2285 row: usize,
2286 query: &[f32],
2287 metric: NswMetric,
2288) -> f32 {
2289 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2290 Some(Value::Vector(v)) if v.len() == query.len() => metric_distance(metric, v, query),
2291 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => match metric {
2292 NswMetric::L2 => quantize::sq8_l2_distance_sq_asymmetric(q, query),
2293 NswMetric::InnerProduct => quantize::sq8_inner_product_asymmetric(q, query),
2294 NswMetric::Cosine => quantize::sq8_cosine_distance_asymmetric(q, query),
2295 },
2296 Some(Value::HalfVector(h)) if h.dim() == query.len() => match metric {
2299 NswMetric::L2 => halfvec::half_l2_distance_sq_asymmetric(h, query),
2300 NswMetric::InnerProduct => halfvec::half_inner_product_asymmetric(h, query),
2301 NswMetric::Cosine => halfvec::half_cosine_distance_asymmetric(h, query),
2302 },
2303 _ => f32::INFINITY,
2304 }
2305}
2306
/// Distance function used to rank results of an NSW search.
///
/// For `f32` cells, `metric_distance` maps every metric so that a *smaller*
/// value means *closer*. Search results are sorted ascending by that value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NswMetric {
    /// Squared Euclidean distance.
    L2,
    /// Negated dot product, so larger inner products rank first.
    InnerProduct,
    /// Cosine distance, `1 - cos(a, b)`. `+∞` if either vector has zero norm.
    Cosine,
}
2324
2325fn nsw_search(
2331 table: &Table,
2332 idx_pos: usize,
2333 query: &[f32],
2334 k: usize,
2335 ef: usize,
2336 metric: NswMetric,
2337) -> Vec<(f32, usize)> {
2338 let (entry, entry_level) = match &table.indices[idx_pos].kind {
2339 IndexKind::Nsw(g) => (g.entry, g.entry_level),
2340 IndexKind::BTree(_) | IndexKind::Brin { .. } => return Vec::new(),
2341 };
2342 let Some(entry) = entry else {
2343 return Vec::new();
2344 };
2345 let col_pos = table.indices[idx_pos].column_position;
2346 let sq8 = matches!(
2353 table.schema.columns.get(col_pos).map(|c| c.ty),
2354 Some(DataType::Vector {
2355 encoding: VecEncoding::Sq8,
2356 ..
2357 })
2358 );
2359 let ef = if sq8 {
2360 ef.max(k).max(k * SQ8_RERANK_OVER_FETCH)
2361 } else {
2362 ef.max(k)
2363 };
2364 let entry_d = vec_l2_sq(table, col_pos, entry, query);
2366 let mut current = entry;
2367 let mut current_d = entry_d;
2368 for layer in (1..=entry_level).rev() {
2369 (current, current_d) = greedy_layer_walk(table, idx_pos, layer, current, current_d, query);
2370 }
2371 let mut results = layer_beam_search(table, idx_pos, 0, current, current_d, query, ef, metric);
2373 if sq8 {
2374 results = sq8_rerank(table, col_pos, &results, query, metric);
2375 }
2376 results.truncate(k);
2377 results
2378}
2379
2380fn sq8_rerank(
2387 table: &Table,
2388 col_pos: usize,
2389 candidates: &[(f32, usize)],
2390 query: &[f32],
2391 metric: NswMetric,
2392) -> Vec<(f32, usize)> {
2393 let mut out: Vec<(f32, usize)> = candidates
2394 .iter()
2395 .filter_map(|&(adc_d, row)| {
2396 let cell = table.rows.get(row).and_then(|r| r.values.get(col_pos))?;
2397 let Value::Sq8Vector(q) = cell else {
2398 return Some((adc_d, row));
2402 };
2403 let deq = quantize::dequantize(q);
2404 if deq.len() != query.len() {
2405 return None;
2406 }
2407 Some((metric_distance(metric, &deq, query), row))
2408 })
2409 .collect();
2410 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2411 out
2412}
2413
/// Over-fetch multiplier for searches on SQ8-encoded columns.
///
/// `nsw_search` raises the beam width to at least `k * SQ8_RERANK_OVER_FETCH`,
/// so `sq8_rerank` has extra candidates to re-score before truncating to `k`.
const SQ8_RERANK_OVER_FETCH: usize = 3;
2418
2419fn metric_distance(metric: NswMetric, a: &[f32], b: &[f32]) -> f32 {
2420 match metric {
2421 NswMetric::L2 => l2_distance_sq(a, b),
2422 NswMetric::InnerProduct => -inner_product_f32(a, b),
2423 NswMetric::Cosine => {
2424 let (dot, na, nb) = cosine_dot_norms_f32(a, b);
2425 if na == 0.0 || nb == 0.0 {
2426 return f32::INFINITY;
2427 }
2428 let denom = sqrt_newton_f32(na) * sqrt_newton_f32(nb);
2431 1.0 - dot / denom
2432 }
2433 }
2434}
2435
2436#[doc(hidden)]
2445#[inline]
2446pub fn inner_product_f32(a: &[f32], b: &[f32]) -> f32 {
2447 #[cfg(target_arch = "aarch64")]
2448 {
2449 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
2450 return unsafe { inner_product_neon(a, b) };
2453 }
2454 }
2455 inner_product_scalar(a, b)
2456}
2457
/// Portable dot product; iterates pairwise and stops at the shorter slice.
fn inner_product_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).fold(0.0f32, |acc, (&x, &y)| acc + x * y)
}
2465
/// NEON dot product over the first `a.len() / 4 * 4` elements.
///
/// The main loop handles eight lanes per iteration in two independent FMA
/// accumulators. A four-lane loop handles the remainder, and the accumulators
/// are summed horizontally at the end.
///
/// # Safety
///
/// * The executing CPU must support NEON.
/// * `b.len() >= a.len()`: `b` is read at the same offsets as `a`.
/// * Any trailing `a.len() % 4` elements are silently ignored, so callers
///   should pass lengths that are a multiple of four.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
// Single-letter names mirror the mathematical notation of the kernel.
#[allow(clippy::many_single_char_names)]
unsafe fn inner_product_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32,
    };
    // SAFETY: every load reads 4 lanes at offset `i` with `i + 4 <= a.len()`,
    // which is in bounds for `a` and (per the contract above) for `b`.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut acc0 = zero;
        let mut acc1 = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 8 <= n {
            let av0 = vld1q_f32(a.as_ptr().add(i));
            let bv0 = vld1q_f32(b.as_ptr().add(i));
            acc0 = vfmaq_f32(acc0, av0, bv0);
            let av1 = vld1q_f32(a.as_ptr().add(i + 4));
            let bv1 = vld1q_f32(b.as_ptr().add(i + 4));
            acc1 = vfmaq_f32(acc1, av1, bv1);
            i += 8;
        }
        while i + 4 <= n {
            let av = vld1q_f32(a.as_ptr().add(i));
            let bv = vld1q_f32(b.as_ptr().add(i));
            acc0 = vfmaq_f32(acc0, av, bv);
            i += 4;
        }
        vaddvq_f32(vaddq_f32(acc0, acc1))
    }
}
2499
2500#[doc(hidden)]
2507#[inline]
2508pub fn cosine_dot_norms_f32(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
2509 #[cfg(target_arch = "aarch64")]
2510 {
2511 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
2512 return unsafe { cosine_dot_norms_neon(a, b) };
2514 }
2515 }
2516 cosine_dot_norms_scalar(a, b)
2517}
2518
/// Portable `(a·b, |a|², |b|²)`; iterates pairwise and stops at the shorter slice.
fn cosine_dot_norms_scalar(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    a.iter()
        .zip(b)
        .fold((0.0f32, 0.0f32, 0.0f32), |(dot, na, nb), (&x, &y)| {
            (dot + x * y, na + x * x, nb + y * y)
        })
}
2530
/// NEON `(a·b, |a|², |b|²)` over the first `a.len() / 4 * 4` elements.
///
/// Uses one FMA accumulator per output and four lanes per iteration, then
/// reduces each accumulator horizontally.
///
/// # Safety
///
/// * The executing CPU must support NEON.
/// * `b.len() >= a.len()`: `b` is read at the same offsets as `a`.
/// * Any trailing `a.len() % 4` elements are silently ignored, so callers
///   should pass lengths that are a multiple of four.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
// Short names mirror the math; `acc_na`/`acc_nb` are intentionally similar.
#[allow(clippy::many_single_char_names, clippy::similar_names)]
unsafe fn cosine_dot_norms_neon(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    use core::arch::aarch64::{float32x4_t, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32};
    // SAFETY: every load reads 4 lanes at offset `i` with `i + 4 <= a.len()`,
    // which is in bounds for `a` and (per the contract above) for `b`.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut acc_dot = zero;
        let mut acc_na = zero;
        let mut acc_nb = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 4 <= n {
            let av = vld1q_f32(a.as_ptr().add(i));
            let bv = vld1q_f32(b.as_ptr().add(i));
            acc_dot = vfmaq_f32(acc_dot, av, bv);
            acc_na = vfmaq_f32(acc_na, av, av);
            acc_nb = vfmaq_f32(acc_nb, bv, bv);
            i += 4;
        }
        (vaddvq_f32(acc_dot), vaddvq_f32(acc_na), vaddvq_f32(acc_nb))
    }
}
2554
/// Square root for `no_std` builds, where `f32::sqrt` is unavailable.
///
/// Returns `0.0` for zero and negative inputs. NaN and `+∞` are returned
/// unchanged.
///
/// The initial guess halves the IEEE-754 exponent through the bit pattern and
/// is accurate to within a few percent for every positive normal input. Four
/// Newton–Raphson steps then converge to full `f32` precision across the whole
/// range. Seeding with `x` itself would need O(log x) steps and badly misses
/// the root for large or tiny inputs. Subnormal inputs are first lifted into
/// the normal range by an exact power of two.
fn sqrt_newton_f32(x: f32) -> f32 {
    // 2^24 lifts the smallest subnormal (2^-149) above f32::MIN_POSITIVE; its
    // square root, 2^12, undoes the scaling exactly afterwards.
    const SUBNORMAL_SCALE: f32 = 16_777_216.0; // 2^24
    const SUBNORMAL_UNSCALE: f32 = 1.0 / 4096.0; // 2^-12
    // Classic magic bias for the "halve the exponent" sqrt approximation.
    const SQRT_MAGIC: u32 = 0x1fbd_1df5;

    if x <= 0.0 {
        return 0.0;
    }
    if !x.is_finite() {
        // NaN (which fails `x <= 0.0`) stays NaN; +inf maps to +inf.
        return x;
    }
    let (value, rescale) = if x < f32::MIN_POSITIVE {
        (x * SUBNORMAL_SCALE, SUBNORMAL_UNSCALE)
    } else {
        (x, 1.0)
    };
    let mut g = f32::from_bits((value.to_bits() >> 1) + SQRT_MAGIC);
    for _ in 0..4 {
        g = 0.5 * (g + value / g);
    }
    g * rescale
}
2565
2566#[inline]
2574fn l2_distance_sq(a: &[f32], b: &[f32]) -> f32 {
2575 #[cfg(target_arch = "aarch64")]
2576 {
2577 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
2578 return unsafe { l2_distance_sq_neon(a, b) };
2582 }
2583 }
2584 l2_distance_sq_scalar(a, b)
2585}
2586
/// Portable squared L2 distance; iterates pairwise and stops at the shorter slice.
fn l2_distance_sq_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).fold(0.0f32, |acc, (&x, &y)| {
        let diff = x - y;
        acc + diff * diff
    })
}
2595
/// NEON squared L2 distance over the first `a.len() / 4 * 4` elements.
///
/// The main loop handles eight lanes per iteration in two independent FMA
/// accumulators of squared differences. A four-lane loop handles the
/// remainder, and the accumulators are summed horizontally at the end.
///
/// # Safety
///
/// * The executing CPU must support NEON.
/// * `b.len() >= a.len()`: `b` is read at the same offsets as `a`.
/// * Any trailing `a.len() % 4` elements are silently ignored, so callers
///   should pass lengths that are a multiple of four.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
// Single-letter names mirror the mathematical notation of the kernel.
#[allow(clippy::many_single_char_names)]
unsafe fn l2_distance_sq_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32, vsubq_f32,
    };
    // SAFETY: every load reads 4 lanes at offset `i` with `i + 4 <= a.len()`,
    // which is in bounds for `a` and (per the contract above) for `b`.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut acc0 = zero;
        let mut acc1 = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 8 <= n {
            // d = a - b, then acc += d * d (fused).
            let d0 = vsubq_f32(vld1q_f32(a.as_ptr().add(i)), vld1q_f32(b.as_ptr().add(i)));
            acc0 = vfmaq_f32(acc0, d0, d0);
            let d1 = vsubq_f32(
                vld1q_f32(a.as_ptr().add(i + 4)),
                vld1q_f32(b.as_ptr().add(i + 4)),
            );
            acc1 = vfmaq_f32(acc1, d1, d1);
            i += 8;
        }
        while i + 4 <= n {
            let d = vsubq_f32(vld1q_f32(a.as_ptr().add(i)), vld1q_f32(b.as_ptr().add(i)));
            acc0 = vfmaq_f32(acc0, d, d);
            i += 4;
        }
        vaddvq_f32(vaddq_f32(acc0, acc1))
    }
}
2633
2634pub fn nsw_query(
2637 table: &Table,
2638 idx_name: &str,
2639 query: &[f32],
2640 k: usize,
2641 metric: NswMetric,
2642) -> Vec<usize> {
2643 let Some(idx_pos) = table.indices.iter().position(|i| i.name == idx_name) else {
2644 return Vec::new();
2645 };
2646 let ef = (k * 2).max(NSW_DEFAULT_M);
2647 let mut hits = nsw_search(table, idx_pos, query, k, ef, metric);
2648 hits.truncate(k);
2649 hits.into_iter().map(|(_, idx)| idx).collect()
2650}
2651
2652pub fn nsw_index_on(table: &Table, column_position: usize) -> Option<&Index> {
2656 table
2657 .indices
2658 .iter()
2659 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_)))
2660}
2661
/// Collection of tables plus the cold-tier segments their indices can point at.
#[derive(Debug, Clone, Default)]
pub struct Catalog {
    /// Tables in creation order; `by_name` maps names to positions here.
    tables: Vec<Table>,
    /// Table name -> position in `tables`.
    by_name: BTreeMap<String, usize>,
    /// Cold segments addressed by `segment_id` (the slot position). `None`
    /// marks a tombstoned slot or a gap padded by `load_segment_bytes_at`.
    cold_segments: Vec<Option<Arc<OwnedSegment>>>,
}
2702
2703impl Catalog {
2704 pub const fn new() -> Self {
2705 Self {
2706 tables: Vec::new(),
2707 by_name: BTreeMap::new(),
2708 cold_segments: Vec::new(),
2709 }
2710 }
2711
2712 pub fn create_table(&mut self, schema: TableSchema) -> Result<(), StorageError> {
2713 if self.by_name.contains_key(&schema.name) {
2714 return Err(StorageError::DuplicateTable {
2715 name: schema.name.clone(),
2716 });
2717 }
2718 let idx = self.tables.len();
2719 let name = schema.name.clone();
2720 self.tables.push(Table::new(schema));
2721 self.by_name.insert(name, idx);
2722 Ok(())
2723 }
2724
2725 pub fn get(&self, name: &str) -> Option<&Table> {
2726 let idx = *self.by_name.get(name)?;
2727 self.tables.get(idx)
2728 }
2729
2730 pub fn get_mut(&mut self, name: &str) -> Option<&mut Table> {
2731 let idx = *self.by_name.get(name)?;
2732 self.tables.get_mut(idx)
2733 }
2734
2735 pub fn table_count(&self) -> usize {
2736 self.tables.len()
2737 }
2738
2739 pub fn table_names(&self) -> Vec<String> {
2742 self.tables.iter().map(|t| t.schema.name.clone()).collect()
2743 }
2744
2745 pub fn load_segment_bytes(&mut self, bytes: Vec<u8>) -> Result<u32, StorageError> {
2756 let id = u32::try_from(self.cold_segments.len()).map_err(|_| {
2757 StorageError::Corrupt("cold segment count would exceed u32::MAX".into())
2758 })?;
2759 let seg = OwnedSegment::from_bytes(bytes)
2760 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
2761 self.cold_segments.push(Some(Arc::new(seg)));
2762 Ok(id)
2763 }
2764
2765 pub fn load_segment_bytes_at(
2778 &mut self,
2779 target_id: u32,
2780 bytes: Vec<u8>,
2781 ) -> Result<(), StorageError> {
2782 let seg = OwnedSegment::from_bytes(bytes)
2783 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
2784 let idx = target_id as usize;
2785 while self.cold_segments.len() <= idx {
2786 self.cold_segments.push(None);
2787 }
2788 if self.cold_segments[idx].is_some() {
2789 return Err(StorageError::Corrupt(format!(
2790 "load_segment_bytes_at: segment_id {target_id} already occupied"
2791 )));
2792 }
2793 self.cold_segments[idx] = Some(Arc::new(seg));
2794 Ok(())
2795 }
2796
2797 pub fn tombstone_segment(&mut self, segment_id: u32) -> Result<(), StorageError> {
2807 let idx = segment_id as usize;
2808 if idx >= self.cold_segments.len() {
2809 return Err(StorageError::Corrupt(format!(
2810 "tombstone_segment: segment_id {segment_id} out of bounds (len={})",
2811 self.cold_segments.len()
2812 )));
2813 }
2814 self.cold_segments[idx] = None;
2815 Ok(())
2816 }
2817
2818 #[must_use]
2820 pub fn cold_segment_count(&self) -> usize {
2821 self.cold_segments.iter().filter(|s| s.is_some()).count()
2822 }
2823
2824 #[must_use]
2827 pub fn cold_segment_slot_count(&self) -> usize {
2828 self.cold_segments.len()
2829 }
2830
2831 #[must_use]
2836 pub fn cold_segment_ids_global(&self) -> Vec<u32> {
2837 self.cold_segments
2838 .iter()
2839 .enumerate()
2840 .filter_map(|(i, s)| s.as_ref().map(|_| i as u32))
2841 .collect()
2842 }
2843
2844 #[must_use]
2851 pub fn hot_tier_bytes(&self) -> u64 {
2852 self.tables
2853 .iter()
2854 .map(Table::hot_bytes)
2855 .fold(0u64, u64::saturating_add)
2856 }
2857
2858 pub fn freeze_oldest_to_cold(
2903 &mut self,
2904 table_name: &str,
2905 index_name: &str,
2906 max_rows: usize,
2907 ) -> Result<FreezeReport, StorageError> {
2908 if max_rows == 0 {
2910 return Err(StorageError::Corrupt(
2911 "freeze_oldest_to_cold: max_rows must be > 0".into(),
2912 ));
2913 }
2914 let table = self.get(table_name).ok_or_else(|| {
2915 StorageError::Corrupt(format!(
2916 "freeze_oldest_to_cold: table {table_name:?} not found"
2917 ))
2918 })?;
2919 if max_rows > table.rows.len() {
2920 return Err(StorageError::Corrupt(format!(
2921 "freeze_oldest_to_cold: max_rows {max_rows} > row_count {}",
2922 table.rows.len()
2923 )));
2924 }
2925 let idx = table
2926 .indices
2927 .iter()
2928 .find(|i| i.name == index_name)
2929 .ok_or_else(|| {
2930 StorageError::Corrupt(format!(
2931 "freeze_oldest_to_cold: index {index_name:?} not found on {table_name:?}"
2932 ))
2933 })?;
2934 if !matches!(idx.kind, IndexKind::BTree(_)) {
2935 return Err(StorageError::Corrupt(format!(
2936 "freeze_oldest_to_cold: index {index_name:?} is NSW; only BTree indices may freeze"
2937 )));
2938 }
2939 let column_position = idx.column_position;
2940
2941 let schema = table.schema.clone();
2943 let mut to_freeze: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(max_rows);
2944 for row_idx in 0..max_rows {
2945 let row = table.rows.get(row_idx).expect("bounds-checked above");
2946 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
2947 StorageError::Corrupt(format!(
2948 "freeze_oldest_to_cold: row {row_idx} has NULL / non-key value in index column"
2949 ))
2950 })?;
2951 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
2952 StorageError::Corrupt(format!(
2953 "freeze_oldest_to_cold: index {index_name:?} column type is non-integer; \
2954 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
2955 ))
2956 })?;
2957 to_freeze.push((pk_u64, encode_row_body_dense(row, &schema), key));
2958 }
2959 to_freeze.sort_by_key(|(k, _, _)| *k);
2964 for w in to_freeze.windows(2) {
2968 if w[0].0 == w[1].0 {
2969 return Err(StorageError::Corrupt(format!(
2970 "freeze_oldest_to_cold: duplicate PK {} in freeze batch",
2971 w[0].0
2972 )));
2973 }
2974 }
2975 let post_swap_keys: Vec<IndexKey> = to_freeze.iter().map(|(_, _, k)| k.clone()).collect();
2979 let seg_rows: Vec<(u64, Vec<u8>)> = to_freeze
2983 .into_iter()
2984 .map(|(k, body, _)| (k, body))
2985 .collect();
2986 let frozen_rows = seg_rows.len();
2987 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
2988 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: encode: {e}")))?;
2989
2990 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
2999 let positions: Vec<usize> = (0..max_rows).collect();
3000 let t_mut = self
3001 .get_mut(table_name)
3002 .expect("just validated; still present");
3003 let removed = t_mut.delete_rows(&positions);
3004 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
3005 let bytes_after = t_mut.hot_bytes();
3006 let bytes_freed = bytes_before.saturating_sub(bytes_after);
3007
3008 let segment_id = self
3009 .load_segment_bytes(seg_bytes.clone())
3010 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: load: {e}")))?;
3011 let new_cold = post_swap_keys.into_iter().map(|k| {
3012 (
3013 k,
3014 RowLocator::Cold {
3015 segment_id,
3016 page_offset: 0,
3017 },
3018 )
3019 });
3020 let t_mut = self.get_mut(table_name).expect("still present");
3021 t_mut.register_cold_locators(index_name, new_cold)?;
3022
3023 Ok(FreezeReport {
3024 segment_id,
3025 frozen_rows,
3026 bytes_freed,
3027 segment_bytes: seg_bytes,
3028 })
3029 }
3030
3031 #[must_use]
3037 pub fn cold_segment(&self, segment_id: u32) -> Option<&OwnedSegment> {
3038 self.cold_segments
3039 .get(segment_id as usize)
3040 .and_then(|s| s.as_deref())
3041 }
3042
3043 pub fn resolve_cold_locator(
3052 &self,
3053 table_name: &str,
3054 segment_id: u32,
3055 key: &IndexKey,
3056 ) -> Option<Row> {
3057 let t = self.get(table_name)?;
3058 let u64_key = index_key_as_u64(key)?;
3059 let seg = self.cold_segments.get(segment_id as usize)?.as_ref()?;
3060 let payload = seg.lookup(u64_key)?;
3061 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
3062 Some(row)
3063 }
3064
3065 pub fn lookup_by_pk(&self, table: &str, index_name: &str, key: &IndexKey) -> Option<Row> {
3083 let t = self.get(table)?;
3084 let idx = t.indices.iter().find(|i| i.name == index_name)?;
3085 let locators = idx.lookup_eq(key);
3086 let cold_u64_key = index_key_as_u64(key);
3087 for loc in locators {
3088 match *loc {
3089 RowLocator::Hot(i) => {
3090 if let Some(row) = t.rows.get(i) {
3091 return Some(row.clone());
3092 }
3093 }
3094 RowLocator::Cold {
3095 segment_id,
3096 page_offset: _,
3097 } => {
3098 let Some(u64_key) = cold_u64_key else {
3099 continue;
3102 };
3103 let Some(seg) = self
3104 .cold_segments
3105 .get(segment_id as usize)
3106 .and_then(|s| s.as_deref())
3107 else {
3108 continue;
3119 };
3120 let Some(payload) = seg.lookup(u64_key) else {
3121 continue;
3122 };
3123 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
3124 return Some(row);
3125 }
3126 }
3127 }
3128 None
3129 }
3130
    /// Copies the cold row for `key` back into the hot tier of `table_name`.
    ///
    /// Steps, in order:
    /// 1. Locate the cold locator via `find_cold_locator`.
    /// 2. Decode the row from its segment.
    /// 3. `Table::insert` it.
    /// 4. Remove the key's cold locators from `index_name`.
    ///
    /// Returns `Ok(None)` when the key has no cold locator. Otherwise returns
    /// `Ok(Some(i))`, where `i` is `rows.len() - 1` after the insert.
    /// NOTE(review): this assumes `Table::insert` appends; confirm.
    ///
    /// # Errors
    ///
    /// * Errors from `find_cold_locator` are propagated.
    /// * `StorageError::Corrupt` when the key is not u64-coercible, the table is
    ///   missing, the segment is not registered (or is tombstoned), or the
    ///   segment lookup misses.
    /// * Decode and insert errors are propagated.
    ///
    /// NOTE(review): if `remove_cold_locators_for_key` fails, the row has
    /// already been inserted into the hot tier.
    pub fn promote_cold_row(
        &mut self,
        table_name: &str,
        index_name: &str,
        key: &IndexKey,
    ) -> Result<Option<usize>, StorageError> {
        let cold_loc = self.find_cold_locator(table_name, index_name, key)?;
        let Some((segment_id, _page_offset)) = cold_loc else {
            return Ok(None);
        };
        let u64_key = index_key_as_u64(key).ok_or_else(|| {
            StorageError::Corrupt(
                "promote_cold_row: key type not coercible to u64 (cold tier requires integer PK)"
                    .into(),
            )
        })?;
        // Cloned so the immutable borrow of `self` ends before `get_mut` below.
        let schema = self
            .get(table_name)
            .ok_or_else(|| {
                StorageError::Corrupt(format!("promote_cold_row: table {table_name:?} not found"))
            })?
            .schema
            .clone();
        let seg = self
            .cold_segments
            .get(segment_id as usize)
            .and_then(|s| s.as_ref())
            .ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "promote_cold_row: segment {segment_id} not registered on catalog"
                ))
            })?;
        let payload = seg.lookup(u64_key).ok_or_else(|| {
            StorageError::Corrupt(format!(
                "promote_cold_row: key {u64_key} resolves to segment {segment_id} \
                 but the segment's bloom/page lookup didn't return a row"
            ))
        })?;
        let (row, _consumed) = decode_row_body_dense(&payload, &schema)?;
        let t = self
            .get_mut(table_name)
            .expect("table existed at lookup time");
        t.insert(row)?;
        let new_hot_idx =
            t.rows.len().checked_sub(1).ok_or_else(|| {
                StorageError::Corrupt("promote_cold_row: empty after insert".into())
            })?;
        // Drop the cold locator(s) so lookups resolve to the new hot copy.
        t.remove_cold_locators_for_key(index_name, key)?;
        Ok(Some(new_hot_idx))
    }
3212
3213 pub fn shadow_cold_row(
3231 &mut self,
3232 table_name: &str,
3233 index_name: &str,
3234 key: &IndexKey,
3235 ) -> Result<usize, StorageError> {
3236 let t = self.get_mut(table_name).ok_or_else(|| {
3237 StorageError::Corrupt(format!("shadow_cold_row: table {table_name:?} not found"))
3238 })?;
3239 t.remove_cold_locators_for_key(index_name, key)
3240 }
3241
3242 pub fn prepare_freeze_slice(
3260 &self,
3261 table_name: &str,
3262 index_name: &str,
3263 row_range: core::ops::Range<usize>,
3264 ) -> Result<FreezeSlice, StorageError> {
3265 let table = self.get(table_name).ok_or_else(|| {
3266 StorageError::Corrupt(format!(
3267 "prepare_freeze_slice: table {table_name:?} not found"
3268 ))
3269 })?;
3270 let idx = table
3271 .indices
3272 .iter()
3273 .find(|i| i.name == index_name)
3274 .ok_or_else(|| {
3275 StorageError::Corrupt(format!(
3276 "prepare_freeze_slice: index {index_name:?} not found on {table_name:?}"
3277 ))
3278 })?;
3279 if !matches!(idx.kind, IndexKind::BTree(_)) {
3280 return Err(StorageError::Corrupt(format!(
3281 "prepare_freeze_slice: index {index_name:?} is NSW; only BTree indices may freeze"
3282 )));
3283 }
3284 if row_range.end > table.rows.len() {
3285 return Err(StorageError::Corrupt(format!(
3286 "prepare_freeze_slice: row_range end {} > row_count {}",
3287 row_range.end,
3288 table.rows.len()
3289 )));
3290 }
3291 let column_position = idx.column_position;
3292 let schema = table.schema.clone();
3293 let mut rows: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(row_range.len());
3294 for row_idx in row_range.clone() {
3295 let row = table.rows.get(row_idx).expect("bounds-checked above");
3296 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
3297 StorageError::Corrupt(format!(
3298 "prepare_freeze_slice: row {row_idx} has NULL / non-key value in index column"
3299 ))
3300 })?;
3301 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
3302 StorageError::Corrupt(format!(
3303 "prepare_freeze_slice: index {index_name:?} column type is non-integer; \
3304 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
3305 ))
3306 })?;
3307 rows.push((pk_u64, encode_row_body_dense(row, &schema), key));
3308 }
3309 rows.sort_by_key(|(k, _, _)| *k);
3310 Ok(FreezeSlice { row_range, rows })
3311 }
3312
3313 pub fn commit_freeze_slices(
3327 &mut self,
3328 table_name: &str,
3329 index_name: &str,
3330 slices: Vec<FreezeSlice>,
3331 ) -> Result<FreezeReport, StorageError> {
3332 let table = self.get(table_name).ok_or_else(|| {
3334 StorageError::Corrupt(format!(
3335 "commit_freeze_slices: table {table_name:?} not found"
3336 ))
3337 })?;
3338 let idx = table
3339 .indices
3340 .iter()
3341 .find(|i| i.name == index_name)
3342 .ok_or_else(|| {
3343 StorageError::Corrupt(format!(
3344 "commit_freeze_slices: index {index_name:?} not found on {table_name:?}"
3345 ))
3346 })?;
3347 if !matches!(idx.kind, IndexKind::BTree(_)) {
3348 return Err(StorageError::Corrupt(format!(
3349 "commit_freeze_slices: index {index_name:?} is NSW; only BTree indices may freeze"
3350 )));
3351 }
3352 let mut ordered = slices;
3356 ordered.sort_by_key(|s| s.row_range.start);
3357 let mut expected_start = 0usize;
3361 for s in &ordered {
3362 if s.row_range.start != expected_start {
3363 return Err(StorageError::Corrupt(format!(
3364 "commit_freeze_slices: gap/overlap at row {}; expected start {}",
3365 s.row_range.start, expected_start
3366 )));
3367 }
3368 expected_start = s.row_range.end;
3369 }
3370 let max_rows = expected_start;
3371 if max_rows > table.rows.len() {
3372 return Err(StorageError::Corrupt(format!(
3373 "commit_freeze_slices: total row range {} exceeds row_count {}",
3374 max_rows,
3375 table.rows.len()
3376 )));
3377 }
3378 if max_rows == 0 {
3379 return Ok(FreezeReport {
3380 segment_id: u32::MAX,
3381 frozen_rows: 0,
3382 bytes_freed: 0,
3383 segment_bytes: Vec::new(),
3384 });
3385 }
3386
3387 let total_rows: usize = ordered.iter().map(|s| s.rows.len()).sum();
3392 if total_rows != max_rows {
3393 return Err(StorageError::Corrupt(format!(
3394 "commit_freeze_slices: total slice rows {total_rows} ≠ row_range coverage {max_rows}"
3395 )));
3396 }
3397 let mut cursors: Vec<usize> = alloc::vec![0; ordered.len()];
3398 let mut merged: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(total_rows);
3399 loop {
3400 let mut pick: Option<usize> = None;
3403 for (i, c) in cursors.iter().enumerate() {
3404 let slice = &ordered[i];
3405 if *c >= slice.rows.len() {
3406 continue;
3407 }
3408 match pick {
3409 None => pick = Some(i),
3410 Some(j) => {
3411 if slice.rows[*c].0 < ordered[j].rows[cursors[j]].0 {
3412 pick = Some(i);
3413 }
3414 }
3415 }
3416 }
3417 let Some(i) = pick else { break };
3418 let row = ordered[i].rows[cursors[i]].clone();
3419 cursors[i] += 1;
3420 merged.push(row);
3421 }
3422 for w in merged.windows(2) {
3425 if w[0].0 == w[1].0 {
3426 return Err(StorageError::Corrupt(format!(
3427 "commit_freeze_slices: duplicate PK {} across slices",
3428 w[0].0
3429 )));
3430 }
3431 }
3432 let post_swap_keys: Vec<IndexKey> = merged.iter().map(|(_, _, k)| k.clone()).collect();
3433 let seg_rows: Vec<(u64, Vec<u8>)> =
3434 merged.into_iter().map(|(k, body, _)| (k, body)).collect();
3435 let frozen_rows = seg_rows.len();
3436 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
3437 .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: encode: {e}")))?;
3438
3439 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
3441 let positions: Vec<usize> = (0..max_rows).collect();
3442 let t_mut = self
3443 .get_mut(table_name)
3444 .expect("just validated; still present");
3445 let removed = t_mut.delete_rows(&positions);
3446 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
3447 let bytes_after = t_mut.hot_bytes();
3448 let bytes_freed = bytes_before.saturating_sub(bytes_after);
3449
3450 let segment_id = self
3451 .load_segment_bytes(seg_bytes.clone())
3452 .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: load: {e}")))?;
3453 let new_cold = post_swap_keys.into_iter().map(|k| {
3454 (
3455 k,
3456 RowLocator::Cold {
3457 segment_id,
3458 page_offset: 0,
3459 },
3460 )
3461 });
3462 let t_mut = self.get_mut(table_name).expect("still present");
3463 t_mut.register_cold_locators(index_name, new_cold)?;
3464
3465 Ok(FreezeReport {
3466 segment_id,
3467 frozen_rows,
3468 bytes_freed,
3469 segment_bytes: seg_bytes,
3470 })
3471 }
3472
3473 pub fn compact_cold_segments(
3516 &mut self,
3517 table_name: &str,
3518 index_name: &str,
3519 target_segment_bytes: u64,
3520 ) -> Result<CompactReport, StorageError> {
3521 let t = self.get(table_name).ok_or_else(|| {
3523 StorageError::Corrupt(format!(
3524 "compact_cold_segments: table {table_name:?} not found"
3525 ))
3526 })?;
3527 let idx = t
3528 .indices
3529 .iter()
3530 .find(|i| i.name == index_name)
3531 .ok_or_else(|| {
3532 StorageError::Corrupt(format!(
3533 "compact_cold_segments: index {index_name:?} not found on {table_name:?}"
3534 ))
3535 })?;
3536 let map = match &idx.kind {
3537 IndexKind::BTree(m) => m,
3538 IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
3539 return Err(StorageError::Corrupt(format!(
3540 "compact_cold_segments: index {index_name:?} is not BTree; \
3541 compaction applies only to BTree cold-tier indices"
3542 )));
3543 }
3544 };
3545
3546 let mut referenced_ids: BTreeSet<u32> = BTreeSet::new();
3549 for (_key, locators) in map.iter() {
3550 for loc in locators {
3551 if let RowLocator::Cold { segment_id, .. } = loc {
3552 referenced_ids.insert(*segment_id);
3553 }
3554 }
3555 }
3556 let candidate_set: BTreeSet<u32> = referenced_ids
3558 .into_iter()
3559 .filter(|id| {
3560 self.cold_segments
3561 .get(*id as usize)
3562 .and_then(|s| s.as_deref())
3563 .is_some_and(|s| (s.bytes().len() as u64) < target_segment_bytes)
3564 })
3565 .collect();
3566 if candidate_set.len() < 2 {
3567 return Ok(CompactReport {
3568 sources: Vec::new(),
3569 merged_segment_id: None,
3570 merged_segment_bytes: Vec::new(),
3571 merged_rows: 0,
3572 deleted_rows_pruned: 0,
3573 bytes_reclaimed_estimate: 0,
3574 });
3575 }
3576 let mut source_row_count: usize = 0;
3578 let mut source_byte_total: u64 = 0;
3579 for &id in &candidate_set {
3580 let seg = self.cold_segments[id as usize]
3581 .as_ref()
3582 .expect("candidate selected only when slot is Some");
3583 source_row_count = source_row_count.saturating_add(seg.meta().num_rows as usize);
3584 source_byte_total = source_byte_total.saturating_add(seg.bytes().len() as u64);
3585 }
3586 let mut collected: BTreeMap<u64, (Vec<u8>, IndexKey)> = BTreeMap::new();
3592 for (key, locators) in map.iter() {
3593 for loc in locators {
3594 let RowLocator::Cold { segment_id, .. } = loc else {
3595 continue;
3596 };
3597 if !candidate_set.contains(segment_id) {
3598 continue;
3599 }
3600 let u64_key = index_key_as_u64(key).ok_or_else(|| {
3601 StorageError::Corrupt(format!(
3602 "compact_cold_segments: index {index_name:?} has non-integer Cold key; \
3603 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
3604 ))
3605 })?;
3606 let seg = self.cold_segments[*segment_id as usize]
3607 .as_ref()
3608 .expect("candidate slot guaranteed Some above");
3609 let payload = seg.lookup(u64_key).ok_or_else(|| {
3610 StorageError::Corrupt(format!(
3611 "compact_cold_segments: BTree {index_name:?} points key={u64_key} \
3612 at segment {segment_id} but the segment lookup missed"
3613 ))
3614 })?;
3615 collected.insert(u64_key, (payload, key.clone()));
3616 break;
3617 }
3618 }
3619 let merged_rows = collected.len();
3620 let deleted_rows_pruned = source_row_count.saturating_sub(merged_rows);
3621
3622 let seg_rows: Vec<(u64, Vec<u8>)> = collected
3626 .iter()
3627 .map(|(k, (body, _))| (*k, body.clone()))
3628 .collect();
3629 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
3630 .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: encode: {e}")))?;
3631 let merged_bytes_len = seg_bytes.len() as u64;
3632
3633 let merged_segment_id = self
3635 .load_segment_bytes(seg_bytes.clone())
3636 .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: load: {e}")))?;
3637
3638 let entries: Vec<(IndexKey, Vec<RowLocator>)> = {
3644 let t = self
3645 .get(table_name)
3646 .expect("table existed at the start of this fn");
3647 let idx = t
3648 .indices
3649 .iter()
3650 .find(|i| i.name == index_name)
3651 .expect("index existed at the start of this fn");
3652 let IndexKind::BTree(map) = &idx.kind else {
3653 unreachable!("validated above");
3654 };
3655 map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
3656 };
3657 let t_mut = self
3658 .get_mut(table_name)
3659 .expect("table existed at the start of this fn");
3660 let idx_mut = t_mut
3661 .indices
3662 .iter_mut()
3663 .find(|i| i.name == index_name)
3664 .expect("index existed at the start of this fn");
3665 let IndexKind::BTree(map_mut) = &mut idx_mut.kind else {
3666 unreachable!("validated above");
3667 };
3668 for (key, locators) in entries {
3669 let mut new_locs: Vec<RowLocator> = Vec::with_capacity(locators.len());
3670 let mut changed = false;
3671 for loc in &locators {
3672 match *loc {
3673 RowLocator::Cold {
3674 segment_id,
3675 page_offset: _,
3676 } if candidate_set.contains(&segment_id) => {
3677 let replacement = RowLocator::Cold {
3678 segment_id: merged_segment_id,
3679 page_offset: 0,
3680 };
3681 if !new_locs.contains(&replacement) {
3682 new_locs.push(replacement);
3683 }
3684 changed = true;
3685 }
3686 other => new_locs.push(other),
3687 }
3688 }
3689 if changed {
3690 map_mut.insert_mut(key, new_locs);
3691 }
3692 }
3693
3694 for &id in &candidate_set {
3699 self.tombstone_segment(id)?;
3700 }
3701
3702 let bytes_reclaimed_estimate = source_byte_total.saturating_sub(merged_bytes_len);
3703 Ok(CompactReport {
3704 sources: candidate_set.into_iter().collect(),
3705 merged_segment_id: Some(merged_segment_id),
3706 merged_segment_bytes: seg_bytes,
3707 merged_rows,
3708 deleted_rows_pruned,
3709 bytes_reclaimed_estimate,
3710 })
3711 }
3712
3713 fn find_cold_locator(
3719 &self,
3720 table_name: &str,
3721 index_name: &str,
3722 key: &IndexKey,
3723 ) -> Result<Option<(u32, u32)>, StorageError> {
3724 let t = self.get(table_name).ok_or_else(|| {
3725 StorageError::Corrupt(format!("find_cold_locator: table {table_name:?} not found"))
3726 })?;
3727 let idx = t
3728 .indices
3729 .iter()
3730 .find(|i| i.name == index_name)
3731 .ok_or_else(|| {
3732 StorageError::Corrupt(format!(
3733 "find_cold_locator: index {index_name:?} not found on {table_name:?}"
3734 ))
3735 })?;
3736 if !matches!(idx.kind, IndexKind::BTree(_)) {
3737 return Err(StorageError::Corrupt(format!(
3738 "find_cold_locator: index {index_name:?} is NSW; promote-on-write only applies to BTree indices"
3739 )));
3740 }
3741 for loc in idx.lookup_eq(key) {
3742 if let RowLocator::Cold {
3743 segment_id,
3744 page_offset,
3745 } = *loc
3746 {
3747 return Ok(Some((segment_id, page_offset)));
3748 }
3749 }
3750 Ok(None)
3751 }
3752}
3753
3754fn index_key_as_u64(key: &IndexKey) -> Option<u64> {
3760 match key {
3761 IndexKey::Int(n) => Some(n.cast_unsigned()),
3767 IndexKey::Text(_) | IndexKey::Bool(_) => None,
3768 }
3769}
3770
/// Errors produced by the storage layer: catalog mutation, row validation, and
/// (de)serialization of the catalog and cold-tier segments.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must keep a wildcard
/// arm when matching on it.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum StorageError {
    /// A table with this name already exists.
    DuplicateTable {
        name: String,
    },
    /// No table with this name exists.
    TableNotFound {
        name: String,
    },
    /// A row's value count differs from the table's column count.
    ArityMismatch {
        expected: usize,
        actual: usize,
    },
    /// A value's type does not match the declared type of `column`, which sits at
    /// schema index `position`.
    TypeMismatch {
        column: String,
        expected: DataType,
        actual: DataType,
        position: usize,
    },
    /// A NULL was supplied for a column declared NOT NULL.
    NullInNotNull {
        column: String,
    },
    /// An index with this name already exists.
    DuplicateIndex {
        name: String,
    },
    /// A referenced column does not exist.
    ColumnNotFound {
        column: String,
    },
    /// Malformed serialized data or a broken internal invariant; the string carries
    /// a human-readable detail. The cold-tier freeze/compaction helpers also return
    /// it for lookup failures such as a missing table or index.
    Corrupt(String),
    /// No index with this name exists.
    IndexNotFound {
        name: String,
    },
    /// The requested operation or feature is not supported; the string says which.
    Unsupported(String),
}
3814
3815impl fmt::Display for StorageError {
3816 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3817 match self {
3818 Self::DuplicateTable { name } => write!(f, "table already exists: {name}"),
3819 Self::TableNotFound { name } => write!(f, "table not found: {name}"),
3820 Self::ArityMismatch { expected, actual } => write!(
3821 f,
3822 "row arity mismatch: expected {expected} columns, got {actual}"
3823 ),
3824 Self::TypeMismatch {
3825 column,
3826 expected,
3827 actual,
3828 position,
3829 } => write!(
3830 f,
3831 "type mismatch in column {column:?} (position {position}): expected {expected}, got {actual}"
3832 ),
3833 Self::NullInNotNull { column } => {
3834 write!(f, "NULL value in NOT NULL column {column:?}")
3835 }
3836 Self::DuplicateIndex { name } => write!(f, "index already exists: {name}"),
3837 Self::ColumnNotFound { column } => write!(f, "column not found: {column}"),
3838 Self::Corrupt(detail) => write!(f, "corrupt on-disk format: {detail}"),
3839 Self::IndexNotFound { name } => write!(f, "index not found: {name}"),
3840 Self::Unsupported(detail) => write!(f, "unsupported: {detail}"),
3841 }
3842 }
3843}
3844
3845impl ColumnSchema {
3846 pub fn new(name: impl Into<String>, ty: DataType, nullable: bool) -> Self {
3847 Self {
3848 name: name.into(),
3849 ty,
3850 nullable,
3851 default: None,
3852 runtime_default: None,
3853 auto_increment: false,
3854 }
3855 }
3856
3857 #[must_use]
3861 pub fn with_default(mut self, default: Value) -> Self {
3862 self.default = Some(default);
3863 self
3864 }
3865
3866 #[must_use]
3871 pub fn with_runtime_default(mut self, expr: impl Into<String>) -> Self {
3872 self.runtime_default = Some(expr.into());
3873 self
3874 }
3875
3876 #[must_use]
3878 pub const fn with_auto_increment(mut self) -> Self {
3879 self.auto_increment = true;
3880 self
3881 }
3882}
3883
3884impl TableSchema {
3885 pub fn new(name: impl Into<String>, columns: Vec<ColumnSchema>) -> Self {
3886 Self {
3887 name: name.into(),
3888 columns,
3889 hot_tier_bytes: None,
3890 foreign_keys: Vec::new(),
3891 uniqueness_constraints: Vec::new(),
3892 }
3893 }
3894}
3895
/// Eight-byte prefix of every serialized catalog. `Catalog::deserialize` rejects
/// buffers that do not start with it ("bad magic").
const FILE_MAGIC: &[u8; 8] = b"SPGDB001";
/// Format version that `Catalog::serialize` writes right after the magic.
const FILE_VERSION: u8 = 19;
/// Oldest format version `Catalog::deserialize` still accepts. Versions outside
/// `MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION` are rejected as corrupt.
const MIN_SUPPORTED_FILE_VERSION: u8 = 8;

// Tag bytes for serialized `IndexKey` variants.
// NOTE(review): presumably consumed by `write_index_key` / `Cursor::read_index_key`,
// which are not visible in this section — confirm.
const INDEX_KEY_TAG_INT: u8 = 0;
const INDEX_KEY_TAG_TEXT: u8 = 1;
const INDEX_KEY_TAG_BOOL: u8 = 2;
3981
impl Catalog {
    /// Encodes the whole catalog into one self-contained byte buffer.
    ///
    /// The buffer starts with `FILE_MAGIC`, one `FILE_VERSION` byte, and a u32 table
    /// count. Each table then contributes, in order:
    /// - its name, a u16 column count, and the column descriptors;
    /// - a u32 row count and the dense row bodies;
    /// - a u16 index count and the per-index records;
    /// - the hot-tier budget, foreign-key, uniqueness-constraint and runtime-default
    ///   appendices.
    ///
    /// `Catalog::deserialize` reads the same sequence back.
    ///
    /// # Panics
    ///
    /// Panics (via `expect`) when a count or column position does not fit its
    /// on-disk integer width, e.g. more than `u16::MAX` columns in a table. It also
    /// panics inside `write_data_type` if a column or BRIN index has type
    /// `DataType::Interval`.
    pub fn serialize(&self) -> Vec<u8> {
        let mut out = Vec::with_capacity(64);
        out.extend_from_slice(FILE_MAGIC);
        out.push(FILE_VERSION);
        write_u32(
            &mut out,
            u32::try_from(self.tables.len()).expect("≤ 4G tables"),
        );
        for t in &self.tables {
            write_str(&mut out, &t.schema.name);
            write_u16(
                &mut out,
                u16::try_from(t.schema.columns.len()).expect("≤ 65k columns/table"),
            );
            for c in &t.schema.columns {
                write_str(&mut out, &c.name);
                write_data_type(&mut out, c.ty);
                out.push(u8::from(c.nullable));
                // Static default: tag 0 = none, 1 = a value follows.
                match &c.default {
                    None => out.push(0),
                    Some(v) => {
                        out.push(1);
                        write_value(&mut out, v);
                    }
                }
                out.push(u8::from(c.auto_increment));
            }
            write_u32(
                &mut out,
                u32::try_from(t.rows.len()).expect("≤ 4G rows/table"),
            );
            for row in &t.rows {
                out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
            }
            write_u16(
                &mut out,
                u16::try_from(t.indices.len()).expect("≤ 65k indices/table"),
            );
            for idx in &t.indices {
                write_str(&mut out, &idx.name);
                write_u16(
                    &mut out,
                    u16::try_from(idx.column_position).expect("≤ 65k columns/table"),
                );
                // Kind tag:
                // - 0 = BTree: u32 entry count, then each key with its locators;
                // - 1 = NSW: u16 `m`, then the graph;
                // - 2 = BRIN: only the column type is stored.
                match &idx.kind {
                    IndexKind::BTree(map) => {
                        out.push(0);
                        write_u32(
                            &mut out,
                            u32::try_from(map.len()).expect("≤ 4G index entries/index"),
                        );
                        for (key, locators) in map {
                            write_index_key(&mut out, key);
                            write_u32(
                                &mut out,
                                u32::try_from(locators.len()).expect("≤ 4G locators/key"),
                            );
                            for loc in locators {
                                loc.write_le(&mut out);
                            }
                        }
                    }
                    IndexKind::Nsw(g) => {
                        out.push(1);
                        write_u16(&mut out, u16::try_from(g.m).expect("≤ 65k NSW neighbours"));
                        write_nsw_graph(&mut out, g);
                    }
                    IndexKind::Brin { column_type } => {
                        out.push(2);
                        write_data_type(&mut out, *column_type);
                    }
                }
                // INCLUDE columns: u16 count, then one u16 position each.
                write_u16(
                    &mut out,
                    u16::try_from(idx.included_columns.len()).expect("≤ 65k INCLUDE columns/index"),
                );
                for col_pos in &idx.included_columns {
                    write_u16(
                        &mut out,
                        u16::try_from(*col_pos).expect("≤ 65k columns/table"),
                    );
                }
                // Partial-index predicate: presence tag, then the source text.
                match &idx.partial_predicate {
                    None => out.push(0),
                    Some(pred) => {
                        out.push(1);
                        write_str(&mut out, pred);
                    }
                }
                // Expression index: presence tag, then the source text.
                match &idx.expression {
                    None => out.push(0),
                    Some(expr) => {
                        out.push(1);
                        write_str(&mut out, expr);
                    }
                }
                out.push(u8::from(idx.is_unique));
                // Extra column positions: u16 count, then one u16 position each.
                write_u16(
                    &mut out,
                    u16::try_from(idx.extra_column_positions.len())
                        .expect("≤ 65k extra cols / index"),
                );
                for cp in &idx.extra_column_positions {
                    write_u16(&mut out, u16::try_from(*cp).expect("≤ 65k columns/table"));
                }
            }
            // Hot-tier budget: presence tag, then a little-endian u64 when present.
            match t.schema.hot_tier_bytes {
                None => out.push(0),
                Some(n) => {
                    out.push(1);
                    out.extend_from_slice(&n.to_le_bytes());
                }
            }
            // Foreign keys. Each record holds:
            // - an optional name;
            // - the local column positions;
            // - the parent table name and parent column positions;
            // - the on_delete and on_update action tags.
            write_u16(
                &mut out,
                u16::try_from(t.schema.foreign_keys.len()).expect("≤ 65k FKs/table"),
            );
            for fk in &t.schema.foreign_keys {
                match &fk.name {
                    None => out.push(0),
                    Some(n) => {
                        out.push(1);
                        write_str(&mut out, n);
                    }
                }
                write_u16(
                    &mut out,
                    u16::try_from(fk.local_columns.len()).expect("≤ 65k FK columns"),
                );
                for &p in &fk.local_columns {
                    write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
                }
                write_str(&mut out, &fk.parent_table);
                write_u16(
                    &mut out,
                    u16::try_from(fk.parent_columns.len()).expect("≤ 65k FK parent columns"),
                );
                for &p in &fk.parent_columns {
                    write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
                }
                out.push(fk.on_delete.tag());
                out.push(fk.on_update.tag());
            }
            // Uniqueness constraints: a primary-key flag, then the column positions.
            write_u16(
                &mut out,
                u16::try_from(t.schema.uniqueness_constraints.len())
                    .expect("≤ 65k uniqueness constraints/table"),
            );
            for uc in &t.schema.uniqueness_constraints {
                out.push(u8::from(uc.is_primary_key));
                write_u16(
                    &mut out,
                    u16::try_from(uc.columns.len()).expect("≤ 65k cols in uniqueness constraint"),
                );
                for &p in &uc.columns {
                    write_u16(&mut out, u16::try_from(p).expect("≤ 65k columns/table"));
                }
            }
            // Runtime defaults are stored sparsely as (column position, expression)
            // pairs; columns without one are omitted.
            let mut rt_defaults: Vec<(usize, &str)> = Vec::new();
            for (i, c) in t.schema.columns.iter().enumerate() {
                if let Some(e) = &c.runtime_default {
                    rt_defaults.push((i, e.as_str()));
                }
            }
            write_u16(
                &mut out,
                u16::try_from(rt_defaults.len()).expect("≤ 65k runtime defaults/table"),
            );
            for (pos, expr) in rt_defaults {
                write_u16(&mut out, u16::try_from(pos).expect("≤ 65k columns/table"));
                write_str(&mut out, expr);
            }
        }
        out
    }

    /// Decodes a catalog previously produced by [`Catalog::serialize`].
    ///
    /// Accepts format versions `MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION`. The
    /// detected version is passed to `deserialize_table`, which reads only the
    /// sections that version contains.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::Corrupt` for a wrong magic prefix, an unsupported
    /// version, or bytes left over after the last table. Errors raised while decoding
    /// a table are propagated unchanged. NOTE(review): a truncated buffer presumably
    /// surfaces as an error from the `Cursor` readers — confirm.
    pub fn deserialize(buf: &[u8]) -> Result<Self, StorageError> {
        let mut cur = Cursor::new(buf);
        let magic = cur.take(8)?;
        if magic != FILE_MAGIC {
            return Err(StorageError::Corrupt(format!(
                "bad magic: expected SPGDB001, got {magic:?}"
            )));
        }
        let version = cur.read_u8()?;
        if !(MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION).contains(&version) {
            return Err(StorageError::Corrupt(format!(
                "unsupported file version: {version} (supported: {MIN_SUPPORTED_FILE_VERSION}..={FILE_VERSION})"
            )));
        }
        let table_count = cur.read_u32()? as usize;
        let mut cat = Self::new();
        for _ in 0..table_count {
            deserialize_table(&mut cur, &mut cat, version)?;
        }
        // Reject unread tail bytes instead of silently ignoring them.
        if cur.pos < buf.len() {
            return Err(StorageError::Corrupt(format!(
                "trailing bytes: {} unread",
                buf.len() - cur.pos
            )));
        }
        Ok(cat)
    }
}
4257
4258fn deserialize_table(
4263 cur: &mut Cursor<'_>,
4264 cat: &mut Catalog,
4265 version: u8,
4266) -> Result<(), StorageError> {
4267 let table_name = cur.read_str()?;
4268 let name = table_name.clone();
4269 let col_count = cur.read_u16()? as usize;
4270 let mut cols = Vec::with_capacity(col_count);
4271 for _ in 0..col_count {
4272 let c_name = cur.read_str()?;
4273 let ty = cur.read_data_type()?;
4274 let nullable = cur.read_u8()? != 0;
4275 let default = match cur.read_u8()? {
4276 0 => None,
4277 1 => Some(cur.read_value()?),
4278 other => {
4279 return Err(StorageError::Corrupt(format!(
4280 "unknown default tag: {other}"
4281 )));
4282 }
4283 };
4284 let auto_increment = cur.read_u8()? != 0;
4285 cols.push(ColumnSchema {
4289 name: c_name,
4290 ty,
4291 nullable,
4292 default,
4293 runtime_default: None,
4294 auto_increment,
4295 });
4296 }
4297 let n_cols = cols.len();
4298 cat.create_table(TableSchema::new(name, cols))?;
4299 let t = cat.tables.last_mut().expect("create_table just pushed");
4303 deserialize_rows(cur, t, n_cols)?;
4304 deserialize_indices(cur, t, version)?;
4305 if version >= 11 {
4311 let has = cur.read_u8()?;
4312 let hot_tier_bytes = match has {
4313 0 => None,
4314 1 => Some(cur.read_u64()?),
4315 other => {
4316 return Err(StorageError::Corrupt(format!(
4317 "hot_tier_bytes appendix: unknown has-value byte {other}"
4318 )));
4319 }
4320 };
4321 t.schema_mut().hot_tier_bytes = hot_tier_bytes;
4322 }
4323 if version >= 13 {
4326 let fk_count = cur.read_u16()? as usize;
4327 let mut fks = Vec::with_capacity(fk_count);
4328 for _ in 0..fk_count {
4329 let name = match cur.read_u8()? {
4330 0 => None,
4331 1 => Some(cur.read_str()?),
4332 other => {
4333 return Err(StorageError::Corrupt(format!(
4334 "FK appendix: unknown has-name byte {other}"
4335 )));
4336 }
4337 };
4338 let local_arity = cur.read_u16()? as usize;
4339 let mut local_columns = Vec::with_capacity(local_arity);
4340 for _ in 0..local_arity {
4341 local_columns.push(cur.read_u16()? as usize);
4342 }
4343 let parent_table = cur.read_str()?;
4344 let parent_arity = cur.read_u16()? as usize;
4345 if parent_arity != local_arity {
4346 return Err(StorageError::Corrupt(format!(
4347 "FK arity mismatch in catalog: local {local_arity} vs parent {parent_arity}"
4348 )));
4349 }
4350 let mut parent_columns = Vec::with_capacity(parent_arity);
4351 for _ in 0..parent_arity {
4352 parent_columns.push(cur.read_u16()? as usize);
4353 }
4354 let on_delete = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
4355 StorageError::Corrupt("FK appendix: unknown on_delete tag".into())
4356 })?;
4357 let on_update = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
4358 StorageError::Corrupt("FK appendix: unknown on_update tag".into())
4359 })?;
4360 fks.push(ForeignKeyConstraint {
4361 name,
4362 local_columns,
4363 parent_table,
4364 parent_columns,
4365 on_delete,
4366 on_update,
4367 });
4368 }
4369 t.schema_mut().foreign_keys = fks;
4370 }
4371 if version >= 15 {
4374 let uc_count = cur.read_u16()? as usize;
4375 let mut ucs = Vec::with_capacity(uc_count);
4376 for _ in 0..uc_count {
4377 let is_pk = cur.read_u8()? != 0;
4378 let arity = cur.read_u16()? as usize;
4379 let mut cols = Vec::with_capacity(arity);
4380 for _ in 0..arity {
4381 cols.push(cur.read_u16()? as usize);
4382 }
4383 ucs.push(UniquenessConstraint {
4384 is_primary_key: is_pk,
4385 columns: cols,
4386 });
4387 }
4388 t.schema_mut().uniqueness_constraints = ucs;
4389 let rt_count = cur.read_u16()? as usize;
4391 for _ in 0..rt_count {
4392 let pos = cur.read_u16()? as usize;
4393 let expr = cur.read_str()?;
4394 if let Some(col) = t.schema_mut().columns.get_mut(pos) {
4395 col.runtime_default = Some(expr);
4396 }
4397 }
4398 }
4399 let _ = table_name;
4400 Ok(())
4401}
4402
4403fn deserialize_rows(
4404 cur: &mut Cursor<'_>,
4405 t: &mut Table,
4406 _n_cols: usize,
4407) -> Result<(), StorageError> {
4408 let row_count = cur.read_u32()? as usize;
4409 let mut hot_bytes: u64 = 0;
4414 for _ in 0..row_count {
4415 let tail = &cur.buf[cur.pos..];
4416 let (row, consumed) = decode_row_body_dense(tail, &t.schema)?;
4417 cur.pos += consumed;
4418 hot_bytes = hot_bytes.saturating_add(row_body_encoded_len(&row, &t.schema) as u64);
4424 t.rows.push_mut(row);
4425 }
4426 t.hot_bytes = hot_bytes;
4427 Ok(())
4428}
4429
4430fn deserialize_indices(
4431 cur: &mut Cursor<'_>,
4432 t: &mut Table,
4433 version: u8,
4434) -> Result<(), StorageError> {
4435 let index_count = cur.read_u16()? as usize;
4436 for _ in 0..index_count {
4437 let idx_name = cur.read_str()?;
4438 let col_pos = cur.read_u16()? as usize;
4439 let column_name = t
4440 .schema
4441 .columns
4442 .get(col_pos)
4443 .ok_or_else(|| {
4444 StorageError::Corrupt(format!(
4445 "index {idx_name:?} points at non-existent column position {col_pos}"
4446 ))
4447 })?
4448 .name
4449 .clone();
4450 let kind_tag = cur.read_u8()?;
4451 match kind_tag {
4452 0 => {
4453 if version >= 9 {
4454 let map = read_btree_map(cur)?;
4459 t.restore_btree_index(idx_name, &column_name, map)?;
4460 } else {
4461 t.add_index(idx_name, &column_name)?;
4466 }
4467 }
4468 1 => {
4469 let m = cur.read_u16()? as usize;
4470 let graph = cur.read_nsw_graph(m)?;
4471 t.restore_nsw_index(idx_name, &column_name, graph)?;
4472 }
4473 2 => {
4474 let column_type = cur.read_data_type()?;
4478 t.restore_brin_index(idx_name, &column_name, column_type)?;
4479 }
4480 other => {
4481 return Err(StorageError::Corrupt(format!(
4482 "unknown index kind tag: {other}"
4483 )));
4484 }
4485 }
4486 if version >= 12 {
4489 let num_included = cur.read_u16()? as usize;
4490 if num_included > 0 {
4491 let mut included: Vec<usize> = Vec::with_capacity(num_included);
4492 for _ in 0..num_included {
4493 let cp = cur.read_u16()? as usize;
4494 if cp >= t.schema.columns.len() {
4495 return Err(StorageError::Corrupt(format!(
4496 "INCLUDE column position {cp} out of range \
4497 ({} schema columns)",
4498 t.schema.columns.len()
4499 )));
4500 }
4501 included.push(cp);
4502 }
4503 if let Some(last) = t.indices.last_mut() {
4504 last.included_columns = included;
4505 }
4506 }
4507 match cur.read_u8()? {
4509 0 => {}
4510 1 => {
4511 let pred = cur.read_str()?;
4512 if let Some(last) = t.indices.last_mut() {
4513 last.partial_predicate = Some(pred);
4514 }
4515 }
4516 other => {
4517 return Err(StorageError::Corrupt(format!(
4518 "partial_predicate tag: unknown byte {other}"
4519 )));
4520 }
4521 }
4522 match cur.read_u8()? {
4524 0 => {}
4525 1 => {
4526 let expr = cur.read_str()?;
4527 if let Some(last) = t.indices.last_mut() {
4528 last.expression = Some(expr);
4529 }
4530 }
4531 other => {
4532 return Err(StorageError::Corrupt(format!(
4533 "expression tag: unknown byte {other}"
4534 )));
4535 }
4536 }
4537 if version >= 16 {
4540 match cur.read_u8()? {
4541 0 => {}
4542 1 => {
4543 if let Some(last) = t.indices.last_mut() {
4544 last.is_unique = true;
4545 }
4546 }
4547 other => {
4548 return Err(StorageError::Corrupt(format!(
4549 "is_unique tag: unknown byte {other}"
4550 )));
4551 }
4552 }
4553 let n = cur.read_u16()? as usize;
4555 if n > 0 {
4556 let mut extras: Vec<usize> = Vec::with_capacity(n);
4557 for _ in 0..n {
4558 let cp = cur.read_u16()? as usize;
4559 if cp >= t.schema.columns.len() {
4560 return Err(StorageError::Corrupt(format!(
4561 "extra column position {cp} out of range \
4562 ({} schema columns)",
4563 t.schema.columns.len()
4564 )));
4565 }
4566 extras.push(cp);
4567 }
4568 if let Some(last) = t.indices.last_mut() {
4569 last.extra_column_positions = extras;
4570 }
4571 }
4572 }
4573 }
4574 }
4575 Ok(())
4576}
4577
4578fn read_btree_map(
4582 cur: &mut Cursor<'_>,
4583) -> Result<PersistentBTreeMap<IndexKey, Vec<RowLocator>>, StorageError> {
4584 let entry_count = cur.read_u32()? as usize;
4585 let mut map = PersistentBTreeMap::new();
4586 for _ in 0..entry_count {
4587 let key = cur.read_index_key()?;
4588 let locator_count = cur.read_u32()? as usize;
4589 let mut locators = Vec::with_capacity(locator_count);
4590 for _ in 0..locator_count {
4591 let tail = &cur.buf[cur.pos..];
4592 let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
4593 StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
4594 })?;
4595 cur.pos += consumed;
4596 locators.push(loc);
4597 }
4598 map.insert_mut(key, locators);
4599 }
4600 Ok(map)
4601}
4602
4603fn write_nsw_graph(out: &mut Vec<u8>, g: &NswGraph) {
4619 let entry = g.entry.map_or(u32::MAX, |e| {
4620 u32::try_from(e).expect("NSW entry fits in u32")
4621 });
4622 write_u16(
4623 out,
4624 u16::try_from(g.m_max_0).expect("HNSW m_max_0 fits in u16"),
4625 );
4626 out.extend_from_slice(&entry.to_le_bytes());
4627 out.push(g.entry_level);
4628 let node_count = g.levels.len();
4629 write_u32(
4630 out,
4631 u32::try_from(node_count).expect("HNSW node count fits in u32"),
4632 );
4633 for &lvl in &g.levels {
4634 out.push(lvl);
4635 }
4636 let layer_count = u8::try_from(g.layers.len()).expect("HNSW layer count ≤ 255");
4637 out.push(layer_count);
4638 for layer in &g.layers {
4639 write_u32(
4640 out,
4641 u32::try_from(layer.len()).expect("HNSW per-layer node count fits in u32"),
4642 );
4643 for neighbors in layer {
4644 write_u16(
4645 out,
4646 u16::try_from(neighbors.len()).expect("HNSW neighbour list fits in u16"),
4647 );
4648 for &peer in neighbors {
4652 write_u32(out, peer);
4653 }
4654 }
4655 }
4656}
4657
4658fn write_data_type(out: &mut Vec<u8>, t: DataType) {
4659 match t {
4660 DataType::Int => out.push(1),
4661 DataType::BigInt => out.push(2),
4662 DataType::Float => out.push(3),
4663 DataType::Text => out.push(4),
4664 DataType::Bool => out.push(5),
4665 DataType::Vector { dim, encoding } => match encoding {
4666 VecEncoding::F32 => {
4670 out.push(6);
4671 out.extend_from_slice(&dim.to_le_bytes());
4672 }
4673 VecEncoding::F16 => {
4676 out.push(15);
4677 out.extend_from_slice(&dim.to_le_bytes());
4678 }
4679 VecEncoding::Sq8 => {
4685 out.push(14);
4686 out.extend_from_slice(&dim.to_le_bytes());
4687 }
4688 },
4689 DataType::SmallInt => out.push(7),
4690 DataType::Varchar(max) => {
4691 out.push(8);
4692 out.extend_from_slice(&max.to_le_bytes());
4693 }
4694 DataType::Char(size) => {
4695 out.push(9);
4696 out.extend_from_slice(&size.to_le_bytes());
4697 }
4698 DataType::Numeric { precision, scale } => {
4699 out.push(10);
4700 out.push(precision);
4701 out.push(scale);
4702 }
4703 DataType::Date => out.push(11),
4704 DataType::Timestamp => out.push(12),
4705 DataType::Timestamptz => out.push(17),
4709 DataType::Interval => {
4714 unreachable!("DataType::Interval has no on-disk encoding in v2.11")
4715 }
4716 DataType::Json => out.push(13),
4717 DataType::Jsonb => out.push(16),
4720 DataType::Bytes => out.push(18),
4722 DataType::TextArray => out.push(19),
4725 DataType::IntArray => out.push(20),
4728 DataType::BigIntArray => out.push(21),
4731 }
4732}
4733
4734impl Cursor<'_> {
4735 fn read_data_type(&mut self) -> Result<DataType, StorageError> {
4736 let tag = self.read_u8()?;
4737 match tag {
4738 1 => Ok(DataType::Int),
4739 2 => Ok(DataType::BigInt),
4740 3 => Ok(DataType::Float),
4741 4 => Ok(DataType::Text),
4742 5 => Ok(DataType::Bool),
4743 6 => Ok(DataType::Vector {
4744 dim: self.read_u32()?,
4745 encoding: VecEncoding::F32,
4746 }),
4747 7 => Ok(DataType::SmallInt),
4748 8 => Ok(DataType::Varchar(self.read_u32()?)),
4749 9 => Ok(DataType::Char(self.read_u32()?)),
4750 10 => {
4751 let precision = self.read_u8()?;
4752 let scale = self.read_u8()?;
4753 Ok(DataType::Numeric { precision, scale })
4754 }
4755 11 => Ok(DataType::Date),
4756 12 => Ok(DataType::Timestamp),
4757 13 => Ok(DataType::Json),
4758 14 => Ok(DataType::Vector {
4759 dim: self.read_u32()?,
4760 encoding: VecEncoding::Sq8,
4761 }),
4762 15 => Ok(DataType::Vector {
4766 dim: self.read_u32()?,
4767 encoding: VecEncoding::F16,
4768 }),
4769 16 => Ok(DataType::Jsonb),
4773 17 => Ok(DataType::Timestamptz),
4777 18 => Ok(DataType::Bytes),
4779 19 => Ok(DataType::TextArray),
4781 20 => Ok(DataType::IntArray),
4783 21 => Ok(DataType::BigIntArray),
4784 other => Err(StorageError::Corrupt(format!(
4785 "unknown data type tag: {other}"
4786 ))),
4787 }
4788 }
4789}
4790
4791pub fn row_body_encoded_len(row: &Row, schema: &TableSchema) -> usize {
4797 debug_assert_eq!(
4798 row.values.len(),
4799 schema.columns.len(),
4800 "row_body_encoded_len: row arity must match schema"
4801 );
4802 let bitmap_bytes = schema.columns.len().div_ceil(8);
4803 let mut n = bitmap_bytes;
4804 for (col_idx, v) in row.values.iter().enumerate() {
4805 if matches!(v, Value::Null) {
4806 continue;
4807 }
4808 n += value_body_encoded_len(v, schema.columns[col_idx].ty);
4809 }
4810 n
4811}
4812
4813fn value_body_encoded_len(v: &Value, _ty: DataType) -> usize {
4819 match v {
4820 Value::SmallInt(_) => 2,
4821 Value::Int(_) | Value::Date(_) => 4,
4823 Value::BigInt(_) | Value::Float(_) | Value::Timestamp(_) => 8,
4825 Value::Bool(_) => 1,
4826 Value::Text(s) | Value::Json(s) => 2 + s.len(),
4828 Value::Vector(vec) => 4 + 4 * vec.len(),
4830 Value::Sq8Vector(q) => 4 + 4 + 4 + q.bytes.len(),
4837 Value::HalfVector(h) => 4 + h.bytes.len(),
4840 Value::Numeric { .. } => 16 + 1,
4842 Value::Bytes(b) => 2 + b.len(),
4848 Value::TextArray(items) => {
4851 let mut n = 2; for item in items {
4853 n += 1; if let Some(s) = item {
4855 n += 2 + s.len();
4856 }
4857 }
4858 n
4859 }
4860 Value::IntArray(items) => 2 + items.iter().map(|x| if x.is_some() { 5 } else { 1 }).sum::<usize>(),
4863 Value::BigIntArray(items) => 2 + items.iter().map(|x| if x.is_some() { 9 } else { 1 }).sum::<usize>(),
4864 Value::Null => 0,
4866 Value::Interval { .. } => {
4868 unreachable!("Value::Interval has no on-disk encoding")
4869 }
4870 }
4871}
4872
4873pub fn encode_row_body_dense(row: &Row, schema: &TableSchema) -> Vec<u8> {
4884 debug_assert_eq!(
4885 row.values.len(),
4886 schema.columns.len(),
4887 "dense encode: row arity must match schema"
4888 );
4889 let bitmap_bytes = schema.columns.len().div_ceil(8);
4890 let mut out = Vec::with_capacity(bitmap_bytes + schema.columns.len() * 8);
4893 let bitmap_offset = out.len();
4894 out.resize(bitmap_offset + bitmap_bytes, 0);
4895 for (i, v) in row.values.iter().enumerate() {
4896 if matches!(v, Value::Null) {
4897 out[bitmap_offset + i / 8] |= 1 << (i % 8);
4898 }
4899 }
4900 for (col_idx, v) in row.values.iter().enumerate() {
4901 if matches!(v, Value::Null) {
4902 continue;
4903 }
4904 write_value_body(&mut out, v, schema.columns[col_idx].ty);
4905 }
4906 out
4907}
4908
4909pub fn decode_row_body_dense(
4915 bytes: &[u8],
4916 schema: &TableSchema,
4917) -> Result<(Row, usize), StorageError> {
4918 let mut cur = Cursor::new(bytes);
4919 let bitmap_bytes = schema.columns.len().div_ceil(8);
4920 let mut bitmap_buf = [0u8; 32];
4921 if bitmap_bytes > bitmap_buf.len() {
4922 return Err(StorageError::Corrupt(format!(
4923 "row NULL bitmap {bitmap_bytes} B exceeds 32 B cap"
4924 )));
4925 }
4926 let slice = cur.take(bitmap_bytes)?;
4927 bitmap_buf[..bitmap_bytes].copy_from_slice(slice);
4928 let mut values = Vec::with_capacity(schema.columns.len());
4929 for (col_idx, col) in schema.columns.iter().enumerate() {
4930 if (bitmap_buf[col_idx / 8] >> (col_idx % 8)) & 1 == 1 {
4931 values.push(Value::Null);
4932 } else {
4933 values.push(cur.read_value_body(col.ty)?);
4934 }
4935 }
4936 Ok((Row { values }, cur.pos))
4937}
4938
4939fn write_value_body(out: &mut Vec<u8>, v: &Value, ty: DataType) {
4948 match (v, ty) {
4949 (Value::SmallInt(n), DataType::SmallInt) => out.extend_from_slice(&n.to_le_bytes()),
4950 (Value::Int(n), DataType::Int) => out.extend_from_slice(&n.to_le_bytes()),
4951 (Value::BigInt(n), DataType::BigInt) => out.extend_from_slice(&n.to_le_bytes()),
4952 (Value::Float(x), DataType::Float) => out.extend_from_slice(&x.to_le_bytes()),
4953 (Value::Bool(b), DataType::Bool) => out.push(u8::from(*b)),
4954 (Value::Text(s), DataType::Text | DataType::Varchar(_) | DataType::Char(_)) => {
4955 write_str(out, s);
4956 }
4957 (
4958 Value::Vector(v),
4959 DataType::Vector {
4960 encoding: VecEncoding::F32,
4961 ..
4962 },
4963 ) => {
4964 let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
4965 out.extend_from_slice(&dim.to_le_bytes());
4966 for x in v {
4967 out.extend_from_slice(&x.to_le_bytes());
4968 }
4969 }
4970 (
4976 Value::Sq8Vector(q),
4977 DataType::Vector {
4978 encoding: VecEncoding::Sq8,
4979 ..
4980 },
4981 ) => {
4982 let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
4983 out.extend_from_slice(&dim.to_le_bytes());
4984 out.extend_from_slice(&q.min.to_le_bytes());
4985 out.extend_from_slice(&q.max.to_le_bytes());
4986 out.extend_from_slice(&q.bytes);
4987 }
4988 (
4992 Value::HalfVector(h),
4993 DataType::Vector {
4994 encoding: VecEncoding::F16,
4995 ..
4996 },
4997 ) => {
4998 let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
4999 out.extend_from_slice(&dim.to_le_bytes());
5000 out.extend_from_slice(&h.bytes);
5001 }
5002 (Value::Numeric { scaled, .. }, DataType::Numeric { scale, .. }) => {
5003 out.extend_from_slice(&scaled.to_le_bytes());
5004 out.push(scale);
5005 }
5006 (Value::Date(d), DataType::Date) => out.extend_from_slice(&d.to_le_bytes()),
5007 (Value::Timestamp(t), DataType::Timestamp | DataType::Timestamptz) => {
5008 out.extend_from_slice(&t.to_le_bytes())
5009 }
5010 (Value::Json(s), DataType::Json | DataType::Jsonb) => write_str(out, s),
5014 (Value::Bytes(b), DataType::Bytes) => {
5017 let len = u16::try_from(b.len()).expect("BYTEA cell ≤ 64 KiB");
5018 out.extend_from_slice(&len.to_le_bytes());
5019 out.extend_from_slice(b);
5020 }
5021 (Value::TextArray(items), DataType::TextArray) => {
5024 let count = u16::try_from(items.len()).expect("TEXT[] ≤ 65k elements");
5025 out.extend_from_slice(&count.to_le_bytes());
5026 for item in items {
5027 match item {
5028 None => out.push(1),
5029 Some(s) => {
5030 out.push(0);
5031 let len = u16::try_from(s.len()).expect("TEXT[] element ≤ 64 KiB");
5032 out.extend_from_slice(&len.to_le_bytes());
5033 out.extend_from_slice(s.as_bytes());
5034 }
5035 }
5036 }
5037 }
5038 (Value::IntArray(items), DataType::IntArray) => {
5041 let count = u16::try_from(items.len()).expect("INT[] ≤ 65k elements");
5042 out.extend_from_slice(&count.to_le_bytes());
5043 for item in items {
5044 match item {
5045 None => out.push(1),
5046 Some(n) => {
5047 out.push(0);
5048 out.extend_from_slice(&n.to_le_bytes());
5049 }
5050 }
5051 }
5052 }
5053 (Value::BigIntArray(items), DataType::BigIntArray) => {
5056 let count = u16::try_from(items.len()).expect("BIGINT[] ≤ 65k elements");
5057 out.extend_from_slice(&count.to_le_bytes());
5058 for item in items {
5059 match item {
5060 None => out.push(1),
5061 Some(n) => {
5062 out.push(0);
5063 out.extend_from_slice(&n.to_le_bytes());
5064 }
5065 }
5066 }
5067 }
5068 (other, ty) => unreachable!(
5072 "schema-driven encode received mismatched value/type pair: \
5073 value tag={:?}, column type={:?}",
5074 other.data_type(),
5075 ty
5076 ),
5077 }
5078}
5079
5080fn write_value(out: &mut Vec<u8>, v: &Value) {
5081 match v {
5082 Value::Null => out.push(0),
5083 Value::SmallInt(n) => {
5084 out.push(7);
5085 out.extend_from_slice(&n.to_le_bytes());
5086 }
5087 Value::Int(n) => {
5088 out.push(1);
5089 out.extend_from_slice(&n.to_le_bytes());
5090 }
5091 Value::BigInt(n) => {
5092 out.push(2);
5093 out.extend_from_slice(&n.to_le_bytes());
5094 }
5095 Value::Float(x) => {
5096 out.push(3);
5097 out.extend_from_slice(&x.to_le_bytes());
5098 }
5099 Value::Text(s) | Value::Json(s) => {
5104 out.push(4);
5105 write_str(out, s);
5106 }
5107 Value::Bool(b) => {
5108 out.push(5);
5109 out.push(u8::from(*b));
5110 }
5111 Value::Vector(v) => {
5112 out.push(6);
5113 let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
5114 out.extend_from_slice(&dim.to_le_bytes());
5115 for x in v {
5116 out.extend_from_slice(&x.to_le_bytes());
5117 }
5118 }
5119 Value::Sq8Vector(q) => {
5124 out.push(11);
5125 let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
5126 out.extend_from_slice(&dim.to_le_bytes());
5127 out.extend_from_slice(&q.min.to_le_bytes());
5128 out.extend_from_slice(&q.max.to_le_bytes());
5129 out.extend_from_slice(&q.bytes);
5130 }
5131 Value::HalfVector(h) => {
5136 out.push(12);
5137 let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
5138 out.extend_from_slice(&dim.to_le_bytes());
5139 out.extend_from_slice(&h.bytes);
5140 }
5141 Value::Numeric { scaled, scale } => {
5142 out.push(8);
5143 out.extend_from_slice(&scaled.to_le_bytes());
5144 out.push(*scale);
5145 }
5146 Value::Date(d) => {
5147 out.push(9);
5148 out.extend_from_slice(&d.to_le_bytes());
5149 }
5150 Value::Timestamp(t) => {
5151 out.push(10);
5152 out.extend_from_slice(&t.to_le_bytes());
5153 }
5154 Value::Interval { .. } => {
5158 unreachable!(
5159 "Value::Interval has no on-disk encoding; engine must reject it before write"
5160 )
5161 }
5162 Value::Bytes(b) => {
5167 out.push(14);
5168 let len = u16::try_from(b.len()).expect("BYTEA value ≤ 64 KiB");
5169 out.extend_from_slice(&len.to_le_bytes());
5170 out.extend_from_slice(b);
5171 }
5172 Value::TextArray(items) => {
5175 out.push(15);
5176 let count = u16::try_from(items.len()).expect("TEXT[] ≤ 65k elements");
5177 out.extend_from_slice(&count.to_le_bytes());
5178 for item in items {
5179 match item {
5180 None => out.push(1),
5181 Some(s) => {
5182 out.push(0);
5183 let len = u16::try_from(s.len()).expect("TEXT[] element ≤ 64 KiB");
5184 out.extend_from_slice(&len.to_le_bytes());
5185 out.extend_from_slice(s.as_bytes());
5186 }
5187 }
5188 }
5189 }
5190 Value::IntArray(items) => {
5193 out.push(16);
5194 let count = u16::try_from(items.len()).expect("INT[] ≤ 65k elements");
5195 out.extend_from_slice(&count.to_le_bytes());
5196 for item in items {
5197 match item {
5198 None => out.push(1),
5199 Some(n) => {
5200 out.push(0);
5201 out.extend_from_slice(&n.to_le_bytes());
5202 }
5203 }
5204 }
5205 }
5206 Value::BigIntArray(items) => {
5209 out.push(17);
5210 let count = u16::try_from(items.len()).expect("BIGINT[] ≤ 65k elements");
5211 out.extend_from_slice(&count.to_le_bytes());
5212 for item in items {
5213 match item {
5214 None => out.push(1),
5215 Some(n) => {
5216 out.push(0);
5217 out.extend_from_slice(&n.to_le_bytes());
5218 }
5219 }
5220 }
5221 }
5222 }
5223}
5224
/// Appends `n` as two little-endian bytes.
fn write_u16(out: &mut Vec<u8>, n: u16) {
    out.extend(n.to_le_bytes());
}
/// Appends `n` as four little-endian bytes.
fn write_u32(out: &mut Vec<u8>, n: u32) {
    out.extend(n.to_le_bytes());
}
/// Appends `s` as a little-endian `u16` byte length followed by its UTF-8 bytes.
///
/// # Panics
/// If `s` is longer than `u16::MAX` bytes.
fn write_str(out: &mut Vec<u8>, s: &str) {
    let utf8 = s.as_bytes();
    let len = u16::try_from(utf8.len()).expect("identifier / text fits in u16");
    out.extend(len.to_le_bytes());
    out.extend_from_slice(utf8);
}
5236
5237fn write_index_key(out: &mut Vec<u8>, key: &IndexKey) {
5241 match key {
5242 IndexKey::Int(n) => {
5243 out.push(INDEX_KEY_TAG_INT);
5244 out.extend_from_slice(&n.to_le_bytes());
5245 }
5246 IndexKey::Text(s) => {
5247 out.push(INDEX_KEY_TAG_TEXT);
5248 write_str(out, s);
5249 }
5250 IndexKey::Bool(b) => {
5251 out.push(INDEX_KEY_TAG_BOOL);
5252 out.push(u8::from(*b));
5253 }
5254 }
5255}
5256
/// Forward-only reader over a borrowed byte buffer, used by the on-disk decoders.
struct Cursor<'a> {
    /// Offset of the next unread byte within `buf`.
    pos: usize,
    /// The complete input being decoded; only ever read, never modified.
    buf: &'a [u8],
}
5261
5262impl<'a> Cursor<'a> {
5263 const fn new(buf: &'a [u8]) -> Self {
5264 Self { buf, pos: 0 }
5265 }
5266
5267 fn take(&mut self, n: usize) -> Result<&'a [u8], StorageError> {
5268 let end = self
5269 .pos
5270 .checked_add(n)
5271 .ok_or_else(|| StorageError::Corrupt(format!("length overflow taking {n} bytes")))?;
5272 if end > self.buf.len() {
5273 return Err(StorageError::Corrupt(format!(
5274 "unexpected EOF at offset {} (wanted {n} more bytes)",
5275 self.pos
5276 )));
5277 }
5278 let s = &self.buf[self.pos..end];
5279 self.pos = end;
5280 Ok(s)
5281 }
5282
5283 fn read_u8(&mut self) -> Result<u8, StorageError> {
5284 Ok(self.take(1)?[0])
5285 }
5286 fn read_u16(&mut self) -> Result<u16, StorageError> {
5287 let s = self.take(2)?;
5288 Ok(u16::from_le_bytes([s[0], s[1]]))
5289 }
5290 fn read_u32(&mut self) -> Result<u32, StorageError> {
5291 let s = self.take(4)?;
5292 Ok(u32::from_le_bytes([s[0], s[1], s[2], s[3]]))
5293 }
5294 fn read_i32(&mut self) -> Result<i32, StorageError> {
5295 let s = self.take(4)?;
5296 Ok(i32::from_le_bytes([s[0], s[1], s[2], s[3]]))
5297 }
5298 fn read_u64(&mut self) -> Result<u64, StorageError> {
5301 let s = self.take(8)?;
5302 Ok(u64::from_le_bytes([
5303 s[0], s[1], s[2], s[3], s[4], s[5], s[6], s[7],
5304 ]))
5305 }
5306 fn read_i64(&mut self) -> Result<i64, StorageError> {
5307 let s = self.take(8)?;
5308 let arr: [u8; 8] = s.try_into().expect("checked");
5309 Ok(i64::from_le_bytes(arr))
5310 }
5311 fn read_f64(&mut self) -> Result<f64, StorageError> {
5312 let s = self.take(8)?;
5313 let arr: [u8; 8] = s.try_into().expect("checked");
5314 Ok(f64::from_le_bytes(arr))
5315 }
5316 fn read_f32(&mut self) -> Result<f32, StorageError> {
5317 let s = self.take(4)?;
5318 Ok(f32::from_le_bytes([s[0], s[1], s[2], s[3]]))
5319 }
5320 fn read_str(&mut self) -> Result<String, StorageError> {
5321 let len = self.read_u16()? as usize;
5322 let bytes = self.take(len)?;
5323 core::str::from_utf8(bytes)
5324 .map(String::from)
5325 .map_err(|_| StorageError::Corrupt("invalid UTF-8 in identifier or text".into()))
5326 }
5327
5328 fn read_index_key(&mut self) -> Result<IndexKey, StorageError> {
5332 let tag = self.read_u8()?;
5333 match tag {
5334 INDEX_KEY_TAG_INT => Ok(IndexKey::Int(self.read_i64()?)),
5335 INDEX_KEY_TAG_TEXT => Ok(IndexKey::Text(self.read_str()?)),
5336 INDEX_KEY_TAG_BOOL => Ok(IndexKey::Bool(self.read_u8()? != 0)),
5337 other => Err(StorageError::Corrupt(format!(
5338 "unknown index key tag: {other}"
5339 ))),
5340 }
5341 }
5342 fn read_value_body(&mut self, ty: DataType) -> Result<Value, StorageError> {
5348 match ty {
5349 DataType::SmallInt => {
5350 let s = self.take(2)?;
5351 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
5352 }
5353 DataType::Int => Ok(Value::Int(self.read_i32()?)),
5354 DataType::BigInt => Ok(Value::BigInt(self.read_i64()?)),
5355 DataType::Float => Ok(Value::Float(self.read_f64()?)),
5356 DataType::Bool => Ok(Value::Bool(self.read_u8()? != 0)),
5357 DataType::Text | DataType::Varchar(_) | DataType::Char(_) => {
5358 Ok(Value::Text(self.read_str()?))
5359 }
5360 DataType::Vector {
5361 encoding: VecEncoding::F32,
5362 ..
5363 } => {
5364 let dim = self.read_u32()? as usize;
5365 let mut v = Vec::with_capacity(dim);
5366 for _ in 0..dim {
5367 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
5368 v.push(f32::from_le_bytes(bytes));
5369 }
5370 Ok(Value::Vector(v))
5371 }
5372 DataType::Vector {
5373 encoding: VecEncoding::Sq8,
5374 ..
5375 } => {
5376 let dim = self.read_u32()? as usize;
5377 let min = self.read_f32()?;
5378 let max = self.read_f32()?;
5379 let bytes = self.take(dim)?.to_vec();
5380 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
5381 }
5382 DataType::Vector {
5383 encoding: VecEncoding::F16,
5384 ..
5385 } => {
5386 let dim = self.read_u32()? as usize;
5387 let bytes = self.take(dim * 2)?.to_vec();
5388 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
5389 }
5390 DataType::Numeric { .. } => {
5391 let s = self.take(16)?;
5392 let arr: [u8; 16] = s.try_into().expect("checked");
5393 let scaled = i128::from_le_bytes(arr);
5394 let scale = self.read_u8()?;
5395 Ok(Value::Numeric { scaled, scale })
5396 }
5397 DataType::Date => Ok(Value::Date(self.read_i32()?)),
5398 DataType::Timestamp => Ok(Value::Timestamp(self.read_i64()?)),
5399 DataType::Timestamptz => Ok(Value::Timestamp(self.read_i64()?)),
5400 DataType::Jsonb => Ok(Value::Json(self.read_str()?)),
5401 DataType::Interval => {
5402 Err(StorageError::Corrupt(
5407 "INTERVAL column found on disk — runtime-only type, v3.0.2 rejects it".into(),
5408 ))
5409 }
5410 DataType::Json => Ok(Value::Json(self.read_str()?)),
5411 DataType::Bytes => {
5414 let len = self.read_u16()? as usize;
5415 let bytes = self.take(len)?.to_vec();
5416 Ok(Value::Bytes(bytes))
5417 }
5418 DataType::TextArray => {
5420 let count = self.read_u16()? as usize;
5421 let mut items: Vec<Option<String>> = Vec::with_capacity(count);
5422 for _ in 0..count {
5423 match self.read_u8()? {
5424 0 => items.push(Some(self.read_str()?)),
5425 1 => items.push(None),
5426 other => {
5427 return Err(StorageError::Corrupt(format!(
5428 "TEXT[] null flag: unknown byte {other}"
5429 )));
5430 }
5431 }
5432 }
5433 Ok(Value::TextArray(items))
5434 }
5435 DataType::IntArray => {
5437 let count = self.read_u16()? as usize;
5438 let mut items: Vec<Option<i32>> = Vec::with_capacity(count);
5439 for _ in 0..count {
5440 match self.read_u8()? {
5441 0 => items.push(Some(self.read_i32()?)),
5442 1 => items.push(None),
5443 other => {
5444 return Err(StorageError::Corrupt(format!(
5445 "INT[] null flag: unknown byte {other}"
5446 )));
5447 }
5448 }
5449 }
5450 Ok(Value::IntArray(items))
5451 }
5452 DataType::BigIntArray => {
5454 let count = self.read_u16()? as usize;
5455 let mut items: Vec<Option<i64>> = Vec::with_capacity(count);
5456 for _ in 0..count {
5457 match self.read_u8()? {
5458 0 => items.push(Some(self.read_i64()?)),
5459 1 => items.push(None),
5460 other => {
5461 return Err(StorageError::Corrupt(format!(
5462 "BIGINT[] null flag: unknown byte {other}"
5463 )));
5464 }
5465 }
5466 }
5467 Ok(Value::BigIntArray(items))
5468 }
5469 }
5470 }
5471
5472 fn read_value(&mut self) -> Result<Value, StorageError> {
5473 let tag = self.read_u8()?;
5474 match tag {
5475 0 => Ok(Value::Null),
5476 1 => Ok(Value::Int(self.read_i32()?)),
5477 2 => Ok(Value::BigInt(self.read_i64()?)),
5478 3 => Ok(Value::Float(self.read_f64()?)),
5479 4 => Ok(Value::Text(self.read_str()?)),
5480 5 => Ok(Value::Bool(self.read_u8()? != 0)),
5481 6 => {
5482 let dim = self.read_u32()? as usize;
5483 let mut v = Vec::with_capacity(dim);
5484 for _ in 0..dim {
5485 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
5486 v.push(f32::from_le_bytes(bytes));
5487 }
5488 Ok(Value::Vector(v))
5489 }
5490 7 => {
5491 let s = self.take(2)?;
5492 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
5493 }
5494 8 => {
5495 let s = self.take(16)?;
5496 let arr: [u8; 16] = s.try_into().expect("checked");
5497 let scaled = i128::from_le_bytes(arr);
5498 let scale = self.read_u8()?;
5499 Ok(Value::Numeric { scaled, scale })
5500 }
5501 9 => Ok(Value::Date(self.read_i32()?)),
5502 10 => Ok(Value::Timestamp(self.read_i64()?)),
5503 11 => {
5508 let dim = self.read_u32()? as usize;
5509 let min = self.read_f32()?;
5510 let max = self.read_f32()?;
5511 let bytes = self.take(dim)?.to_vec();
5512 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
5513 }
5514 12 => {
5517 let dim = self.read_u32()? as usize;
5518 let bytes = self.take(dim * 2)?.to_vec();
5519 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
5520 }
5521 14 => {
5523 let len = self.read_u16()? as usize;
5524 let bytes = self.take(len)?.to_vec();
5525 Ok(Value::Bytes(bytes))
5526 }
5527 15 => {
5530 let count = self.read_u16()? as usize;
5531 let mut items: Vec<Option<String>> = Vec::with_capacity(count);
5532 for _ in 0..count {
5533 match self.read_u8()? {
5534 0 => items.push(Some(self.read_str()?)),
5535 1 => items.push(None),
5536 other => {
5537 return Err(StorageError::Corrupt(format!(
5538 "TEXT[] null flag in value tag: unknown byte {other}"
5539 )));
5540 }
5541 }
5542 }
5543 Ok(Value::TextArray(items))
5544 }
5545 16 => {
5547 let count = self.read_u16()? as usize;
5548 let mut items: Vec<Option<i32>> = Vec::with_capacity(count);
5549 for _ in 0..count {
5550 match self.read_u8()? {
5551 0 => items.push(Some(self.read_i32()?)),
5552 1 => items.push(None),
5553 other => {
5554 return Err(StorageError::Corrupt(format!(
5555 "INT[] null flag in value tag: unknown byte {other}"
5556 )));
5557 }
5558 }
5559 }
5560 Ok(Value::IntArray(items))
5561 }
5562 17 => {
5563 let count = self.read_u16()? as usize;
5564 let mut items: Vec<Option<i64>> = Vec::with_capacity(count);
5565 for _ in 0..count {
5566 match self.read_u8()? {
5567 0 => items.push(Some(self.read_i64()?)),
5568 1 => items.push(None),
5569 other => {
5570 return Err(StorageError::Corrupt(format!(
5571 "BIGINT[] null flag in value tag: unknown byte {other}"
5572 )));
5573 }
5574 }
5575 }
5576 Ok(Value::BigIntArray(items))
5577 }
5578 other => Err(StorageError::Corrupt(format!("unknown value tag: {other}"))),
5579 }
5580 }
5581
5582 fn read_nsw_graph(&mut self, m: usize) -> Result<NswGraph, StorageError> {
5586 let m_max_0 = self.read_u16()? as usize;
5587 let entry_raw = self.read_u32()?;
5588 let entry = if entry_raw == u32::MAX {
5589 None
5590 } else {
5591 Some(entry_raw as usize)
5592 };
5593 let entry_level = self.read_u8()?;
5594 let node_count = self.read_u32()? as usize;
5595 let mut levels: PersistentVec<u8> = PersistentVec::new();
5600 for _ in 0..node_count {
5601 levels.push_mut(self.read_u8()?);
5602 }
5603 let layer_count = self.read_u8()? as usize;
5604 let mut layers: Vec<PersistentVec<Vec<u32>>> = Vec::with_capacity(layer_count);
5605 for _ in 0..layer_count {
5606 let n = self.read_u32()? as usize;
5607 let mut per_layer: PersistentVec<Vec<u32>> = PersistentVec::new();
5608 for _ in 0..n {
5609 let cnt = self.read_u16()? as usize;
5610 let mut row: Vec<u32> = Vec::with_capacity(cnt);
5611 for _ in 0..cnt {
5612 row.push(self.read_u32()?);
5613 }
5614 per_layer.push_mut(row);
5615 }
5616 layers.push(per_layer);
5617 }
5618 Ok(NswGraph {
5619 m,
5620 m_max_0,
5621 entry,
5622 entry_level,
5623 levels,
5624 layers,
5625 })
5626 }
5627}
5628
5629#[cfg(test)]
5630mod tests {
5631 use super::*;
5632 use alloc::string::ToString;
5633 use alloc::vec;
5634
5635 #[cfg(target_arch = "aarch64")]
5636 #[test]
5637 fn neon_l2_matches_scalar() {
5638 let dims = [4usize, 8, 12, 16, 64, 128, 256, 384, 512, 768, 1024, 1536];
5643 for &d in &dims {
5644 let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
5645 let mut a = Vec::with_capacity(d);
5646 let mut b = Vec::with_capacity(d);
5647 for _ in 0..d {
5648 state = state
5649 .wrapping_mul(6_364_136_223_846_793_005)
5650 .wrapping_add(1);
5651 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5652 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5653 state = state
5654 .wrapping_mul(6_364_136_223_846_793_005)
5655 .wrapping_add(1);
5656 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5657 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5658 a.push(x);
5659 b.push(y);
5660 }
5661 let scalar = l2_distance_sq_scalar(&a, &b);
5662 let neon = unsafe { l2_distance_sq_neon(&a, &b) };
5663 let tol = (scalar.abs().max(1e-6)) * 1e-4;
5664 assert!(
5665 (scalar - neon).abs() <= tol,
5666 "dim={d}: scalar={scalar} neon={neon} diff={}",
5667 (scalar - neon).abs()
5668 );
5669 }
5670 }
5671
5672 #[cfg(target_arch = "aarch64")]
5673 #[test]
5674 fn neon_inner_product_matches_scalar() {
5675 let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
5679 for &d in &dims {
5680 let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
5681 let mut a = Vec::with_capacity(d);
5682 let mut b = Vec::with_capacity(d);
5683 for _ in 0..d {
5684 state = state
5685 .wrapping_mul(6_364_136_223_846_793_005)
5686 .wrapping_add(1);
5687 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5688 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5689 state = state
5690 .wrapping_mul(6_364_136_223_846_793_005)
5691 .wrapping_add(1);
5692 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5693 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5694 a.push(x);
5695 b.push(y);
5696 }
5697 let scalar = inner_product_scalar(&a, &b);
5698 let neon = unsafe { inner_product_neon(&a, &b) };
5699 #[allow(clippy::cast_precision_loss)]
5700 let tol = (scalar.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
5701 assert!(
5702 (scalar - neon).abs() <= tol,
5703 "IP dim={d}: scalar={scalar} neon={neon} diff={}",
5704 (scalar - neon).abs()
5705 );
5706 }
5707 }
5708
5709 #[cfg(target_arch = "aarch64")]
5710 #[allow(clippy::similar_names)]
5711 #[test]
5712 fn neon_cosine_dot_norms_matches_scalar() {
5713 let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
5714 for &d in &dims {
5715 let mut state: u64 = (d as u64).wrapping_mul(0xBF58_476D_1CE4_E5B9);
5716 let mut a = Vec::with_capacity(d);
5717 let mut b = Vec::with_capacity(d);
5718 for _ in 0..d {
5719 state = state
5720 .wrapping_mul(6_364_136_223_846_793_005)
5721 .wrapping_add(1);
5722 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5723 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5724 state = state
5725 .wrapping_mul(6_364_136_223_846_793_005)
5726 .wrapping_add(1);
5727 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5728 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5729 a.push(x);
5730 b.push(y);
5731 }
5732 let (dot_s, na_s, nb_s) = cosine_dot_norms_scalar(&a, &b);
5733 let (dot_n, na_n, nb_n) = unsafe { cosine_dot_norms_neon(&a, &b) };
5734 #[allow(clippy::cast_precision_loss)]
5735 let tol_d = (dot_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
5736 #[allow(clippy::cast_precision_loss)]
5737 let tol_n = (na_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
5738 assert!(
5739 (dot_s - dot_n).abs() <= tol_d,
5740 "cosine dot dim={d}: scalar={dot_s} neon={dot_n}"
5741 );
5742 assert!(
5743 (na_s - na_n).abs() <= tol_n,
5744 "cosine na dim={d}: scalar={na_s} neon={na_n}"
5745 );
5746 assert!(
5747 (nb_s - nb_n).abs() <= tol_n,
5748 "cosine nb dim={d}: scalar={nb_s} neon={nb_n}"
5749 );
5750 }
5751 }
5752
5753 fn make_users_schema() -> TableSchema {
5754 TableSchema::new(
5755 "users",
5756 vec![
5757 ColumnSchema::new("id", DataType::Int, false),
5758 ColumnSchema::new("name", DataType::Text, false),
5759 ColumnSchema::new("score", DataType::Float, true),
5760 ],
5761 )
5762 }
5763
5764 #[test]
5765 fn value_type_tag_matches_variant() {
5766 assert_eq!(Value::Int(1).data_type(), Some(DataType::Int));
5767 assert_eq!(Value::BigInt(1).data_type(), Some(DataType::BigInt));
5768 assert_eq!(Value::Float(1.0).data_type(), Some(DataType::Float));
5769 assert_eq!(Value::Text("x".into()).data_type(), Some(DataType::Text));
5770 assert_eq!(Value::Bool(true).data_type(), Some(DataType::Bool));
5771 assert_eq!(Value::Null.data_type(), None);
5772 assert!(Value::Null.is_null());
5773 assert!(!Value::Int(0).is_null());
5774 }
5775
5776 #[test]
5777 fn sq8_value_reports_sq8_data_type() {
5778 let q = crate::quantize::quantize(&[0.0, 0.25, 0.5, 0.75, 1.0]);
5783 let v = Value::Sq8Vector(q);
5784 assert_eq!(
5785 v.data_type(),
5786 Some(DataType::Vector {
5787 dim: 5,
5788 encoding: VecEncoding::Sq8,
5789 }),
5790 );
5791 }
5792
5793 #[test]
5794 fn datatype_display_matches_pg_keyword() {
5795 assert_eq!(DataType::Int.to_string(), "INT");
5796 assert_eq!(DataType::BigInt.to_string(), "BIGINT");
5797 assert_eq!(DataType::Float.to_string(), "FLOAT");
5798 assert_eq!(DataType::Text.to_string(), "TEXT");
5799 assert_eq!(DataType::Bool.to_string(), "BOOL");
5800 }
5801
5802 #[test]
5803 fn row_len_and_emptiness() {
5804 let r = Row::new(vec![Value::Int(1), Value::Null]);
5805 assert_eq!(r.len(), 2);
5806 assert!(!r.is_empty());
5807 assert!(Row::new(Vec::new()).is_empty());
5808 }
5809
5810 #[test]
5811 fn table_schema_column_position() {
5812 let s = make_users_schema();
5813 assert_eq!(s.column_position("id"), Some(0));
5814 assert_eq!(s.column_position("score"), Some(2));
5815 assert_eq!(s.column_position("missing"), None);
5816 }
5817
5818 #[test]
5819 fn catalog_create_table_then_lookup() {
5820 let mut cat = Catalog::new();
5821 cat.create_table(make_users_schema()).unwrap();
5822 assert_eq!(cat.table_count(), 1);
5823 assert!(cat.get("users").is_some());
5824 assert!(cat.get("nope").is_none());
5825 }
5826
5827 #[test]
5828 fn catalog_duplicate_table_is_rejected() {
5829 let mut cat = Catalog::new();
5830 cat.create_table(make_users_schema()).unwrap();
5831 let err = cat.create_table(make_users_schema()).unwrap_err();
5832 assert!(matches!(err, StorageError::DuplicateTable { ref name } if name == "users"));
5833 }
5834
5835 #[test]
5836 fn table_insert_happy_path_appends_row() {
5837 let mut cat = Catalog::new();
5838 cat.create_table(make_users_schema()).unwrap();
5839 let t = cat.get_mut("users").unwrap();
5840 t.insert(Row::new(vec![
5841 Value::Int(1),
5842 Value::Text("alice".into()),
5843 Value::Float(99.5),
5844 ]))
5845 .unwrap();
5846 assert_eq!(t.row_count(), 1);
5847 assert_eq!(t.rows()[0].values[1], Value::Text("alice".into()));
5848 }
5849
5850 #[test]
5851 fn table_insert_arity_mismatch() {
5852 let mut cat = Catalog::new();
5853 cat.create_table(make_users_schema()).unwrap();
5854 let t = cat.get_mut("users").unwrap();
5855 let err = t.insert(Row::new(vec![Value::Int(1)])).unwrap_err();
5856 assert!(matches!(
5857 err,
5858 StorageError::ArityMismatch {
5859 expected: 3,
5860 actual: 1
5861 }
5862 ));
5863 assert_eq!(t.row_count(), 0);
5864 }
5865
5866 #[test]
5867 fn table_insert_type_mismatch_reports_column() {
5868 let mut cat = Catalog::new();
5869 cat.create_table(make_users_schema()).unwrap();
5870 let t = cat.get_mut("users").unwrap();
5871 let err = t
5872 .insert(Row::new(vec![
5873 Value::Int(1),
5874 Value::Int(42), Value::Float(0.0),
5876 ]))
5877 .unwrap_err();
5878 match err {
5879 StorageError::TypeMismatch {
5880 ref column,
5881 expected,
5882 actual,
5883 position,
5884 } => {
5885 assert_eq!(column, "name");
5886 assert_eq!(expected, DataType::Text);
5887 assert_eq!(actual, DataType::Int);
5888 assert_eq!(position, 1);
5889 }
5890 other => panic!("unexpected: {other:?}"),
5891 }
5892 assert_eq!(t.row_count(), 0);
5893 }
5894
5895 #[test]
5896 fn table_insert_null_into_not_null_rejected() {
5897 let mut cat = Catalog::new();
5898 cat.create_table(make_users_schema()).unwrap();
5899 let t = cat.get_mut("users").unwrap();
5900 let err = t
5901 .insert(Row::new(vec![
5902 Value::Int(1),
5903 Value::Null, Value::Float(1.0),
5905 ]))
5906 .unwrap_err();
5907 assert!(matches!(err, StorageError::NullInNotNull { ref column } if column == "name"));
5908 }
5909
5910 #[test]
5911 fn table_insert_null_into_nullable_ok() {
5912 let mut cat = Catalog::new();
5913 cat.create_table(make_users_schema()).unwrap();
5914 let t = cat.get_mut("users").unwrap();
5915 t.insert(Row::new(vec![
5916 Value::Int(1),
5917 Value::Text("bob".into()),
5918 Value::Null,
5919 ]))
5920 .unwrap();
5921 assert_eq!(t.row_count(), 1);
5922 }
5923
5924 #[test]
5925 fn catalog_get_mut_independent_per_table() {
5926 let mut cat = Catalog::new();
5927 cat.create_table(TableSchema::new(
5928 "a",
5929 vec![ColumnSchema::new("v", DataType::Int, false)],
5930 ))
5931 .unwrap();
5932 cat.create_table(TableSchema::new(
5933 "b",
5934 vec![ColumnSchema::new("v", DataType::Int, false)],
5935 ))
5936 .unwrap();
5937 cat.get_mut("a")
5938 .unwrap()
5939 .insert(Row::new(vec![Value::Int(1)]))
5940 .unwrap();
5941 assert_eq!(cat.get("a").unwrap().row_count(), 1);
5942 assert_eq!(cat.get("b").unwrap().row_count(), 0);
5943 }
5944
5945 fn assert_round_trip(cat: &Catalog) {
5948 let bytes = cat.serialize();
5949 let restored = Catalog::deserialize(&bytes).expect("deserialize");
5950 assert_eq!(restored.table_count(), cat.table_count());
5953 for (a, b) in cat.tables.iter().zip(restored.tables.iter()) {
5954 assert_eq!(a.schema, b.schema);
5955 assert_eq!(a.rows, b.rows);
5956 }
5957 }
5958
#[test]
fn serialize_empty_catalog_round_trips() {
    // A catalog with no tables must survive serialise → deserialise.
    let empty = Catalog::new();
    assert_round_trip(&empty);
}
5963
#[test]
fn serialize_single_empty_table_round_trips() {
    // Schema only, no rows.
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    assert_round_trip(&catalog);
}
5970
#[test]
fn nsw_clone_is_o1() {
    // Build an HNSW index over 1500 rows (enough for a multi-layer graph),
    // then verify that cloning the graph shares the persistent storage of
    // `levels` and of every layer rather than copying elements.
    let mut cat = Catalog::new();
    let vector_col = ColumnSchema::new(
        "v",
        DataType::Vector {
            dim: 3,
            encoding: VecEncoding::F32,
        },
        true,
    );
    cat.create_table(TableSchema::new(
        "docs",
        alloc::vec![ColumnSchema::new("id", DataType::Int, false), vector_col],
    ))
    .unwrap();
    let docs = cat.get_mut("docs").unwrap();
    for id in 0..1500_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = id as f32 * 0.01;
        let coords = alloc::vec![base, base + 0.05, base + 0.1];
        docs.insert(Row::new(alloc::vec![Value::Int(id), Value::Vector(coords)]))
            .unwrap();
    }
    docs.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let IndexKind::Nsw(graph) = &cat.get("docs").unwrap().indices()[0].kind else {
        panic!("expected NSW");
    };
    assert_eq!(graph.levels.len(), 1500, "one level slot per inserted row");
    let layer_count = graph.layers.len();
    assert!(
        layer_count >= 2,
        "1500 nodes should populate at least two HNSW layers, got {layer_count}"
    );

    let copy = graph.clone();
    assert!(
        graph.levels.shares_storage_with(&copy.levels),
        "levels PV not shared after clone — clone copied elements (O(N))"
    );
    assert_eq!(graph.layers.len(), copy.layers.len());
    graph
        .layers
        .iter()
        .zip(copy.layers.iter())
        .enumerate()
        .for_each(|(l, (orig, cl))| {
            assert!(
                orig.shares_storage_with(cl),
                "layer {l} PV not shared after clone — clone copied elements (O(N))"
            );
        });
}
6036
#[test]
fn sq8_catalog_serialise_roundtrip_preserves_cells_and_index() {
    // An SQ8 vector column plus its NSW index must survive serialise →
    // deserialise. Compared before and after:
    // - the column type;
    // - one sample cell;
    // - the top-5 L2 query results.
    let sq8_col = ColumnSchema::new(
        "v",
        DataType::Vector {
            dim: 8,
            encoding: VecEncoding::Sq8,
        },
        false,
    );
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "vecs",
        alloc::vec![ColumnSchema::new("id", DataType::Int, false), sq8_col],
    ))
    .unwrap();
    let table = cat.get_mut("vecs").unwrap();
    for row_id in 0..32_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = row_id as f32 * 0.03;
        let raw: Vec<f32> = (0..8_i32)
            .map(|lane| {
                #[allow(clippy::cast_precision_loss)]
                let step = lane as f32 * 0.01;
                base + step
            })
            .collect();
        let cells = alloc::vec![Value::Int(row_id), Value::Sq8Vector(quantize::quantize(&raw))];
        table.insert(Row::new(cells)).unwrap();
    }
    table.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
    let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];

    let source = cat.get("vecs").unwrap();
    let before_cell = source.rows()[5].values[1].clone();
    let before_ty = source.schema().columns[1].ty;
    let before_hits = nsw_query(source, "v_idx", &query, 5, NswMetric::L2);

    let restored = Catalog::deserialize(&cat.serialize()).expect("deserialize ok");
    let rt = restored.get("vecs").unwrap();
    assert_eq!(rt.schema().columns[1].ty, before_ty);
    assert_eq!(rt.rows()[5].values[1], before_cell);
    let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
    assert_eq!(before_hits, after_hits);
}
6099
#[test]
fn half_catalog_serialise_roundtrip_preserves_cells_and_index() {
    use crate::halfvec;
    // An F16 (HALF) vector column plus its NSW index must survive serialise
    // → deserialise. Compared before and after:
    // - the column type;
    // - one sample cell;
    // - the top-5 L2 query results.
    let half_col = ColumnSchema::new(
        "v",
        DataType::Vector {
            dim: 8,
            encoding: VecEncoding::F16,
        },
        false,
    );
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "vecs",
        alloc::vec![ColumnSchema::new("id", DataType::Int, false), half_col],
    ))
    .unwrap();
    let table = cat.get_mut("vecs").unwrap();
    for row_id in 0..32_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = row_id as f32 * 0.03;
        let raw: Vec<f32> = (0..8_i32)
            .map(|lane| {
                #[allow(clippy::cast_precision_loss)]
                let step = lane as f32 * 0.01;
                base + step
            })
            .collect();
        let encoded = halfvec::HalfVector::from_f32_slice(&raw);
        table
            .insert(Row::new(alloc::vec![Value::Int(row_id), Value::HalfVector(encoded)]))
            .unwrap();
    }
    table.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
    let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];

    let source = cat.get("vecs").unwrap();
    let before_cell = source.rows()[5].values[1].clone();
    let before_ty = source.schema().columns[1].ty;
    let before_hits = nsw_query(source, "v_idx", &query, 5, NswMetric::L2);

    let restored = Catalog::deserialize(&cat.serialize()).expect("deserialize ok");
    let rt = restored.get("vecs").unwrap();
    assert_eq!(rt.schema().columns[1].ty, before_ty);
    assert_eq!(rt.rows()[5].values[1], before_cell);
    let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
    assert_eq!(before_hits, after_hits);
}
6160
#[test]
#[allow(clippy::similar_names)]
fn hnsw_half_recall_at_10_matches_f32_groundtruth() {
    use crate::halfvec;

    /// Advance a 64-bit LCG state and map its high 32 bits to `[-1.0, 1.0]`.
    /// The high bits are used because the low bits of a power-of-two-modulus
    /// LCG have short periods.
    fn next(state: &mut u64) -> f32 {
        *state = state
            .wrapping_add(0x9E37_79B9_7F4A_7C15)
            .wrapping_mul(0xBF58_476D_1CE4_E5B9);
        #[allow(clippy::cast_precision_loss)]
        let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
        2.0 * u - 1.0
    }

    let dim: u32 = 32;
    let n: usize = 512;
    let dim_us = dim as usize;
    // One seeded stream produces the corpus and then the queries, so the
    // whole test is deterministic.
    let mut seed: u64 = 0xF16_F16_F16_F16_u64;
    let corpus: Vec<Vec<f32>> = (0..n)
        .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
        .collect();
    let queries: Vec<Vec<f32>> = (0..32)
        .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
        .collect();

    // Exact top-10 by squared L2 over the full-precision f32 corpus.
    let exact_top10: Vec<Vec<usize>> = queries
        .iter()
        .map(|q| {
            let mut scored: Vec<(f32, usize)> = corpus
                .iter()
                .enumerate()
                .map(|(i, v)| (l2_distance_sq(v, q), i))
                .collect();
            // `total_cmp` is a genuine total order. `partial_cmp(..)
            // .unwrap_or(Equal)` treats a NaN as equal to everything, which
            // breaks the sort's ordering contract and can silently scramble
            // the ground truth. The sort is stable, so equal distances keep
            // corpus order.
            scored.sort_by(|a, b| a.0.total_cmp(&b.0));
            scored.into_iter().take(10).map(|(_, i)| i).collect()
        })
        .collect();

    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim,
                    encoding: VecEncoding::F16,
                },
                false,
            ),
        ],
    ))
    .unwrap();
    let t = cat.get_mut("vecs").unwrap();
    for (i, v) in corpus.iter().enumerate() {
        t.insert(Row::new(alloc::vec![
            Value::Int(i32::try_from(i).unwrap()),
            Value::HalfVector(halfvec::HalfVector::from_f32_slice(v)),
        ]))
        .unwrap();
    }
    t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    // Recall@10: fraction of HNSW hits that fall inside the exact top-10.
    let table = cat.get("vecs").unwrap();
    let mut total_overlap = 0_usize;
    for (q, exact) in queries.iter().zip(exact_top10.iter()) {
        let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
        for h in &hits {
            if exact.contains(h) {
                total_overlap += 1;
            }
        }
    }
    #[allow(clippy::cast_precision_loss)]
    let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
    assert!(
        recall >= 0.95,
        "HALF HNSW recall@10 = {recall:.3}, below floor 0.95 — \
         check halfvec dispatch in `cell_to_query_metric_distance`"
    );
}
6244
#[test]
fn hnsw_sq8_recall_at_10_above_0_95_vs_f32_groundtruth() {
    use crate::quantize;

    /// Advance a 64-bit LCG state and map its high 32 bits to `[-1.0, 1.0]`.
    /// The high bits are used because the low bits of a power-of-two-modulus
    /// LCG have short periods.
    fn next(state: &mut u64) -> f32 {
        *state = state
            .wrapping_add(0x9E37_79B9_7F4A_7C15)
            .wrapping_mul(0xBF58_476D_1CE4_E5B9);
        #[allow(clippy::cast_precision_loss)]
        let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
        2.0 * u - 1.0
    }

    let dim: u32 = 32;
    let n: usize = 512;
    let dim_us = dim as usize;
    // One seeded stream produces the corpus and then the queries, so the
    // whole test is deterministic.
    let mut seed: u64 = 0xCAFE_BABE_DEAD_BEEFu64;
    let corpus: Vec<Vec<f32>> = (0..n)
        .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
        .collect();
    let queries: Vec<Vec<f32>> = (0..32)
        .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
        .collect();

    // Exact top-10 by squared L2 over the full-precision f32 corpus.
    let exact_top10: Vec<Vec<usize>> = queries
        .iter()
        .map(|q| {
            let mut scored: Vec<(f32, usize)> = corpus
                .iter()
                .enumerate()
                .map(|(i, v)| (l2_distance_sq(v, q), i))
                .collect();
            // `total_cmp` is a genuine total order. `partial_cmp(..)
            // .unwrap_or(Equal)` treats a NaN as equal to everything, which
            // breaks the sort's ordering contract and can silently scramble
            // the ground truth. The sort is stable, so equal distances keep
            // corpus order.
            scored.sort_by(|a, b| a.0.total_cmp(&b.0));
            scored.into_iter().take(10).map(|(_, i)| i).collect()
        })
        .collect();

    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim,
                    encoding: VecEncoding::Sq8,
                },
                false,
            ),
        ],
    ))
    .unwrap();
    let t = cat.get_mut("vecs").unwrap();
    for (i, v) in corpus.iter().enumerate() {
        t.insert(Row::new(alloc::vec![
            Value::Int(i32::try_from(i).unwrap()),
            Value::Sq8Vector(quantize::quantize(v)),
        ]))
        .unwrap();
    }
    t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    // Recall@10: fraction of HNSW hits that fall inside the exact top-10.
    let table = cat.get("vecs").unwrap();
    let mut total_overlap = 0_usize;
    for (q, exact) in queries.iter().zip(exact_top10.iter()) {
        let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
        for h in &hits {
            if exact.contains(h) {
                total_overlap += 1;
            }
        }
    }
    #[allow(clippy::cast_precision_loss)]
    let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
    assert!(
        recall >= 0.95,
        "SQ8 HNSW recall@10 = {recall:.3}, below floor 0.95 — \
         check `sq8_rerank` is wired in `nsw_search` for SQ8 columns"
    );
}
6333
#[test]
fn nsw_index_topology_persists_through_round_trip() {
    // The full NSW graph (parameters, entry point, per-node levels, and
    // adjacency layers) must be identical after serialise → deserialise.
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "docs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 3,
                    encoding: VecEncoding::F32,
                },
                true,
            ),
        ],
    ))
    .unwrap();
    let docs = cat.get_mut("docs").unwrap();
    for id in 0..6_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = id as f32 * 0.1;
        docs.insert(Row::new(alloc::vec![
            Value::Int(id),
            Value::Vector(alloc::vec![base, base + 0.05, base + 0.1]),
        ]))
        .unwrap();
    }
    docs.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    // Owned copy of the graph behind the first index of `docs`.
    let graph_of = |c: &Catalog| match &c.get("docs").unwrap().indices()[0].kind {
        IndexKind::Nsw(g) => g.clone(),
        IndexKind::BTree(_) | IndexKind::Brin { .. } => panic!("expected NSW"),
    };
    let original = graph_of(&cat);
    let restored = Catalog::deserialize(&cat.serialize()).expect("deserialize");
    let restored_graph = graph_of(&restored);

    assert_eq!(restored_graph.m, original.m);
    assert_eq!(restored_graph.m_max_0, original.m_max_0);
    assert_eq!(restored_graph.entry, original.entry);
    assert_eq!(restored_graph.entry_level, original.entry_level);
    assert_eq!(restored_graph.levels, original.levels);
    assert_eq!(restored_graph.layers, original.layers);
}
6386
#[test]
fn hnsw_level_assignment_is_deterministic() {
    // Calling `nsw_assign_level` twice with the same node index must give
    // the same level.
    (0..32usize).for_each(|node| {
        let first = nsw_assign_level(node);
        let second = nsw_assign_level(node);
        assert_eq!(first, second);
    });
}
6395
#[test]
fn hnsw_layer_0_dominates_population() {
    // Of the first 200 node indices, more than 150 must land on level 0.
    let mut on_zero = 0usize;
    for node in 0..200usize {
        if nsw_assign_level(node) == 0 {
            on_zero += 1;
        }
    }
    assert!(on_zero > 150, "level-0 nodes too few: {on_zero}");
}
6405
#[test]
fn hnsw_search_matches_brute_force_for_l2_top1() {
    // Ten hand-placed 3-d points: the unit-cube corners, its centre, and one
    // interior point. For each probe, HNSW top-1 must equal the exact
    // nearest row under squared L2.
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 3,
                    encoding: VecEncoding::F32,
                },
                true,
            ),
        ],
    ))
    .unwrap();
    let t = cat.get_mut("vecs").unwrap();
    let dataset: alloc::vec::Vec<(i32, [f32; 3])> = alloc::vec![
        (1, [0.0, 0.0, 0.0]),
        (2, [1.0, 0.0, 0.0]),
        (3, [0.0, 1.0, 0.0]),
        (4, [0.0, 0.0, 1.0]),
        (5, [1.0, 1.0, 0.0]),
        (6, [1.0, 0.0, 1.0]),
        (7, [0.0, 1.0, 1.0]),
        (8, [1.0, 1.0, 1.0]),
        (9, [0.5, 0.5, 0.5]),
        (10, [0.2, 0.8, 0.5]),
    ];
    for &(id, v) in &dataset {
        t.insert(Row::new(alloc::vec![Value::Int(id), Value::Vector(v.to_vec())]))
            .unwrap();
    }
    t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    // The table reference does not change between probes, so take it once.
    let table = cat.get("vecs").unwrap();
    let idx_pos = table
        .indices()
        .iter()
        .position(|i| i.name == "v_idx")
        .unwrap();
    for query in [[0.4, 0.4, 0.4], [0.9, 0.1, 0.0], [0.0, 0.9, 0.9]] {
        let hnsw_top = nsw_search(table, idx_pos, &query, 1, 16, NswMetric::L2);
        // Exact nearest row via one linear `min_by` pass; on ties it keeps
        // the first minimum, as a stable sort would. `total_cmp` keeps the
        // comparison a total order even if a NaN distance appears. Cells
        // that are not `Value::Vector` rank at +inf.
        let brute_best = (0..table.rows.len())
            .map(|i| {
                let Value::Vector(v) = &table.rows[i].values[1] else {
                    return (f32::INFINITY, i);
                };
                (l2_distance_sq(v, &query), i)
            })
            .min_by(|a, b| a.0.total_cmp(&b.0))
            .map(|(_, i)| i)
            .expect("table has rows");
        assert!(!hnsw_top.is_empty(), "HNSW returned no results");
        assert_eq!(
            hnsw_top[0].1, brute_best,
            "HNSW top-1 != brute-force top-1 for {query:?}"
        );
    }
}
6474
#[test]
fn serialize_table_with_rows_round_trips() {
    // Two rows, one of them with a NULL score.
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    let fixture = [
        vec![Value::Int(1), Value::Text("alice".into()), Value::Float(95.5)],
        vec![Value::Int(2), Value::Text("bob".into()), Value::Null],
    ];
    for cells in fixture {
        users.insert(Row::new(cells)).unwrap();
    }
    assert_round_trip(&catalog);
}
6494
#[test]
fn serialize_multiple_tables_round_trips() {
    // An empty `users` table alongside a one-row `flags` table.
    let flags_schema = TableSchema::new(
        "flags",
        vec![
            ColumnSchema::new("id", DataType::BigInt, false),
            ColumnSchema::new("active", DataType::Bool, false),
        ],
    );
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    catalog.create_table(flags_schema).unwrap();
    let flags = catalog.get_mut("flags").unwrap();
    flags
        .insert(Row::new(vec![Value::BigInt(7), Value::Bool(true)]))
        .unwrap();
    assert_round_trip(&catalog);
}
6513
#[test]
fn deserialize_rejects_bad_magic() {
    // Wrong magic, followed by the current version byte and a u32 zero.
    let mut header: Vec<u8> = Vec::new();
    header.extend_from_slice(b"BADMAGIC");
    header.push(FILE_VERSION);
    header.extend_from_slice(&0u32.to_le_bytes());
    let outcome = Catalog::deserialize(&header);
    assert!(matches!(outcome, Err(StorageError::Corrupt(_))));
}
6522
#[test]
fn deserialize_rejects_unsupported_version() {
    // Correct magic but version byte 99; the error must mention "version".
    const BOGUS_VERSION: u8 = 99;
    let mut header = FILE_MAGIC.to_vec();
    header.push(BOGUS_VERSION);
    header.extend_from_slice(&0u32.to_le_bytes());
    let err = Catalog::deserialize(&header).unwrap_err();
    assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("version")));
}
6531
#[test]
fn deserialize_rejects_truncated_file() {
    // A valid one-table stream with its final byte chopped off.
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let full = catalog.serialize();
    let (truncated, _last) = full.split_at(full.len() - 1);
    let outcome = Catalog::deserialize(truncated);
    assert!(matches!(outcome, Err(StorageError::Corrupt(_))));
}
6544
#[test]
fn deserialize_rejects_trailing_garbage() {
    // A valid empty-catalog stream plus one stray byte; the error must
    // mention "trailing".
    let mut bytes = Catalog::new().serialize();
    bytes.push(0xFF);
    let outcome = Catalog::deserialize(&bytes);
    assert!(matches!(
        outcome,
        Err(StorageError::Corrupt(ref s)) if s.contains("trailing")
    ));
}
6555
6556 fn populated_users() -> Catalog {
6559 let mut cat = Catalog::new();
6560 cat.create_table(make_users_schema()).unwrap();
6561 let t = cat.get_mut("users").unwrap();
6562 for (id, name, score) in [
6563 (1, "alice", Some(90.0)),
6564 (2, "bob", None),
6565 (3, "alice", Some(70.0)), ] {
6567 t.insert(Row::new(vec![
6568 Value::Int(id),
6569 Value::Text(name.into()),
6570 score.map_or(Value::Null, Value::Float),
6571 ]))
6572 .unwrap();
6573 }
6574 cat
6575 }
6576
#[test]
fn add_index_builds_from_existing_rows() {
    // An index added after rows exist must be populated from those rows.
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_id".into(), "id").unwrap();
    let by_id = cat.get("users").unwrap().index_on(0).expect("index_on(0)");
    // id 2 is the second inserted row, i.e. hot slot 1.
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
    let nothing: &[RowLocator] = &[];
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(99)), nothing);
}
6589
#[test]
fn add_index_dup_name_rejected() {
    // Index names are unique per table, even when the column differs.
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("ix".into(), "id").unwrap();
    let err = users
        .add_index("ix".into(), "name")
        .expect_err("duplicate index name must be rejected");
    assert!(matches!(err, StorageError::DuplicateIndex { ref name } if name == "ix"));
}
6598
#[test]
fn add_index_unknown_column_rejected() {
    // Indexing a column the schema does not have reports that column.
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    let outcome = users.add_index("ix".into(), "ghost");
    assert!(matches!(
        outcome,
        Err(StorageError::ColumnNotFound { ref column }) if column == "ghost"
    ));
}
6609
#[test]
fn insert_after_create_index_updates_it() {
    // Rows inserted after index creation must be added to the index.
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();
    let dave = Row::new(vec![
        Value::Int(4),
        Value::Text("dave".into()),
        Value::Null,
    ]);
    users.insert(dave).unwrap();
    let by_name = users.index_on(1).unwrap();
    // The new row is the fourth hot row (slot 3).
    assert_eq!(
        by_name.lookup_eq(&IndexKey::Text("dave".into())),
        &[RowLocator::Hot(3)]
    );
    // Both pre-existing "alice" rows are still listed, in slot order.
    assert_eq!(
        by_name.lookup_eq(&IndexKey::Text("alice".into())),
        &[RowLocator::Hot(0), RowLocator::Hot(2)]
    );
}
6632
#[test]
fn null_or_float_values_are_not_indexed() {
    // The score column holds 90.0, NULL, and 70.0; an index over it must not
    // answer an integer probe for 90.
    // NOTE(review): this probes a single integer key only; it does not show
    // that the index holds zero entries. Consider a stronger check if the
    // index API exposes its size.
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_score".into(), "score").unwrap();
    let by_score = users.index_on(2).unwrap();
    let nothing: &[RowLocator] = &[];
    assert_eq!(by_score.lookup_eq(&IndexKey::Int(90)), nothing);
}
6645
#[test]
fn vector_value_data_type_carries_dim() {
    // A 3-element `Value::Vector` reports an F32 vector type of dim 3.
    let expected = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let value = Value::Vector(vec![1.0, 2.0, 3.0]);
    assert_eq!(value.data_type(), Some(expected));
}
6659
#[test]
fn vector_column_insert_matching_dim_ok() {
    // A 3-element vector fits a dim-3 F32 column.
    let column = ColumnSchema::new(
        "v",
        DataType::Vector {
            dim: 3,
            encoding: VecEncoding::F32,
        },
        false,
    );
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new("emb", vec![column]))
        .unwrap();
    let emb = cat.get_mut("emb").unwrap();
    emb.insert(Row::new(vec![Value::Vector(vec![1.0, 2.0, 3.0])]))
        .unwrap();
}
6680
#[test]
fn vector_column_insert_dim_mismatch_rejected() {
    // A 2-element vector is rejected by a dim-3 column.
    let column = ColumnSchema::new(
        "v",
        DataType::Vector {
            dim: 3,
            encoding: VecEncoding::F32,
        },
        false,
    );
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new("emb", vec![column]))
        .unwrap();
    let too_short = Row::new(vec![Value::Vector(vec![1.0, 2.0])]);
    let outcome = cat.get_mut("emb").unwrap().insert(too_short);
    assert!(matches!(outcome, Err(StorageError::TypeMismatch { .. })));
}
6703
#[test]
fn vector_value_survives_catalog_round_trip() {
    // Both the vector column type and the stored cell survive a round trip.
    let vector_ty = DataType::Vector {
        dim: 4,
        encoding: VecEncoding::F32,
    };
    let payload = vec![0.5, -1.25, 3.0, 7.0];
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "emb",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vector_ty, false),
        ],
    ))
    .unwrap();
    let emb = cat.get_mut("emb").unwrap();
    emb.insert(Row::new(vec![Value::Int(1), Value::Vector(payload.clone())]))
        .unwrap();

    let bytes = cat.serialize();
    let restored = Catalog::deserialize(&bytes).expect("round-trip");
    let table = restored.get("emb").unwrap();
    assert_eq!(table.schema().columns[1].ty, vector_ty);
    assert_eq!(table.rows()[0].values[1], Value::Vector(payload));
}
6743
#[test]
fn index_survives_serialize_deserialize_round_trip() {
    // A BTree index on `name` keeps its name and contents across a round trip.
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let bytes = cat.serialize();
    let restored = Catalog::deserialize(&bytes).unwrap();
    let restored_users = restored.get("users").unwrap();
    let by_name = restored_users
        .index_on(1)
        .expect("index_on(1) after restore");
    assert_eq!(by_name.name, "by_name");
    assert_eq!(
        by_name.lookup_eq(&IndexKey::Text("alice".into())),
        &[RowLocator::Hot(0), RowLocator::Hot(2)]
    );
}
6764
6765 fn bigint_pk_users_schema() -> TableSchema {
6770 TableSchema::new(
6771 "users",
6772 vec![
6773 ColumnSchema::new("id", DataType::BigInt, false),
6774 ColumnSchema::new("name", DataType::Text, false),
6775 ],
6776 )
6777 }
6778
6779 fn make_user_row(id: i64, name: &str) -> Row {
6780 Row::new(vec![Value::BigInt(id), Value::Text(name.into())])
6781 }
6782
#[test]
fn lookup_by_pk_finds_row_via_hot_index() {
    // With no segments loaded, a PK lookup is answered from hot rows.
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    let fixture = [(1i64, "alice"), (2, "bob"), (3, "carol")];
    for (id, name) in fixture {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let key = IndexKey::Int(2);
    let found = cat
        .lookup_by_pk("users", "by_id", &key)
        .expect("key 2 is present in the hot tier");
    assert_eq!(found, make_user_row(2, "bob"));
    assert_eq!(cat.cold_segment_count(), 0);
}
6799
#[test]
fn lookup_by_pk_returns_none_when_key_missing() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();
    // Each probe misses for a different reason, in this order:
    // an absent key, an unknown table, an unknown index.
    let probes = [
        ("users", "by_id", IndexKey::Int(999)),
        ("other_table", "by_id", IndexKey::Int(1)),
        ("users", "no_such_index", IndexKey::Int(1)),
    ];
    for (table, index, key) in probes {
        assert!(cat.lookup_by_pk(table, index, &key).is_none());
    }
}
6821
#[test]
fn lookup_by_pk_resolves_cold_locator_via_loaded_segment() {
    // Rows that exist only in a loaded cold segment are reachable through
    // BTree entries registered as `RowLocator::Cold`.
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let cold_rows: Vec<(i64, &str)> =
        vec![(100, "ivy"), (200, "joe"), (300, "kim"), (400, "lin")];
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(cold_rows.len());
    for &(id, name) in &cold_rows {
        let body = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), body));
    }
    let (seg_bytes, _meta) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();
    assert_eq!(seg_id, 0);
    assert_eq!(cat.cold_segment_count(), 1);

    // Every key points at page 0 of the segment just loaded.
    let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
        .iter()
        .map(|&(id, _)| {
            let locator = RowLocator::Cold {
                segment_id: seg_id,
                page_offset: 0,
            };
            (IndexKey::Int(id), locator)
        })
        .collect();
    let registered = cat
        .get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();
    assert_eq!(registered, 4);

    for &(id, name) in &cold_rows {
        let found = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("cold key {id} not found"));
        assert_eq!(found, make_user_row(id, name));
    }
    // A key in neither tier still misses.
    assert!(cat.lookup_by_pk("users", "by_id", &IndexKey::Int(999)).is_none());
}
6879
#[test]
fn lookup_by_pk_mixes_hot_and_cold_tiers() {
    // One index serves two hot rows and two cold-segment rows side by side.
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for (id, name) in [(1i64, "alice"), (2, "bob")] {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe")];
    let seg_rows: Vec<(u64, Vec<u8>)> = cold_rows
        .iter()
        .map(|&(id, name)| {
            let body = encode_row_body_dense(&make_user_row(id, name), &schema);
            (id.cast_unsigned(), body)
        })
        .collect();
    let (seg_bytes, _) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();
    let cold_entry = |id: i64| {
        (
            IndexKey::Int(id),
            RowLocator::Cold {
                segment_id: seg_id,
                page_offset: 0,
            },
        )
    };
    let pairs: Vec<(IndexKey, RowLocator)> =
        cold_rows.iter().map(|&(id, _)| cold_entry(id)).collect();
    cat.get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();

    // Hot keys first, then cold keys; all must resolve to their rows.
    let expected = [(1i64, "alice"), (2, "bob"), (100, "ivy"), (200, "joe")];
    for (id, name) in expected {
        assert_eq!(
            cat.lookup_by_pk("users", "by_id", &IndexKey::Int(id))
                .unwrap(),
            make_user_row(id, name)
        );
    }
    // A key between the two tiers' ranges exists in neither.
    assert!(cat.lookup_by_pk("users", "by_id", &IndexKey::Int(50)).is_none());
}
6950
#[test]
fn register_cold_locators_rejects_nsw_index() {
    // Registering cold locators against an NSW index fails with a `Corrupt`
    // error mentioning "not BTree".
    let vector_col = ColumnSchema::new(
        "v",
        DataType::Vector {
            dim: 4,
            encoding: VecEncoding::F32,
        },
        false,
    );
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "vecs",
        vec![ColumnSchema::new("id", DataType::Int, false), vector_col],
    ))
    .unwrap();
    let vecs = cat.get_mut("vecs").unwrap();
    vecs.insert(Row::new(vec![
        Value::Int(1),
        Value::Vector(vec![1.0, 0.0, 0.0, 0.0]),
    ]))
    .unwrap();
    vecs.add_nsw_index("by_v".into(), "v", NSW_DEFAULT_M).unwrap();

    let bogus = vec![(
        IndexKey::Int(1),
        RowLocator::Cold {
            segment_id: 0,
            page_offset: 0,
        },
    )];
    let err = vecs.register_cold_locators("by_v", bogus).unwrap_err();
    assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("not BTree")));
}
6992
#[test]
fn load_segment_bytes_rejects_garbage() {
    // Ten zero bytes are not a segment: loading fails with a `Corrupt`
    // error mentioning "segment", and the cold segment count stays 0.
    let mut cat = Catalog::new();
    let garbage = vec![0u8; 10];
    let outcome = cat.load_segment_bytes(garbage);
    assert!(matches!(outcome, Err(StorageError::Corrupt(ref s)) if s.contains("segment")));
    assert_eq!(cat.cold_segment_count(), 0);
}
7001
#[test]
fn load_segment_bytes_returns_sequential_ids() {
    // Three four-row segments loaded one after another get ids 0, 1, 2.
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let schema = cat.get("users").unwrap().schema.clone();
    for expected_id in 0u32..3 {
        let first_key = u64::from(expected_id) * 100;
        let rows: Vec<(u64, Vec<u8>)> = (first_key..first_key + 4)
            .map(|key| {
                let row = make_user_row(key.cast_signed(), "x");
                (key, encode_row_body_dense(&row, &schema))
            })
            .collect();
        let (bytes, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
        let assigned = cat.load_segment_bytes(bytes).unwrap();
        assert_eq!(assigned, expected_id);
    }
    assert_eq!(cat.cold_segment_count(), 3);
}
7020
#[test]
fn v8_catalog_decodes_as_all_hot_under_v9_reader() {
    // A stream written in the legacy v8 layout must still load. Its
    // restored BTree index must contain only hot locators.
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let v8_bytes = encode_as_v8(&cat);
    assert_eq!(v8_bytes[FILE_MAGIC.len()], 8, "version byte must be 8");

    let restored = Catalog::deserialize(&v8_bytes).expect("v9 reader accepts v8 stream");
    let by_name = restored
        .get("users")
        .unwrap()
        .index_on(1)
        .expect("index_on(1) after restore");
    let alice_hits = by_name.lookup_eq(&IndexKey::Text("alice".into()));
    assert_eq!(alice_hits, &[RowLocator::Hot(0), RowLocator::Hot(2)]);
    for locator in alice_hits {
        assert!(locator.is_hot(), "v8 → v9 read must yield Hot only");
    }
}
7065
/// Encode `cat` in the legacy v8 catalog layout, for tests that check the
/// current reader still accepts v8 streams.
///
/// Layout, in write order:
/// - `FILE_MAGIC`, then the version byte `8`, then a u32 table count.
/// - Per table: its name and a u16 column count.
/// - Per column: name, data type, a nullable byte, a default flag byte
///   (0 = none, 1 = followed by the value), and an auto-increment byte.
/// - Per table, after the columns: a u32 row count, each row's dense body,
///   then a u16 index count.
/// - Per index: name, u16 column position, and a kind tag. Tag 0 is BTree
///   and writes nothing further. Tag 1 is NSW, followed by a u16 `m` and
///   the serialised graph.
///
/// NOTE(review): BTree contents are not written here; presumably the reader
/// rebuilds them from rows for v8 input. Confirm against the decoder.
///
/// # Panics
/// Panics if any table carries a BRIN index (v8 had no encoding for it), or
/// if a count, column position, or `m` does not fit its fixed-width field.
fn encode_as_v8(cat: &Catalog) -> Vec<u8> {
    let mut out = Vec::with_capacity(64);
    out.extend_from_slice(FILE_MAGIC);
    // Hard-coded legacy version byte, not `FILE_VERSION`.
    out.push(8u8);
    write_u32(&mut out, u32::try_from(cat.tables.len()).unwrap());
    for t in &cat.tables {
        write_str(&mut out, &t.schema.name);
        write_u16(&mut out, u16::try_from(t.schema.columns.len()).unwrap());
        for c in &t.schema.columns {
            write_str(&mut out, &c.name);
            write_data_type(&mut out, c.ty);
            out.push(u8::from(c.nullable));
            // Default value: presence flag, then the value if present.
            match &c.default {
                None => out.push(0),
                Some(v) => {
                    out.push(1);
                    write_value(&mut out, v);
                }
            }
            out.push(u8::from(c.auto_increment));
        }
        write_u32(&mut out, u32::try_from(t.rows.len()).unwrap());
        for row in &t.rows {
            out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
        }
        write_u16(&mut out, u16::try_from(t.indices.len()).unwrap());
        for idx in &t.indices {
            write_str(&mut out, &idx.name);
            write_u16(&mut out, u16::try_from(idx.column_position).unwrap());
            match &idx.kind {
                // BTree: the kind tag is the only payload written.
                IndexKind::BTree(_) => out.push(0),
                IndexKind::Nsw(g) => {
                    out.push(1);
                    write_u16(&mut out, u16::try_from(g.m).unwrap());
                    write_nsw_graph(&mut out, g);
                }
                // The v8 format has no BRIN tag.
                IndexKind::Brin { .. } => panic!(
                    "v8 catalog writer cannot serialise BRIN — \
                     tests with BRIN indices must use the current writer"
                ),
            }
        }
    }
    out
}
7119
#[test]
fn v9_catalog_round_trip_preserves_cold_locators() {
    // Hot and cold BTree locators must both survive a current-version
    // round trip. After the same segment is re-loaded, every key must
    // resolve again.
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for (id, name) in [(1i64, "alice"), (2, "bob")] {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe"), (300, "kim")];
    let seg_rows: Vec<(u64, Vec<u8>)> = cold_rows
        .iter()
        .map(|&(id, name)| {
            let body = encode_row_body_dense(&make_user_row(id, name), &schema);
            (id.cast_unsigned(), body)
        })
        .collect();
    let (seg_bytes, _) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    // Keep the raw bytes so the same segment can be loaded into the
    // restored catalog below.
    let seg_id = cat.load_segment_bytes(seg_bytes.clone()).unwrap();
    let cold_locator = || RowLocator::Cold {
        segment_id: seg_id,
        page_offset: 0,
    };
    let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
        .iter()
        .map(|&(id, _)| (IndexKey::Int(id), cold_locator()))
        .collect();
    cat.get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();

    let bytes = cat.serialize();
    assert_eq!(bytes[FILE_MAGIC.len()], FILE_VERSION);
    let mut restored = Catalog::deserialize(&bytes).expect("v9 round-trip parses");
    let restored_seg_id = restored.load_segment_bytes(seg_bytes).unwrap();
    assert_eq!(restored_seg_id, seg_id);

    let by_id = restored.get("users").unwrap().index_on(0).unwrap();
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(1)), &[RowLocator::Hot(0)]);
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
    for &(id, _) in &cold_rows {
        assert_eq!(by_id.lookup_eq(&IndexKey::Int(id)), &[cold_locator()]);
    }

    let bob = restored
        .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
        .unwrap();
    assert_eq!(bob, make_user_row(2, "bob"));
    for &(id, name) in &cold_rows {
        let found = restored
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap();
        assert_eq!(found, make_user_row(id, name));
    }
}
7210
7211 #[test]
7218 fn row_body_encoded_len_matches_actual_encode_for_all_types() {
7219 let schema = TableSchema::new(
7220 "wide",
7221 vec![
7222 ColumnSchema::new("a", DataType::SmallInt, true),
7223 ColumnSchema::new("b", DataType::Int, false),
7224 ColumnSchema::new("c", DataType::BigInt, false),
7225 ColumnSchema::new("d", DataType::Float, false),
7226 ColumnSchema::new("e", DataType::Bool, false),
7227 ColumnSchema::new("f", DataType::Text, false),
7228 ColumnSchema::new(
7229 "g",
7230 DataType::Vector {
7231 dim: 3,
7232 encoding: VecEncoding::F32,
7233 },
7234 false,
7235 ),
7236 ColumnSchema::new(
7237 "h",
7238 DataType::Numeric {
7239 precision: 18,
7240 scale: 2,
7241 },
7242 false,
7243 ),
7244 ColumnSchema::new("i", DataType::Date, false),
7245 ColumnSchema::new("j", DataType::Timestamp, false),
7246 ],
7247 );
7248 let cases: &[Row] = &[
7249 Row::new(vec![
7250 Value::SmallInt(7),
7251 Value::Int(42),
7252 Value::BigInt(1_000_000),
7253 Value::Float(1.5),
7254 Value::Bool(true),
7255 Value::Text("hello".into()),
7256 Value::Vector(vec![1.0, 2.0, 3.0]),
7257 Value::Numeric {
7258 scaled: 12345,
7259 scale: 2,
7260 },
7261 Value::Date(20_000),
7262 Value::Timestamp(1_700_000_000_000_000),
7263 ]),
7264 Row::new(vec![
7266 Value::Null,
7267 Value::Int(0),
7268 Value::BigInt(0),
7269 Value::Float(0.0),
7270 Value::Bool(false),
7271 Value::Text(String::new()),
7272 Value::Vector(vec![]),
7273 Value::Numeric {
7274 scaled: 0,
7275 scale: 2,
7276 },
7277 Value::Date(0),
7278 Value::Timestamp(0),
7279 ]),
7280 Row::new(vec![
7281 Value::SmallInt(-1),
7282 Value::Int(-1),
7283 Value::BigInt(-1),
7284 Value::Float(-0.5),
7285 Value::Bool(true),
7286 Value::Text("a much longer payload here".into()),
7287 Value::Vector(vec![0.1, 0.2, 0.3]),
7288 Value::Numeric {
7289 scaled: -999_999_999,
7290 scale: 2,
7291 },
7292 Value::Date(-1),
7293 Value::Timestamp(-1),
7294 ]),
7295 ];
7296 for row in cases {
7297 let actual = encode_row_body_dense(row, &schema).len();
7298 let fast = row_body_encoded_len(row, &schema);
7299 assert_eq!(actual, fast, "row {row:?}");
7300 }
7301 }
7302
7303 #[test]
7304 fn hot_bytes_grows_on_insert_and_matches_encoded_sum() {
7305 let mut cat = Catalog::new();
7306 cat.create_table(bigint_pk_users_schema()).unwrap();
7307 let t = cat.get_mut("users").unwrap();
7308 assert_eq!(t.hot_bytes(), 0);
7309 let mut expected: u64 = 0;
7310 for (id, name) in [(1i64, "alice"), (2, "bob"), (3, "carol")] {
7311 let row = make_user_row(id, name);
7312 expected += encode_row_body_dense(&row, &t.schema).len() as u64;
7313 t.insert(row).unwrap();
7314 }
7315 assert_eq!(t.hot_bytes(), expected);
7316 assert_eq!(cat.hot_tier_bytes(), expected);
7317 }
7318
7319 #[test]
7320 fn hot_bytes_shrinks_on_delete() {
7321 let mut cat = Catalog::new();
7322 cat.create_table(bigint_pk_users_schema()).unwrap();
7323 let t = cat.get_mut("users").unwrap();
7324 for (id, name) in [(1i64, "alice"), (2, "bob"), (3, "carol")] {
7325 t.insert(make_user_row(id, name)).unwrap();
7326 }
7327 let before = t.hot_bytes();
7328 let bob_row = make_user_row(2, "bob");
7330 let bob_bytes = encode_row_body_dense(&bob_row, &t.schema).len() as u64;
7331 let removed = t.delete_rows(&[1]);
7332 assert_eq!(removed, 1);
7333 assert_eq!(t.hot_bytes(), before - bob_bytes);
7334 }
7335
7336 #[test]
7337 fn hot_bytes_diffs_on_update_for_variable_width_columns() {
7338 let mut cat = Catalog::new();
7339 cat.create_table(bigint_pk_users_schema()).unwrap();
7340 let t = cat.get_mut("users").unwrap();
7341 t.insert(make_user_row(1, "alice")).unwrap();
7342 let after_insert = t.hot_bytes();
7343 let new_row = make_user_row(1, "alice-the-longer-name");
7346 let old_len = encode_row_body_dense(&make_user_row(1, "alice"), &t.schema).len() as u64;
7347 let new_len = encode_row_body_dense(&new_row, &t.schema).len() as u64;
7348 t.update_row(0, new_row.values).unwrap();
7349 assert_eq!(t.hot_bytes(), after_insert - old_len + new_len);
7350 assert!(t.hot_bytes() > after_insert, "longer text grew the counter");
7351 }
7352
7353 #[test]
7354 fn hot_bytes_round_trips_through_serialize_deserialize() {
7355 let mut cat = Catalog::new();
7356 cat.create_table(bigint_pk_users_schema()).unwrap();
7357 let t = cat.get_mut("users").unwrap();
7358 for i in 0..10 {
7359 t.insert(make_user_row(i, &alloc::format!("name-{i}")))
7360 .unwrap();
7361 }
7362 let pre = cat.hot_tier_bytes();
7363 let restored = Catalog::deserialize(&cat.serialize()).unwrap();
7364 assert_eq!(restored.hot_tier_bytes(), pre);
7365 assert_eq!(restored.get("users").unwrap().hot_bytes(), pre);
7366 }
7367
7368 #[test]
7375 fn freeze_oldest_to_cold_moves_rows_and_keeps_lookups_working() {
7376 let mut cat = Catalog::new();
7377 cat.create_table(bigint_pk_users_schema()).unwrap();
7378 let t = cat.get_mut("users").unwrap();
7379 for id in 0..10i64 {
7380 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7381 .unwrap();
7382 }
7383 t.add_index("by_id".into(), "id").unwrap();
7384 let total_bytes_before = t.hot_bytes();
7385
7386 let report = cat
7387 .freeze_oldest_to_cold("users", "by_id", 6)
7388 .expect("freeze succeeds");
7389 assert_eq!(report.frozen_rows, 6);
7390 assert_eq!(report.segment_id, 0);
7391 assert!(report.bytes_freed > 0);
7392 assert!(!report.segment_bytes.is_empty());
7393
7394 let t = cat.get("users").unwrap();
7395 assert_eq!(t.row_count(), 4, "4 hot rows remain (10 - 6 frozen)");
7396 assert_eq!(cat.cold_segment_count(), 1);
7397 assert_eq!(
7399 t.hot_bytes(),
7400 total_bytes_before - report.bytes_freed,
7401 "hot_bytes accounting matches FreezeReport"
7402 );
7403
7404 for id in 0..10i64 {
7407 let got = cat
7408 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
7409 .unwrap_or_else(|| panic!("PK {id} disappeared after freeze"));
7410 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
7411 }
7412 }
7413
7414 #[test]
7419 fn freeze_twice_preserves_prior_cold_locators() {
7420 let mut cat = Catalog::new();
7421 cat.create_table(bigint_pk_users_schema()).unwrap();
7422 let t = cat.get_mut("users").unwrap();
7423 for id in 0..12i64 {
7424 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7425 .unwrap();
7426 }
7427 t.add_index("by_id".into(), "id").unwrap();
7428
7429 cat.freeze_oldest_to_cold("users", "by_id", 4)
7430 .expect("first freeze ok");
7431 cat.freeze_oldest_to_cold("users", "by_id", 4)
7432 .expect("second freeze ok");
7433
7434 assert_eq!(cat.get("users").unwrap().row_count(), 4);
7435 assert_eq!(cat.cold_segment_count(), 2);
7436 for id in 0..12i64 {
7439 let got = cat
7440 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
7441 .unwrap_or_else(|| panic!("PK {id} not resolvable after two freezes"));
7442 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
7443 }
7444 }
7445
7446 #[test]
7449 fn freeze_oldest_to_cold_rejects_invalid_input() {
7450 let mut cat = Catalog::new();
7451 cat.create_table(bigint_pk_users_schema()).unwrap();
7452 let t = cat.get_mut("users").unwrap();
7453 for id in 0..3i64 {
7454 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7455 .unwrap();
7456 }
7457 t.add_index("by_id".into(), "id").unwrap();
7458
7459 assert!(matches!(
7461 cat.freeze_oldest_to_cold("users", "by_id", 0),
7462 Err(StorageError::Corrupt(_))
7463 ));
7464 assert!(matches!(
7466 cat.freeze_oldest_to_cold("missing", "by_id", 1),
7467 Err(StorageError::Corrupt(_))
7468 ));
7469 assert!(matches!(
7471 cat.freeze_oldest_to_cold("users", "no_such_index", 1),
7472 Err(StorageError::Corrupt(_))
7473 ));
7474 assert!(matches!(
7476 cat.freeze_oldest_to_cold("users", "by_id", 999),
7477 Err(StorageError::Corrupt(_))
7478 ));
7479 assert_eq!(cat.get("users").unwrap().row_count(), 3);
7481 assert_eq!(cat.cold_segment_count(), 0);
7482 }
7483
7484 #[test]
7487 fn freeze_oldest_to_cold_rejects_non_integer_pk() {
7488 let mut cat = Catalog::new();
7489 cat.create_table(TableSchema::new(
7490 "by_name",
7491 vec![
7492 ColumnSchema::new("name", DataType::Text, false),
7493 ColumnSchema::new("payload", DataType::BigInt, false),
7494 ],
7495 ))
7496 .unwrap();
7497 let t = cat.get_mut("by_name").unwrap();
7498 t.insert(Row::new(vec![Value::Text("a".into()), Value::BigInt(1)]))
7499 .unwrap();
7500 t.add_index("by_n".into(), "name").unwrap();
7501 let err = cat
7502 .freeze_oldest_to_cold("by_name", "by_n", 1)
7503 .expect_err("non-integer PK rejected");
7504 match err {
7505 StorageError::Corrupt(s) => assert!(
7506 s.contains("non-integer"),
7507 "error message names the constraint: {s}"
7508 ),
7509 other => panic!("expected Corrupt, got {other:?}"),
7510 }
7511 assert_eq!(cat.get("by_name").unwrap().row_count(), 1);
7513 assert_eq!(cat.cold_segment_count(), 0);
7514 }
7515
7516 #[test]
7521 fn freeze_keeps_remaining_hot_rows_addressable_via_secondary_index() {
7522 let mut cat = Catalog::new();
7523 cat.create_table(bigint_pk_users_schema()).unwrap();
7524 let t = cat.get_mut("users").unwrap();
7525 for id in 0..6i64 {
7526 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7527 .unwrap();
7528 }
7529 t.add_index("by_id".into(), "id").unwrap();
7530 t.add_index("by_name".into(), "name").unwrap();
7531
7532 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
7533
7534 let idx = cat.get("users").unwrap().index_on(1).unwrap();
7538 let got = idx.lookup_eq(&IndexKey::Text("u-4".into()));
7539 assert_eq!(got.len(), 1);
7540 assert!(got[0].is_hot(), "kept-hot rows still surface as Hot");
7541 match got[0] {
7542 RowLocator::Hot(i) => {
7543 assert_eq!(i, 1);
7546 }
7547 RowLocator::Cold { .. } => unreachable!(),
7548 }
7549 }
7550
7551 #[test]
7559 fn promote_cold_row_pulls_frozen_row_back_to_hot_tier() {
7560 let mut cat = Catalog::new();
7561 cat.create_table(bigint_pk_users_schema()).unwrap();
7562 let t = cat.get_mut("users").unwrap();
7563 for id in 0..6i64 {
7564 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7565 .unwrap();
7566 }
7567 t.add_index("by_id".into(), "id").unwrap();
7568 cat.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
7571 let hot_bytes_before = cat.get("users").unwrap().hot_bytes();
7572
7573 let new_idx = cat
7575 .promote_cold_row("users", "by_id", &IndexKey::Int(2))
7576 .expect("promote ok")
7577 .expect("PK 2 was cold");
7578 assert_eq!(
7579 new_idx, 2,
7580 "promoted row appended after the 2 surviving hot rows"
7581 );
7582
7583 let t = cat.get("users").unwrap();
7584 assert_eq!(t.row_count(), 3, "hot tier grew from 2 to 3");
7585 let row = make_user_row(2, "u-2");
7587 let row_len = encode_row_body_dense(&row, &t.schema).len() as u64;
7588 assert_eq!(t.hot_bytes(), hot_bytes_before + row_len);
7589
7590 let entries = t.index_on(0).unwrap().lookup_eq(&IndexKey::Int(2));
7593 assert_eq!(entries.len(), 1, "exactly one locator per key");
7594 assert!(entries[0].is_hot(), "promote retired the Cold locator");
7595 assert_eq!(
7597 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(2))
7598 .unwrap(),
7599 row
7600 );
7601 assert_eq!(
7604 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(0))
7605 .unwrap(),
7606 make_user_row(0, "u-0")
7607 );
7608 }
7609
7610 #[test]
7614 fn promote_cold_row_returns_none_when_key_is_not_cold() {
7615 let mut cat = Catalog::new();
7616 cat.create_table(bigint_pk_users_schema()).unwrap();
7617 let t = cat.get_mut("users").unwrap();
7618 t.insert(make_user_row(7, "alice")).unwrap();
7619 t.add_index("by_id".into(), "id").unwrap();
7620
7621 assert!(
7623 cat.promote_cold_row("users", "by_id", &IndexKey::Int(7))
7624 .unwrap()
7625 .is_none()
7626 );
7627 assert!(
7629 cat.promote_cold_row("users", "by_id", &IndexKey::Int(99))
7630 .unwrap()
7631 .is_none()
7632 );
7633 assert_eq!(cat.get("users").unwrap().row_count(), 1);
7635 assert_eq!(cat.cold_segment_count(), 0);
7636 }
7637
7638 #[test]
7643 fn shadow_cold_row_removes_cold_locators_and_drops_lookup() {
7644 let mut cat = Catalog::new();
7645 cat.create_table(bigint_pk_users_schema()).unwrap();
7646 let t = cat.get_mut("users").unwrap();
7647 for id in 0..5i64 {
7648 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7649 .unwrap();
7650 }
7651 t.add_index("by_id".into(), "id").unwrap();
7652 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
7653
7654 assert!(
7656 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(1))
7657 .is_some(),
7658 "frozen PK resolves before shadow"
7659 );
7660 let removed = cat
7661 .shadow_cold_row("users", "by_id", &IndexKey::Int(1))
7662 .unwrap();
7663 assert_eq!(removed, 1, "exactly one cold locator retired");
7664
7665 assert!(
7668 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(1))
7669 .is_none(),
7670 "shadowed key no longer resolves"
7671 );
7672 assert_eq!(
7674 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(0))
7675 .unwrap(),
7676 make_user_row(0, "u-0")
7677 );
7678 assert_eq!(
7679 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(2))
7680 .unwrap(),
7681 make_user_row(2, "u-2")
7682 );
7683 }
7684
7685 #[test]
7690 fn shadow_cold_row_returns_zero_when_key_is_not_cold() {
7691 let mut cat = Catalog::new();
7692 cat.create_table(bigint_pk_users_schema()).unwrap();
7693 let t = cat.get_mut("users").unwrap();
7694 t.insert(make_user_row(1, "alice")).unwrap();
7695 t.add_index("by_id".into(), "id").unwrap();
7696 assert_eq!(
7697 cat.shadow_cold_row("users", "by_id", &IndexKey::Int(1))
7698 .unwrap(),
7699 0,
7700 "hot-only key drops no cold locators"
7701 );
7702 assert_eq!(
7703 cat.shadow_cold_row("users", "by_id", &IndexKey::Int(999))
7704 .unwrap(),
7705 0,
7706 "absent key drops no cold locators"
7707 );
7708 assert_eq!(cat.get("users").unwrap().row_count(), 1);
7709 }
7710
7711 #[test]
7713 fn promote_and_shadow_reject_invalid_inputs() {
7714 let mut cat = Catalog::new();
7715 cat.create_table(bigint_pk_users_schema()).unwrap();
7716 let t = cat.get_mut("users").unwrap();
7717 t.insert(make_user_row(1, "alice")).unwrap();
7718 t.add_index("by_id".into(), "id").unwrap();
7719
7720 assert!(matches!(
7722 cat.promote_cold_row("missing", "by_id", &IndexKey::Int(1)),
7723 Err(StorageError::Corrupt(_))
7724 ));
7725 assert!(matches!(
7726 cat.shadow_cold_row("missing", "by_id", &IndexKey::Int(1)),
7727 Err(StorageError::Corrupt(_))
7728 ));
7729 assert!(matches!(
7731 cat.promote_cold_row("users", "no_such_index", &IndexKey::Int(1)),
7732 Err(StorageError::Corrupt(_))
7733 ));
7734 assert!(matches!(
7735 cat.shadow_cold_row("users", "no_such_index", &IndexKey::Int(1)),
7736 Err(StorageError::Corrupt(_))
7737 ));
7738 }
7739
7740 #[test]
7747 fn commit_freeze_slices_single_slice_matches_freeze_oldest() {
7748 let mut a = Catalog::new();
7749 let mut b = Catalog::new();
7750 for cat in [&mut a, &mut b] {
7751 cat.create_table(bigint_pk_users_schema()).unwrap();
7752 let t = cat.get_mut("users").unwrap();
7753 for id in 0..10i64 {
7754 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7755 .unwrap();
7756 }
7757 t.add_index("by_id".into(), "id").unwrap();
7758 }
7759 let single = a.freeze_oldest_to_cold("users", "by_id", 6).unwrap();
7760 let slice = b
7761 .prepare_freeze_slice("users", "by_id", 0..6)
7762 .expect("prepare");
7763 let parallel = b
7764 .commit_freeze_slices("users", "by_id", alloc::vec![slice])
7765 .expect("commit");
7766 assert_eq!(single.segment_id, parallel.segment_id);
7767 assert_eq!(single.frozen_rows, parallel.frozen_rows);
7768 assert_eq!(single.bytes_freed, parallel.bytes_freed);
7769 assert_eq!(single.segment_bytes, parallel.segment_bytes);
7770 for id in 0..10i64 {
7772 assert_eq!(
7773 a.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
7774 b.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
7775 "PK {id} differs after single vs slice freeze"
7776 );
7777 }
7778 }
7779
7780 #[test]
7785 fn commit_freeze_slices_two_slices_match_single_slice() {
7786 let mut a = Catalog::new();
7787 let mut b = Catalog::new();
7788 for cat in [&mut a, &mut b] {
7789 cat.create_table(bigint_pk_users_schema()).unwrap();
7790 let t = cat.get_mut("users").unwrap();
7791 for id in [3, 7, 1, 9, 5, 0, 8, 4, 2, 6].iter().copied() {
7794 t.insert(make_user_row(id as i64, &alloc::format!("u-{id}")))
7795 .unwrap();
7796 }
7797 t.add_index("by_id".into(), "id").unwrap();
7798 }
7799 let single = a
7800 .prepare_freeze_slice("users", "by_id", 0..8)
7801 .expect("prepare");
7802 let one = a
7803 .commit_freeze_slices("users", "by_id", alloc::vec![single])
7804 .expect("commit one");
7805 let s1 = b
7806 .prepare_freeze_slice("users", "by_id", 0..4)
7807 .expect("prepare s1");
7808 let s2 = b
7809 .prepare_freeze_slice("users", "by_id", 4..8)
7810 .expect("prepare s2");
7811 let two = b
7812 .commit_freeze_slices("users", "by_id", alloc::vec![s1, s2])
7813 .expect("commit two");
7814 assert_eq!(one.segment_bytes, two.segment_bytes);
7815 assert_eq!(one.frozen_rows, two.frozen_rows);
7816 for id in 0..10i64 {
7819 assert_eq!(
7820 a.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
7821 b.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
7822 "PK {id} differs after one-slice vs two-slice freeze"
7823 );
7824 }
7825 }
7826
7827 #[test]
7829 fn commit_freeze_slices_rejects_gap() {
7830 let mut cat = Catalog::new();
7831 cat.create_table(bigint_pk_users_schema()).unwrap();
7832 let t = cat.get_mut("users").unwrap();
7833 for id in 0..6i64 {
7834 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7835 .unwrap();
7836 }
7837 t.add_index("by_id".into(), "id").unwrap();
7838 let s1 = cat.prepare_freeze_slice("users", "by_id", 0..2).unwrap();
7839 let s2 = cat.prepare_freeze_slice("users", "by_id", 3..5).unwrap();
7840 assert!(matches!(
7841 cat.commit_freeze_slices("users", "by_id", alloc::vec![s1, s2]),
7842 Err(StorageError::Corrupt(_))
7843 ));
7844 assert_eq!(cat.cold_segment_count(), 0);
7846 assert_eq!(cat.get("users").unwrap().row_count(), 6);
7847 }
7848
7849 #[test]
7851 fn commit_freeze_slices_empty_is_noop() {
7852 let mut cat = Catalog::new();
7853 cat.create_table(bigint_pk_users_schema()).unwrap();
7854 let t = cat.get_mut("users").unwrap();
7855 for id in 0..3i64 {
7856 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7857 .unwrap();
7858 }
7859 t.add_index("by_id".into(), "id").unwrap();
7860 let report = cat
7861 .commit_freeze_slices("users", "by_id", Vec::new())
7862 .unwrap();
7863 assert_eq!(report.frozen_rows, 0);
7864 assert_eq!(cat.cold_segment_count(), 0);
7865 assert_eq!(cat.get("users").unwrap().row_count(), 3);
7866 }
7867
7868 #[test]
7875 fn compact_merges_small_segments_storage_unit() {
7876 let mut cat = Catalog::new();
7877 cat.create_table(bigint_pk_users_schema()).unwrap();
7878 let t = cat.get_mut("users").unwrap();
7879 for id in 0..8i64 {
7880 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7881 .unwrap();
7882 }
7883 t.add_index("by_id".into(), "id").unwrap();
7884 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
7886 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
7887 assert_eq!(cat.cold_segment_count(), 2);
7888 assert_eq!(cat.cold_segment_slot_count(), 2);
7889
7890 let max_seg_bytes = cat
7893 .cold_segment_ids_global()
7894 .iter()
7895 .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
7896 .max()
7897 .unwrap();
7898 let target = max_seg_bytes + 1;
7899
7900 let report = cat
7901 .compact_cold_segments("users", "by_id", target)
7902 .expect("compact succeeds");
7903 assert_eq!(report.sources.len(), 2);
7904 let merged_id = report.merged_segment_id.expect("merge happened");
7905 assert_eq!(report.merged_rows, 6);
7906 assert_eq!(report.deleted_rows_pruned, 0);
7907 assert!(!report.merged_segment_bytes.is_empty());
7908
7909 assert_eq!(cat.cold_segment_count(), 1);
7912 assert_eq!(cat.cold_segment_slot_count(), 3);
7913 assert_eq!(cat.cold_segment_ids_global(), alloc::vec![merged_id]);
7914
7915 for id in 0..8i64 {
7918 let got = cat
7919 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
7920 .unwrap_or_else(|| panic!("PK {id} lost after compaction"));
7921 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
7922 }
7923 }
7924
7925 #[test]
7929 fn compact_drops_shadowed_cold_rows() {
7930 let mut cat = Catalog::new();
7931 cat.create_table(bigint_pk_users_schema()).unwrap();
7932 let t = cat.get_mut("users").unwrap();
7933 for id in 0..6i64 {
7934 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7935 .unwrap();
7936 }
7937 t.add_index("by_id".into(), "id").unwrap();
7938 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
7939 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
7940 assert_eq!(
7942 cat.shadow_cold_row("users", "by_id", &IndexKey::Int(1))
7943 .unwrap(),
7944 1
7945 );
7946 assert_eq!(
7947 cat.shadow_cold_row("users", "by_id", &IndexKey::Int(4))
7948 .unwrap(),
7949 1
7950 );
7951
7952 let max_seg_bytes = cat
7953 .cold_segment_ids_global()
7954 .iter()
7955 .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
7956 .max()
7957 .unwrap();
7958 let report = cat
7959 .compact_cold_segments("users", "by_id", max_seg_bytes + 1)
7960 .expect("compact succeeds");
7961 assert_eq!(report.sources.len(), 2);
7962 assert_eq!(report.merged_rows, 4, "6 frozen − 2 shadowed = 4 live");
7963 assert_eq!(report.deleted_rows_pruned, 2);
7964
7965 for shadowed in [1i64, 4i64] {
7967 assert!(
7968 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(shadowed))
7969 .is_none(),
7970 "shadowed PK {shadowed} must remain invisible after compact"
7971 );
7972 }
7973 for live in [0i64, 2, 3, 5] {
7975 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(live))
7976 .unwrap_or_else(|| panic!("live PK {live} lost after compact"));
7977 }
7978 }
7979
7980 #[test]
7983 fn compact_is_noop_below_two_candidates() {
7984 let mut cat = Catalog::new();
7985 cat.create_table(bigint_pk_users_schema()).unwrap();
7986 let t = cat.get_mut("users").unwrap();
7987 for id in 0..6i64 {
7988 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7989 .unwrap();
7990 }
7991 t.add_index("by_id".into(), "id").unwrap();
7992 let report = cat
7994 .compact_cold_segments("users", "by_id", 1 << 30)
7995 .expect("noop ok");
7996 assert!(report.merged_segment_id.is_none());
7997 assert!(report.sources.is_empty());
7998
7999 cat.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
8001 let report = cat
8002 .compact_cold_segments("users", "by_id", 1 << 30)
8003 .expect("noop ok");
8004 assert!(report.merged_segment_id.is_none());
8005 assert_eq!(cat.cold_segment_count(), 1);
8006
8007 let report = cat
8010 .compact_cold_segments("users", "by_id", 1)
8011 .expect("noop ok");
8012 assert!(report.merged_segment_id.is_none());
8013 assert_eq!(cat.cold_segment_count(), 1);
8014 }
8015
8016 #[test]
8024 fn compact_swap_survives_catalog_roundtrip_via_load_at() {
8025 let mut cat = Catalog::new();
8026 cat.create_table(bigint_pk_users_schema()).unwrap();
8027 let t = cat.get_mut("users").unwrap();
8028 for id in 0..6i64 {
8029 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8030 .unwrap();
8031 }
8032 t.add_index("by_id".into(), "id").unwrap();
8033 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
8034 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
8035 let max_seg_bytes = cat
8036 .cold_segment_ids_global()
8037 .iter()
8038 .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
8039 .max()
8040 .unwrap();
8041 let report = cat
8042 .compact_cold_segments("users", "by_id", max_seg_bytes + 1)
8043 .expect("compact ok");
8044 let merged_id = report.merged_segment_id.unwrap();
8045
8046 let cat_bytes = cat.serialize();
8051 let merged_bytes = report.merged_segment_bytes.clone();
8052
8053 let mut restored = Catalog::deserialize(&cat_bytes).expect("deserialize ok");
8054 restored
8055 .load_segment_bytes_at(merged_id, merged_bytes)
8056 .expect("reload merged ok");
8057
8058 for id in 0..6i64 {
8060 let got = restored
8061 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
8062 .unwrap_or_else(|| panic!("PK {id} lost across roundtrip"));
8063 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
8064 }
8065 assert_eq!(restored.cold_segment_count(), 1);
8068 }
8069
8070 #[test]
8073 fn load_segment_bytes_at_pads_and_rejects_collision() {
8074 let mut cat = Catalog::new();
8075 cat.create_table(bigint_pk_users_schema()).unwrap();
8076 let t = cat.get_mut("users").unwrap();
8077 for id in 0..4i64 {
8078 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8079 .unwrap();
8080 }
8081 t.add_index("by_id".into(), "id").unwrap();
8082 let report = cat.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
8083 let bytes_seg0 = report.segment_bytes.clone();
8084
8085 cat.load_segment_bytes_at(5, bytes_seg0.clone())
8089 .expect("pad + load ok");
8090 assert_eq!(cat.cold_segment_slot_count(), 6);
8091 assert_eq!(cat.cold_segment_count(), 2);
8092
8093 assert!(matches!(
8095 cat.load_segment_bytes_at(5, bytes_seg0.clone()),
8096 Err(StorageError::Corrupt(_))
8097 ));
8098 assert!(matches!(
8100 cat.load_segment_bytes_at(0, bytes_seg0),
8101 Err(StorageError::Corrupt(_))
8102 ));
8103 }
8104
8105 #[test]
8109 fn promote_then_refreeze_does_not_leave_orphan_locators() {
8110 let mut cat = Catalog::new();
8111 cat.create_table(bigint_pk_users_schema()).unwrap();
8112 let t = cat.get_mut("users").unwrap();
8113 for id in 0..4i64 {
8114 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
8115 .unwrap();
8116 }
8117 t.add_index("by_id".into(), "id").unwrap();
8118
8119 cat.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
8121 let promoted = cat
8122 .promote_cold_row("users", "by_id", &IndexKey::Int(0))
8123 .unwrap();
8124 assert!(promoted.is_some());
8125 let entries_after_promote = cat
8126 .get("users")
8127 .unwrap()
8128 .index_on(0)
8129 .unwrap()
8130 .lookup_eq(&IndexKey::Int(0))
8131 .to_vec();
8132 assert_eq!(entries_after_promote.len(), 1);
8133 assert!(entries_after_promote[0].is_hot());
8134
8135 for id in [2i64, 3] {
8142 assert_eq!(
8143 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(id))
8144 .unwrap(),
8145 make_user_row(id, &alloc::format!("u-{id}"))
8146 );
8147 }
8148 }
8149}