1#![no_std]
7#![cfg_attr(target_arch = "aarch64", allow(unsafe_code))]
11
12extern crate alloc;
13
14pub mod bloom;
15pub mod halfvec;
16pub mod persistent;
17pub mod persistent_btree;
18pub mod quantize;
19pub mod row_locator;
20pub mod segment;
21
22pub use self::bloom::{BloomError, BloomFilter};
23pub use self::row_locator::{RowLocator, RowLocatorError};
24pub use self::segment::{
25 BRIN_SIDECAR_MAGIC, BrinSummary, OwnedSegment, SEGMENT_COMPRESS_ALGO_LZSS,
26 SEGMENT_COMPRESS_ALGO_NONE, SEGMENT_MAGIC, SEGMENT_MAGIC_V2, SEGMENT_PAGE_BYTES, SegmentError,
27 SegmentMeta, SegmentReader, derive_brin_summaries, encode_segment, wrap_v2_envelope,
28 wrap_v2_envelope_with_brin,
29};
30
31use alloc::collections::{BTreeMap, BTreeSet};
32use alloc::format;
33use alloc::string::String;
34use alloc::sync::Arc;
35use alloc::vec::Vec;
36use core::fmt;
37
38use self::persistent::PersistentVec;
39use self::persistent_btree::PersistentBTreeMap;
40
/// Element encoding used by a `VECTOR` column.
///
/// `F32` is the default encoding.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum VecEncoding {
    /// Full-precision `f32` elements, stored as `Value::Vector`.
    #[default]
    F32,
    /// Quantised elements stored as `Value::Sq8Vector` (presumably 8-bit
    /// scalar quantisation — see the `quantize` module).
    Sq8,
    /// Half-precision elements stored as `Value::HalfVector`.
    F16,
}
59
60impl fmt::Display for VecEncoding {
61 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62 match self {
63 Self::F32 => f.write_str("F32"),
64 Self::Sq8 => f.write_str("SQ8"),
65 Self::F16 => f.write_str("HALF"),
66 }
67 }
68}
69
70#[derive(Debug, Clone, Copy, PartialEq, Eq)]
74pub enum DataType {
75 SmallInt,
78 Int, BigInt, Float, Text,
82 Varchar(u32),
85 Char(u32),
89 Bool,
90 Vector {
96 dim: u32,
97 encoding: VecEncoding,
98 },
99 Numeric {
105 precision: u8,
106 scale: u8,
107 },
108 Date,
111 Timestamp,
114 Interval,
119 Json,
125}
126
127impl fmt::Display for DataType {
128 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
129 match self {
130 Self::SmallInt => f.write_str("SMALLINT"),
131 Self::Int => f.write_str("INT"),
132 Self::BigInt => f.write_str("BIGINT"),
133 Self::Float => f.write_str("FLOAT"),
134 Self::Text => f.write_str("TEXT"),
135 Self::Varchar(n) => write!(f, "VARCHAR({n})"),
136 Self::Char(n) => write!(f, "CHAR({n})"),
137 Self::Bool => f.write_str("BOOL"),
138 Self::Vector { dim, encoding } => match encoding {
139 VecEncoding::F32 => write!(f, "VECTOR({dim})"),
140 VecEncoding::Sq8 => write!(f, "VECTOR({dim}) USING SQ8"),
141 VecEncoding::F16 => write!(f, "VECTOR({dim}) USING HALF"),
142 },
143 Self::Numeric { precision, scale } => {
144 if *scale == 0 {
145 write!(f, "NUMERIC({precision})")
146 } else {
147 write!(f, "NUMERIC({precision}, {scale})")
148 }
149 }
150 Self::Date => f.write_str("DATE"),
151 Self::Timestamp => f.write_str("TIMESTAMP"),
152 Self::Interval => f.write_str("INTERVAL"),
153 Self::Json => f.write_str("JSON"),
154 }
155 }
156}
157
158#[derive(Debug, Clone, PartialEq)]
162#[non_exhaustive]
163pub enum Value {
164 SmallInt(i16),
165 Int(i32),
166 BigInt(i64),
167 Float(f64),
168 Text(String),
169 Bool(bool),
170 Vector(Vec<f32>),
171 Sq8Vector(crate::quantize::Sq8Vector),
178 HalfVector(crate::halfvec::HalfVector),
184 Numeric {
188 scaled: i128,
189 scale: u8,
190 },
191 Date(i32),
193 Timestamp(i64),
195 Interval {
198 months: i32,
199 micros: i64,
200 },
201 Json(String),
205 Null,
206}
207
208impl Value {
209 pub fn data_type(&self) -> Option<DataType> {
211 match self {
212 Self::SmallInt(_) => Some(DataType::SmallInt),
213 Self::Int(_) => Some(DataType::Int),
214 Self::BigInt(_) => Some(DataType::BigInt),
215 Self::Float(_) => Some(DataType::Float),
216 Self::Text(_) => Some(DataType::Text),
219 Self::Bool(_) => Some(DataType::Bool),
220 Self::Vector(v) => Some(DataType::Vector {
221 dim: u32::try_from(v.len()).expect("vector dim ≤ u32"),
222 encoding: VecEncoding::F32,
223 }),
224 Self::Sq8Vector(q) => Some(DataType::Vector {
225 dim: u32::try_from(q.bytes.len()).expect("vector dim ≤ u32"),
226 encoding: VecEncoding::Sq8,
227 }),
228 Self::HalfVector(h) => Some(DataType::Vector {
229 dim: u32::try_from(h.dim()).expect("vector dim ≤ u32"),
230 encoding: VecEncoding::F16,
231 }),
232 Self::Numeric { scale, .. } => Some(DataType::Numeric {
237 precision: 0,
238 scale: *scale,
239 }),
240 Self::Date(_) => Some(DataType::Date),
241 Self::Timestamp(_) => Some(DataType::Timestamp),
242 Self::Interval { .. } => Some(DataType::Interval),
243 Self::Json(_) => Some(DataType::Json),
244 Self::Null => None,
245 }
246 }
247
248 pub const fn is_null(&self) -> bool {
249 matches!(self, Self::Null)
250 }
251}
252
253#[derive(Debug, Clone, PartialEq)]
256pub struct Row {
257 pub values: Vec<Value>,
258}
259
260impl Row {
261 pub const fn new(values: Vec<Value>) -> Self {
262 Self { values }
263 }
264
265 pub fn len(&self) -> usize {
266 self.values.len()
267 }
268
269 pub fn is_empty(&self) -> bool {
270 self.values.is_empty()
271 }
272}
273
274#[derive(Debug, Clone, PartialEq)]
275pub struct ColumnSchema {
276 pub name: String,
277 pub ty: DataType,
278 pub nullable: bool,
279 pub default: Option<Value>,
283 pub auto_increment: bool,
287}
288
289#[derive(Debug, Clone, PartialEq)]
290pub struct TableSchema {
291 pub name: String,
292 pub columns: Vec<ColumnSchema>,
293 pub hot_tier_bytes: Option<u64>,
299 pub foreign_keys: Vec<ForeignKeyConstraint>,
306}
307
308#[derive(Debug, Clone, PartialEq, Eq)]
313pub struct ForeignKeyConstraint {
314 pub name: Option<String>,
318 pub local_columns: Vec<usize>,
321 pub parent_table: String,
323 pub parent_columns: Vec<usize>,
328 pub on_delete: FkAction,
330 pub on_update: FkAction,
333}
334
/// Referential action of a foreign key (`ON DELETE` / `ON UPDATE`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FkAction {
    /// `RESTRICT`.
    Restrict,
    /// `CASCADE`.
    Cascade,
    /// `SET NULL`.
    SetNull,
    /// `SET DEFAULT`.
    SetDefault,
    /// `NO ACTION`.
    NoAction,
}
344
345impl FkAction {
346 pub const fn tag(self) -> u8 {
348 match self {
349 Self::Restrict => 0,
350 Self::Cascade => 1,
351 Self::SetNull => 2,
352 Self::SetDefault => 3,
353 Self::NoAction => 4,
354 }
355 }
356 pub const fn from_tag(b: u8) -> Option<Self> {
357 Some(match b {
358 0 => Self::Restrict,
359 1 => Self::Cascade,
360 2 => Self::SetNull,
361 3 => Self::SetDefault,
362 4 => Self::NoAction,
363 _ => return None,
364 })
365 }
366}
367
368impl TableSchema {
369 pub fn column_position(&self, name: &str) -> Option<usize> {
370 self.columns.iter().position(|c| c.name == name)
371 }
372}
373
/// Orderable key stored in BTree indices.
///
/// Variant order is significant: the derived `Ord` sorts every `Int` before
/// every `Text`, and every `Text` before every `Bool`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum IndexKey {
    /// Integer-like values (SMALLINT/INT/BIGINT/DATE/TIMESTAMP, widened to i64).
    Int(i64),
    /// Text values.
    Text(String),
    /// Boolean values.
    Bool(bool),
}
384
385impl IndexKey {
386 pub fn from_value(v: &Value) -> Option<Self> {
387 match v {
388 Value::SmallInt(n) => Some(Self::Int(i64::from(*n))),
389 Value::Int(n) => Some(Self::Int(i64::from(*n))),
390 Value::BigInt(n) => Some(Self::Int(*n)),
391 Value::Text(s) => Some(Self::Text(s.clone())),
392 Value::Bool(b) => Some(Self::Bool(*b)),
393 Value::Date(d) => Some(Self::Int(i64::from(*d))),
396 Value::Timestamp(t) => Some(Self::Int(*t)),
397 Value::Null
402 | Value::Float(_)
403 | Value::Vector(_)
404 | Value::Sq8Vector(_)
405 | Value::HalfVector(_)
406 | Value::Numeric { .. }
407 | Value::Interval { .. }
408 | Value::Json(_) => None,
409 }
410 }
411}
412
413#[derive(Debug, Clone)]
418pub struct Index {
419 pub name: String,
420 pub column_position: usize,
421 pub kind: IndexKind,
422 pub included_columns: Vec<usize>,
432 pub partial_predicate: Option<String>,
439 pub expression: Option<String>,
444}
445
/// Default neighbour cap `m` for NSW indexes (presumably used when the caller
/// does not specify one). `NswGraph::new` caps layer 0 at twice this value.
pub const NSW_DEFAULT_M: usize = 16;
449
/// Outcome of freezing hot rows into a segment.
#[derive(Debug, Clone)]
pub struct FreezeReport {
    /// Identifier of the segment that was produced.
    pub segment_id: u32,
    /// Number of rows moved into the segment.
    pub frozen_rows: usize,
    /// Bytes released from the hot tier (presumably measured like `Table::hot_bytes`).
    pub bytes_freed: u64,
    /// Encoded segment payload.
    pub segment_bytes: Vec<u8>,
}
475
476#[derive(Debug, Clone)]
485pub struct FreezeSlice {
486 pub row_range: core::ops::Range<usize>,
491 pub rows: Vec<(u64, Vec<u8>, IndexKey)>,
497}
498
/// Outcome of compacting several segments into one.
#[derive(Debug, Clone)]
pub struct CompactReport {
    /// Identifiers of the source segments.
    pub sources: Vec<u32>,
    /// Identifier of the merged segment, if one was produced.
    pub merged_segment_id: Option<u32>,
    /// Encoded merged segment payload.
    pub merged_segment_bytes: Vec<u8>,
    /// Rows written to the merged segment.
    pub merged_rows: usize,
    /// Deleted rows dropped during the merge.
    pub deleted_rows_pruned: usize,
    /// Estimated bytes reclaimed (an estimate, per the field name).
    pub bytes_reclaimed_estimate: u64,
}
534
535#[derive(Debug, Clone)]
536pub enum IndexKind {
537 BTree(PersistentBTreeMap<IndexKey, Vec<RowLocator>>),
554 Nsw(NswGraph),
556 Brin {
563 column_type: DataType,
567 },
568}
569
570#[derive(Debug, Clone)]
579pub struct NswGraph {
580 pub m: usize,
582 pub m_max_0: usize,
585 pub entry: Option<usize>,
588 pub entry_level: u8,
590 pub levels: PersistentVec<u8>,
597 pub layers: Vec<PersistentVec<Vec<u32>>>,
613}
614
615impl NswGraph {
616 fn new(m: usize) -> Self {
617 Self {
618 m,
619 m_max_0: m.saturating_mul(2),
620 entry: None,
621 entry_level: 0,
622 levels: PersistentVec::new(),
623 layers: alloc::vec![PersistentVec::new()],
624 }
625 }
626
627 pub const fn cap_for_layer(&self, layer: u8) -> usize {
629 if layer == 0 { self.m_max_0 } else { self.m }
630 }
631}
632
/// Deterministic layer assignment for row `row_idx`, in `0..=7`.
///
/// The row index is scrambled with a SplitMix64-style finaliser. The level is
/// then the number of trailing all-zero nibbles of the hash, capped at 7, so
/// each extra level is 16× rarer than the one below it. Row 0 hashes to 0 and
/// therefore always gets the top level.
pub fn nsw_assign_level(row_idx: usize) -> u8 {
    const MAX_LEVEL: u8 = 7;
    let mut h = (row_idx as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
    h = (h ^ (h >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    h = (h ^ (h >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    h ^= h >> 31;
    // `trailing_zeros(0) == 64`, giving 16 zero nibbles, which is clamped to MAX_LEVEL.
    let zero_nibbles = h.trailing_zeros() / 4;
    u8::try_from(zero_nibbles).map_or(MAX_LEVEL, |n| n.min(MAX_LEVEL))
}
659
660impl Index {
661 fn new_btree(name: String, column_position: usize) -> Self {
662 Self {
663 name,
664 column_position,
665 kind: IndexKind::BTree(PersistentBTreeMap::new()),
666 included_columns: Vec::new(),
667 partial_predicate: None,
668 expression: None,
669 }
670 }
671
672 fn new_nsw(name: String, column_position: usize, m: usize) -> Self {
673 Self {
674 name,
675 column_position,
676 kind: IndexKind::Nsw(NswGraph::new(m)),
677 included_columns: Vec::new(),
678 partial_predicate: None,
679 expression: None,
680 }
681 }
682
683 fn new_brin(name: String, column_position: usize, column_type: DataType) -> Self {
687 Self {
688 name,
689 column_position,
690 kind: IndexKind::Brin { column_type },
691 included_columns: Vec::new(),
692 partial_predicate: None,
693 expression: None,
694 }
695 }
696
697 pub fn lookup_eq(&self, key: &IndexKey) -> &[RowLocator] {
706 match &self.kind {
707 IndexKind::BTree(m) => m.get(key).map_or(&[][..], Vec::as_slice),
708 IndexKind::Nsw(_) | IndexKind::Brin { .. } => &[][..],
710 }
711 }
712
713 pub const fn nsw(&self) -> Option<&NswGraph> {
716 match &self.kind {
717 IndexKind::Nsw(g) => Some(g),
718 IndexKind::BTree(_) | IndexKind::Brin { .. } => None,
719 }
720 }
721
722 pub const fn is_brin(&self) -> bool {
727 matches!(self.kind, IndexKind::Brin { .. })
728 }
729}
730
731#[derive(Debug, Clone)]
747pub struct Table {
748 schema: TableSchema,
749 rows: PersistentVec<Row>,
750 indices: Vec<Index>,
751 hot_bytes: u64,
752 cold_row_count: u64,
766 cold_row_count_stale: bool,
771}
772
773impl Table {
774 pub fn new(schema: TableSchema) -> Self {
775 Self {
776 schema,
777 rows: PersistentVec::new(),
778 indices: Vec::new(),
779 hot_bytes: 0,
780 cold_row_count: 0,
781 cold_row_count_stale: false,
782 }
783 }
784
785 #[must_use]
789 pub const fn hot_bytes(&self) -> u64 {
790 self.hot_bytes
791 }
792
793 #[must_use]
796 pub const fn cold_row_count(&self) -> u64 {
797 self.cold_row_count
798 }
799
800 pub fn set_cold_row_count(&mut self, n: u64) {
803 self.cold_row_count = n;
804 self.cold_row_count_stale = false;
805 }
806
807 pub fn mark_cold_row_count_stale(&mut self) {
812 self.cold_row_count_stale = true;
813 }
814
815 #[must_use]
819 pub const fn cold_row_count_stale(&self) -> bool {
820 self.cold_row_count_stale
821 }
822
823 #[must_use]
834 pub fn count_cold_locators(&self) -> u64 {
835 let mut best: u64 = 0;
836 for idx in &self.indices {
837 if let IndexKind::BTree(map) = &idx.kind {
838 let n: u64 = map
839 .iter()
840 .map(|(_, locs)| locs.iter().filter(|l| l.is_cold()).count() as u64)
841 .sum();
842 if n > best {
843 best = n;
844 }
845 }
846 }
847 best
848 }
849
850 pub const fn schema(&self) -> &TableSchema {
851 &self.schema
852 }
853
854 pub const fn schema_mut(&mut self) -> &mut TableSchema {
858 &mut self.schema
859 }
860
861 pub const fn rows(&self) -> &PersistentVec<Row> {
865 &self.rows
866 }
867
868 pub const fn row_count(&self) -> usize {
869 self.rows.len()
870 }
871
872 pub fn indices_mut(&mut self) -> &mut [Index] {
877 &mut self.indices
878 }
879
880 pub fn indices(&self) -> &[Index] {
881 &self.indices
882 }
883
884 pub fn next_auto_value(&self, col_pos: usize) -> Option<i64> {
890 let ty = self.schema.columns.get(col_pos)?.ty;
891 if !matches!(ty, DataType::SmallInt | DataType::Int | DataType::BigInt) {
892 return None;
893 }
894 let mut max: Option<i64> = None;
895 for row in &self.rows {
896 match row.values.get(col_pos) {
897 Some(Value::SmallInt(n)) => {
898 let v = i64::from(*n);
899 max = Some(max.map_or(v, |m| m.max(v)));
900 }
901 Some(Value::Int(n)) => {
902 let v = i64::from(*n);
903 max = Some(max.map_or(v, |m| m.max(v)));
904 }
905 Some(Value::BigInt(n)) => {
906 max = Some(max.map_or(*n, |m| m.max(*n)));
907 }
908 _ => {}
909 }
910 }
911 Some(max.map_or(1, |m| m + 1))
912 }
913
914 pub fn index_on(&self, column_position: usize) -> Option<&Index> {
918 self.indices
925 .iter()
926 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::BTree(_)))
927 .or_else(|| {
928 self.indices
929 .iter()
930 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_)))
931 })
932 }
933
934 pub fn insert(&mut self, row: Row) -> Result<(), StorageError> {
938 if row.len() != self.schema.columns.len() {
939 return Err(StorageError::ArityMismatch {
940 expected: self.schema.columns.len(),
941 actual: row.len(),
942 });
943 }
944 for (i, (val, col)) in row.values.iter().zip(&self.schema.columns).enumerate() {
945 if val.is_null() {
946 if !col.nullable {
947 return Err(StorageError::NullInNotNull {
948 column: col.name.clone(),
949 });
950 }
951 continue;
952 }
953 let actual = val.data_type().expect("non-null");
954 let compatible = actual == col.ty
968 || matches!(
969 (actual, col.ty),
970 (
971 DataType::Text,
972 DataType::Varchar(_) | DataType::Char(_) | DataType::Json
973 ) | (DataType::Json, DataType::Text)
974 )
975 || matches!(
976 (actual, col.ty),
977 (
978 DataType::Numeric { scale: a, .. },
979 DataType::Numeric { scale: b, .. },
980 ) if a == b
981 );
982 if !compatible {
983 return Err(StorageError::TypeMismatch {
984 column: col.name.clone(),
985 expected: col.ty,
986 actual,
987 position: i,
988 });
989 }
990 }
991 let new_row_idx = self.rows.len();
992 for idx in &mut self.indices {
996 if let IndexKind::BTree(map) = &mut idx.kind
997 && let Some(key) = IndexKey::from_value(&row.values[idx.column_position])
998 {
999 let mut entries = map.get(&key).cloned().unwrap_or_default();
1005 entries.push(RowLocator::Hot(new_row_idx));
1006 map.insert_mut(key, entries);
1007 }
1008 }
1009 self.hot_bytes = self
1012 .hot_bytes
1013 .saturating_add(row_body_encoded_len(&row, &self.schema) as u64);
1014 self.rows.push_mut(row);
1019 let new_row_idx = self.rows.len() - 1;
1022 let nsw_targets: Vec<usize> = self
1023 .indices
1024 .iter()
1025 .enumerate()
1026 .filter_map(|(i, idx)| {
1027 if matches!(idx.kind, IndexKind::Nsw(_)) {
1028 Some(i)
1029 } else {
1030 None
1031 }
1032 })
1033 .collect();
1034 for idx_pos in nsw_targets {
1035 nsw_insert_at(self, idx_pos, new_row_idx);
1036 }
1037 Ok(())
1038 }
1039
1040 pub fn add_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1044 if self.indices.iter().any(|i| i.name == name) {
1045 return Err(StorageError::DuplicateIndex { name });
1046 }
1047 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1048 StorageError::ColumnNotFound {
1049 column: column_name.into(),
1050 }
1051 })?;
1052 let mut idx = Index::new_btree(name, column_position);
1053 if let IndexKind::BTree(map) = &mut idx.kind {
1054 for (i, row) in self.rows.iter().enumerate() {
1055 if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
1056 let mut entries = map.get(&key).cloned().unwrap_or_default();
1057 entries.push(RowLocator::Hot(i));
1058 map.insert_mut(key, entries);
1059 }
1060 }
1061 }
1062 self.indices.push(idx);
1063 Ok(())
1064 }
1065
1066 pub fn add_nsw_index(
1071 &mut self,
1072 name: String,
1073 column_name: &str,
1074 m: usize,
1075 ) -> Result<(), StorageError> {
1076 self.add_nsw_index_inner(name, column_name, m, None)
1077 }
1078
1079 pub fn rebuild_nsw_index(
1091 &mut self,
1092 name: &str,
1093 new_encoding: Option<VecEncoding>,
1094 ) -> Result<(), StorageError> {
1095 let idx_pos = self
1096 .indices
1097 .iter()
1098 .position(|i| i.name == name)
1099 .ok_or_else(|| StorageError::IndexNotFound {
1100 name: String::from(name),
1101 })?;
1102 let col_pos = self.indices[idx_pos].column_position;
1103 let m = match &self.indices[idx_pos].kind {
1104 IndexKind::Nsw(g) => g.m,
1105 IndexKind::BTree(_) | IndexKind::Brin { .. } => {
1106 return Err(StorageError::Unsupported(format!(
1107 "ALTER INDEX REBUILD on non-NSW index {name:?} — only NSW indexes can rebuild"
1108 )));
1109 }
1110 };
1111 let col_name = self.schema.columns[col_pos].name.clone();
1112 if let Some(target) = new_encoding {
1115 let current = match self.schema.columns[col_pos].ty {
1116 DataType::Vector { encoding, .. } => encoding,
1117 ref other => {
1118 return Err(StorageError::Unsupported(format!(
1119 "ALTER INDEX REBUILD WITH (encoding=…) on non-vector column type {other:?}"
1120 )));
1121 }
1122 };
1123 if target != current {
1124 let DataType::Vector { dim, .. } = self.schema.columns[col_pos].ty else {
1125 unreachable!("checked above")
1126 };
1127 let n = self.rows.len();
1128 for i in 0..n {
1129 let row = self
1130 .rows
1131 .get_mut(i)
1132 .expect("row index in bounds (we iterated up to len())");
1133 let cell = core::mem::replace(&mut row.values[col_pos], Value::Null);
1134 let recoded = recode_vector_cell(cell, target)?;
1135 row.values[col_pos] = recoded;
1136 }
1137 self.schema.columns[col_pos].ty = DataType::Vector {
1138 dim,
1139 encoding: target,
1140 };
1141 }
1142 }
1143 self.indices.remove(idx_pos);
1145 self.add_nsw_index_inner(String::from(name), &col_name, m, None)?;
1146 Ok(())
1147 }
1148
1149 pub fn restore_nsw_index(
1154 &mut self,
1155 name: String,
1156 column_name: &str,
1157 graph: NswGraph,
1158 ) -> Result<(), StorageError> {
1159 self.add_nsw_index_inner(name, column_name, graph.m, Some(graph))
1160 }
1161
1162 pub fn restore_btree_index(
1169 &mut self,
1170 name: String,
1171 column_name: &str,
1172 map: PersistentBTreeMap<IndexKey, Vec<RowLocator>>,
1173 ) -> Result<(), StorageError> {
1174 if self.indices.iter().any(|i| i.name == name) {
1175 return Err(StorageError::DuplicateIndex { name });
1176 }
1177 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1178 StorageError::ColumnNotFound {
1179 column: column_name.into(),
1180 }
1181 })?;
1182 self.indices.push(Index {
1183 name,
1184 column_position,
1185 kind: IndexKind::BTree(map),
1186 included_columns: Vec::new(),
1187 partial_predicate: None,
1188 expression: None,
1189 });
1190 Ok(())
1191 }
1192
1193 pub fn restore_brin_index(
1198 &mut self,
1199 name: String,
1200 column_name: &str,
1201 column_type: DataType,
1202 ) -> Result<(), StorageError> {
1203 if self.indices.iter().any(|i| i.name == name) {
1204 return Err(StorageError::DuplicateIndex { name });
1205 }
1206 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1207 StorageError::ColumnNotFound {
1208 column: column_name.into(),
1209 }
1210 })?;
1211 self.indices.push(Index::new_brin(name, column_position, column_type));
1212 Ok(())
1213 }
1214
1215 pub fn add_brin_index(
1219 &mut self,
1220 name: String,
1221 column_name: &str,
1222 ) -> Result<(), StorageError> {
1223 if self.indices.iter().any(|i| i.name == name) {
1224 return Err(StorageError::DuplicateIndex { name });
1225 }
1226 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1227 StorageError::ColumnNotFound {
1228 column: column_name.into(),
1229 }
1230 })?;
1231 let column_type = self.schema.columns[column_position].ty;
1232 self.indices.push(Index::new_brin(name, column_position, column_type));
1233 Ok(())
1234 }
1235
1236 pub fn register_cold_locators<I>(
1253 &mut self,
1254 index_name: &str,
1255 locators: I,
1256 ) -> Result<usize, StorageError>
1257 where
1258 I: IntoIterator<Item = (IndexKey, RowLocator)>,
1259 {
1260 let idx = self
1261 .indices
1262 .iter_mut()
1263 .find(|i| i.name == index_name)
1264 .ok_or_else(|| StorageError::Corrupt(format!("index {index_name:?} not found")))?;
1265 let map = match &mut idx.kind {
1266 IndexKind::BTree(map) => map,
1267 IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
1268 return Err(StorageError::Corrupt(format!(
1269 "index {index_name:?} is not BTree; cold locators apply only to BTree indices"
1270 )));
1271 }
1272 };
1273 let mut count = 0usize;
1274 for (key, locator) in locators {
1275 let mut entries = map.get(&key).cloned().unwrap_or_default();
1276 entries.push(locator);
1277 map.insert_mut(key, entries);
1278 count += 1;
1279 }
1280 Ok(count)
1281 }
1282
1283 pub fn remove_cold_locators_for_key(
1293 &mut self,
1294 index_name: &str,
1295 key: &IndexKey,
1296 ) -> Result<usize, StorageError> {
1297 let idx = self
1298 .indices
1299 .iter_mut()
1300 .find(|i| i.name == index_name)
1301 .ok_or_else(|| {
1302 StorageError::Corrupt(format!(
1303 "remove_cold_locators_for_key: index {index_name:?} not found"
1304 ))
1305 })?;
1306 let map = match &mut idx.kind {
1307 IndexKind::BTree(map) => map,
1308 IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
1309 return Err(StorageError::Corrupt(format!(
1310 "remove_cold_locators_for_key: index {index_name:?} is not BTree; \
1311 cold locators apply only to BTree indices"
1312 )));
1313 }
1314 };
1315 let Some(entries) = map.get(key) else {
1316 return Ok(0);
1317 };
1318 let mut kept: Vec<RowLocator> =
1319 entries.iter().copied().filter(RowLocator::is_hot).collect();
1320 let removed = entries.len() - kept.len();
1321 if removed == 0 {
1322 return Ok(0);
1323 }
1324 kept.shrink_to_fit();
1325 map.insert_mut(key.clone(), kept);
1333 Ok(removed)
1334 }
1335
1336 pub fn delete_rows(&mut self, positions: &[usize]) -> usize {
1342 if positions.is_empty() {
1343 return 0;
1344 }
1345 let mut to_remove = alloc::vec![false; self.rows.len()];
1349 let mut removed = 0;
1350 for &p in positions {
1351 if p < to_remove.len() && !to_remove[p] {
1352 to_remove[p] = true;
1353 removed += 1;
1354 }
1355 }
1356 let mut new_rows: PersistentVec<Row> = PersistentVec::new();
1357 let mut removed_bytes: u64 = 0;
1358 for (i, row) in self.rows.iter().enumerate() {
1359 if to_remove[i] {
1360 removed_bytes =
1361 removed_bytes.saturating_add(row_body_encoded_len(row, &self.schema) as u64);
1362 } else {
1363 new_rows.push_mut(row.clone());
1364 }
1365 }
1366 self.rows = new_rows;
1367 self.hot_bytes = self.hot_bytes.saturating_sub(removed_bytes);
1368 self.rebuild_indices();
1369 removed
1370 }
1371
1372 pub fn update_row(
1378 &mut self,
1379 position: usize,
1380 new_values: Vec<Value>,
1381 ) -> Result<(), StorageError> {
1382 if position >= self.rows.len() {
1383 return Err(StorageError::Corrupt(alloc::format!(
1384 "update_row: position {position} out of bounds (rows={})",
1385 self.rows.len()
1386 )));
1387 }
1388 if new_values.len() != self.schema.columns.len() {
1389 return Err(StorageError::ArityMismatch {
1390 expected: self.schema.columns.len(),
1391 actual: new_values.len(),
1392 });
1393 }
1394 for (i, (val, col)) in new_values.iter().zip(&self.schema.columns).enumerate() {
1398 if val.is_null() {
1399 if !col.nullable {
1400 return Err(StorageError::NullInNotNull {
1401 column: col.name.clone(),
1402 });
1403 }
1404 continue;
1405 }
1406 let actual = val.data_type().expect("non-null");
1407 let compatible = actual == col.ty
1408 || matches!(
1409 (actual, col.ty),
1410 (
1411 DataType::Text,
1412 DataType::Varchar(_) | DataType::Char(_) | DataType::Json
1413 ) | (DataType::Json, DataType::Text)
1414 )
1415 || matches!(
1416 (actual, col.ty),
1417 (
1418 DataType::Numeric { scale: a, .. },
1419 DataType::Numeric { scale: b, .. },
1420 ) if a == b
1421 );
1422 if !compatible {
1423 return Err(StorageError::TypeMismatch {
1424 column: col.name.clone(),
1425 expected: col.ty,
1426 actual,
1427 position: i,
1428 });
1429 }
1430 }
1431 let old_row = self
1432 .rows
1433 .get(position)
1434 .expect("position bounds-checked above");
1435 let old_bytes = row_body_encoded_len(old_row, &self.schema) as u64;
1436 let new_row = Row::new(new_values);
1437 let new_bytes = row_body_encoded_len(&new_row, &self.schema) as u64;
1438 self.rows = self
1439 .rows
1440 .set(position, new_row)
1441 .expect("position bounds-checked above");
1442 self.hot_bytes = self
1443 .hot_bytes
1444 .saturating_sub(old_bytes)
1445 .saturating_add(new_bytes);
1446 self.rebuild_indices();
1447 Ok(())
1448 }
1449
1450 fn rebuild_indices(&mut self) {
1457 let preserved_cold: Vec<(String, Vec<(IndexKey, RowLocator)>)> = self
1466 .indices
1467 .iter()
1468 .filter_map(|idx| match &idx.kind {
1469 IndexKind::BTree(map) => {
1470 let cold: Vec<(IndexKey, RowLocator)> = map
1471 .iter()
1472 .flat_map(|(k, locs)| {
1473 locs.iter()
1474 .filter(|l| l.is_cold())
1475 .copied()
1476 .map(move |l| (k.clone(), l))
1477 })
1478 .collect();
1479 if cold.is_empty() {
1480 None
1481 } else {
1482 Some((idx.name.clone(), cold))
1483 }
1484 }
1485 IndexKind::Nsw(_) | IndexKind::Brin { .. } => None,
1487 })
1488 .collect();
1489
1490 #[derive(Clone)]
1495 enum RebuildKind {
1496 BTree,
1497 Nsw(usize),
1498 Brin(DataType),
1499 }
1500 let descriptors: Vec<(String, usize, RebuildKind)> = self
1501 .indices
1502 .iter()
1503 .map(|idx| {
1504 let kind = match &idx.kind {
1505 IndexKind::Nsw(g) => RebuildKind::Nsw(g.m),
1506 IndexKind::Brin { column_type } => RebuildKind::Brin(*column_type),
1507 IndexKind::BTree(_) => RebuildKind::BTree,
1508 };
1509 (idx.name.clone(), idx.column_position, kind)
1510 })
1511 .collect();
1512 self.indices.clear();
1513 for (name, column_position, rebuild_kind) in descriptors {
1514 match rebuild_kind {
1515 RebuildKind::Nsw(m) => {
1516 let idx = Index::new_nsw(name, column_position, m);
1517 self.indices.push(idx);
1518 let idx_pos = self.indices.len() - 1;
1519 let row_indices: Vec<usize> = (0..self.rows.len()).collect();
1520 for row_idx in row_indices {
1521 nsw_insert_at(self, idx_pos, row_idx);
1522 }
1523 }
1524 RebuildKind::Brin(column_type) => {
1525 self.indices.push(Index::new_brin(name, column_position, column_type));
1528 }
1529 RebuildKind::BTree => {
1530 let mut idx = Index::new_btree(name, column_position);
1531 if let IndexKind::BTree(map) = &mut idx.kind {
1532 for (i, row) in self.rows.iter().enumerate() {
1533 if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
1534 let mut entries = map.get(&key).cloned().unwrap_or_default();
1535 entries.push(RowLocator::Hot(i));
1536 map.insert_mut(key, entries);
1537 }
1538 }
1539 }
1540 self.indices.push(idx);
1541 }
1542 }
1543 }
1544
1545 for (idx_name, locators) in preserved_cold {
1550 let _ = self.register_cold_locators(&idx_name, locators);
1554 }
1555 }
1556
1557 fn add_nsw_index_inner(
1558 &mut self,
1559 name: String,
1560 column_name: &str,
1561 m: usize,
1562 restore: Option<NswGraph>,
1563 ) -> Result<(), StorageError> {
1564 if self.indices.iter().any(|i| i.name == name) {
1565 return Err(StorageError::DuplicateIndex { name });
1566 }
1567 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1568 StorageError::ColumnNotFound {
1569 column: column_name.into(),
1570 }
1571 })?;
1572 if !matches!(
1573 self.schema.columns[column_position].ty,
1574 DataType::Vector { .. }
1575 ) {
1576 return Err(StorageError::TypeMismatch {
1577 column: column_name.into(),
1578 expected: DataType::Vector {
1579 dim: 0,
1580 encoding: VecEncoding::F32,
1581 },
1582 actual: self.schema.columns[column_position].ty,
1583 position: column_position,
1584 });
1585 }
1586 if let Some(graph) = restore {
1587 self.indices.push(Index {
1588 name,
1589 column_position,
1590 kind: IndexKind::Nsw(graph),
1591 included_columns: Vec::new(),
1592 partial_predicate: None,
1593 expression: None,
1594 });
1595 return Ok(());
1596 }
1597 let idx = Index::new_nsw(name, column_position, m);
1598 self.indices.push(idx);
1599 let idx_pos = self.indices.len() - 1;
1600 let row_indices: Vec<usize> = (0..self.rows.len()).collect();
1603 for row_idx in row_indices {
1604 nsw_insert_at(self, idx_pos, row_idx);
1605 }
1606 Ok(())
1607 }
1608}
1609
1610fn recode_vector_cell(cell: Value, target: VecEncoding) -> Result<Value, StorageError> {
1617 if matches!(cell, Value::Null) {
1618 return Ok(cell);
1619 }
1620 let as_f32: Vec<f32> = match &cell {
1622 Value::Vector(v) => v.clone(),
1623 Value::Sq8Vector(q) => quantize::dequantize(q),
1624 Value::HalfVector(h) => h.to_f32_vec(),
1625 other => {
1626 return Err(StorageError::Unsupported(format!(
1627 "ALTER INDEX REBUILD: cannot recode non-vector cell {:?}",
1628 other.data_type()
1629 )));
1630 }
1631 };
1632 Ok(match target {
1637 VecEncoding::F32 => Value::Vector(as_f32),
1638 VecEncoding::Sq8 => Value::Sq8Vector(quantize::quantize(&as_f32)),
1639 VecEncoding::F16 => Value::HalfVector(halfvec::HalfVector::from_f32_slice(&as_f32)),
1640 })
1641}
1642
1643fn nsw_insert_at(table: &mut Table, idx_pos: usize, new_row_idx: usize) {
1650 let col_pos = table.indices[idx_pos].column_position;
1651 let cell_dim: Option<usize> = match &table.rows[new_row_idx].values[col_pos] {
1652 Value::Vector(v) => Some(v.len()),
1653 Value::Sq8Vector(q) => Some(q.bytes.len()),
1654 Value::HalfVector(h) => Some(h.dim()),
1655 _ => None,
1656 };
1657 let Some(dim) = cell_dim else {
1658 ensure_node_slot(table, idx_pos, new_row_idx, 0);
1661 return;
1662 };
1663 if dim == 0 {
1664 ensure_node_slot(table, idx_pos, new_row_idx, 0);
1665 return;
1666 }
1667 let level = nsw_assign_level(new_row_idx);
1668 ensure_node_slot(table, idx_pos, new_row_idx, level);
1669 let (entry, entry_level, m) = match &table.indices[idx_pos].kind {
1670 IndexKind::Nsw(g) => (g.entry, g.entry_level, g.m),
1671 IndexKind::BTree(_) | IndexKind::Brin { .. } => {
1672 unreachable!("nsw_insert_at on a non-NSW index")
1673 }
1674 };
1675 if entry.is_none() {
1677 if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
1678 g.entry = Some(new_row_idx);
1679 g.entry_level = level;
1680 *g.levels
1681 .get_mut(new_row_idx)
1682 .expect("levels slot padded by ensure_node_slot") = level;
1683 }
1684 return;
1685 }
1686 if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
1688 *g.levels
1689 .get_mut(new_row_idx)
1690 .expect("levels slot padded by ensure_node_slot") = level;
1691 }
1692 let query = match &table.rows[new_row_idx].values[col_pos] {
1693 Value::Vector(v) => v.clone(),
1694 Value::Sq8Vector(q) => quantize::dequantize(q),
1700 Value::HalfVector(h) => h.to_f32_vec(),
1703 _ => return,
1704 };
1705 let mut current = entry.expect("entry was Some above");
1708 let mut current_d = vec_l2_sq(table, col_pos, current, &query);
1709 if entry_level > level {
1710 for layer in (level + 1..=entry_level).rev() {
1711 (current, current_d) =
1712 greedy_layer_walk(table, idx_pos, layer, current, current_d, &query);
1713 }
1714 }
1715 let top = level.min(entry_level);
1719 let ef = (m * 2).max(8);
1720 for layer in (0..=top).rev() {
1721 let cap = if layer == 0 { m * 2 } else { m };
1722 let mut candidates = layer_beam_search(
1723 table,
1724 idx_pos,
1725 layer,
1726 current,
1727 current_d,
1728 &query,
1729 ef,
1730 NswMetric::L2,
1731 );
1732 candidates.retain(|&(_, n)| n != new_row_idx);
1733 if let Some(&(d, n)) = candidates.first() {
1736 current = n;
1737 current_d = d;
1738 }
1739 let peers = select_neighbours_heuristic(&candidates, cap, table, col_pos);
1740 connect_at_layer(table, idx_pos, layer, new_row_idx, &peers);
1741 }
1742 if level > entry_level
1745 && let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
1746 {
1747 g.entry = Some(new_row_idx);
1748 g.entry_level = level;
1749 }
1750}
1751
1752fn ensure_node_slot(table: &mut Table, idx_pos: usize, new_row_idx: usize, level: u8) {
1756 let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind else {
1757 unreachable!("ensure_node_slot on a BTree index");
1758 };
1759 while g.layers.len() <= level as usize {
1760 g.layers.push(PersistentVec::new());
1761 }
1762 while g.levels.len() <= new_row_idx {
1763 g.levels.push_mut(0);
1764 }
1765 for layer_vec in &mut g.layers {
1766 while layer_vec.len() <= new_row_idx {
1767 layer_vec.push_mut(Vec::new());
1768 }
1769 }
1770}
1771
1772fn greedy_layer_walk(
1778 table: &Table,
1779 idx_pos: usize,
1780 layer: u8,
1781 mut current: usize,
1782 mut current_d: f32,
1783 query: &[f32],
1784) -> (usize, f32) {
1785 let g = match &table.indices[idx_pos].kind {
1786 IndexKind::Nsw(g) => g,
1787 IndexKind::BTree(_) | IndexKind::Brin { .. } => return (current, current_d),
1788 };
1789 let col_pos = table.indices[idx_pos].column_position;
1790 loop {
1791 let neighbours: &[u32] = g
1792 .layers
1793 .get(layer as usize)
1794 .and_then(|layer_v| layer_v.get(current))
1795 .map_or(&[][..], Vec::as_slice);
1796 let mut best = current;
1797 let mut best_d = current_d;
1798 for &n in neighbours {
1799 let n = n as usize;
1800 let d = vec_l2_sq(table, col_pos, n, query);
1801 if d < best_d {
1802 best = n;
1803 best_d = d;
1804 }
1805 }
1806 if best == current {
1807 return (current, current_d);
1808 }
1809 current = best;
1810 current_d = best_d;
1811 }
1812}
1813
1814#[allow(clippy::too_many_arguments)] fn layer_beam_search(
1827 table: &Table,
1828 idx_pos: usize,
1829 layer: u8,
1830 entry_node: usize,
1831 entry_d: f32,
1832 query: &[f32],
1833 ef: usize,
1834 metric: NswMetric,
1835) -> Vec<(f32, usize)> {
1836 let g = match &table.indices[idx_pos].kind {
1837 IndexKind::Nsw(g) => g,
1838 IndexKind::BTree(_) | IndexKind::Brin { .. } => return Vec::new(),
1839 };
1840 let col_pos = table.indices[idx_pos].column_position;
1841 let d0 = if matches!(metric, NswMetric::L2) {
1842 entry_d
1843 } else {
1844 cell_to_query_metric_distance(table, col_pos, entry_node, query, metric)
1845 };
1846 let row_count = table.rows.len();
1847 let mut visited: Vec<bool> = alloc::vec![false; row_count];
1848 if entry_node < row_count {
1849 visited[entry_node] = true;
1850 }
1851 let mut candidates: alloc::collections::BinaryHeap<NodeClosest> =
1854 alloc::collections::BinaryHeap::with_capacity(ef);
1855 let mut results: alloc::collections::BinaryHeap<NodeFurthest> =
1856 alloc::collections::BinaryHeap::with_capacity(ef);
1857 candidates.push(NodeClosest {
1858 dist: d0,
1859 node: entry_node,
1860 });
1861 results.push(NodeFurthest {
1862 dist: d0,
1863 node: entry_node,
1864 });
1865 while let Some(cur) = candidates.pop() {
1866 let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
1867 if cur.dist > worst && results.len() >= ef {
1868 break;
1869 }
1870 let neighbours: &[u32] = g
1871 .layers
1872 .get(layer as usize)
1873 .and_then(|layer_v| layer_v.get(cur.node))
1874 .map_or(&[][..], Vec::as_slice);
1875 for &n in neighbours {
1876 let n = n as usize;
1877 if n >= row_count || visited[n] {
1878 continue;
1879 }
1880 visited[n] = true;
1881 let dn = cell_to_query_metric_distance(table, col_pos, n, query, metric);
1885 if !dn.is_finite() {
1886 continue;
1887 }
1888 let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
1889 if results.len() < ef || dn < worst {
1890 results.push(NodeFurthest { dist: dn, node: n });
1891 if results.len() > ef {
1892 results.pop();
1893 }
1894 candidates.push(NodeClosest { dist: dn, node: n });
1895 }
1896 }
1897 }
1898 let mut out: Vec<(f32, usize)> = results.into_iter().map(|c| (c.dist, c.node)).collect();
1901 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
1902 out
1903}
1904
/// Beam-search frontier entry, ordered so that `BinaryHeap` (a max-heap) pops
/// the *closest* node (smallest `dist`) first.
///
/// The ordering is total. Distances compare with `f32::total_cmp`, so NaN and
/// signed zeros have a fixed place, and a positive NaN ranks as the furthest.
/// Ties break on `node`, with the lower id popped first. `Eq` is defined through
/// `cmp`, so the two traits agree as `Ord` requires.
#[derive(Debug, Clone, Copy)]
struct NodeClosest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeClosest {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeClosest {}
impl PartialOrd for NodeClosest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeClosest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        // Reversed on both keys: a smaller distance (then a smaller node id)
        // compares as "greater" and therefore surfaces first from the heap.
        other
            .dist
            .total_cmp(&self.dist)
            .then_with(|| other.node.cmp(&self.node))
    }
}
1933
/// Beam-search result entry, ordered so that `BinaryHeap` (a max-heap) keeps
/// the *furthest* node (largest `dist`) on top, ready for eviction.
///
/// The ordering is total. Distances compare with `f32::total_cmp`, so a
/// positive NaN ranks as the furthest and is evicted first. Ties break on
/// `node`, with the higher id evicted first. `Eq` is defined through `cmp`, so
/// the two traits agree as `Ord` requires.
#[derive(Debug, Clone, Copy)]
struct NodeFurthest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeFurthest {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeFurthest {}
impl PartialOrd for NodeFurthest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeFurthest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        self.dist
            .total_cmp(&other.dist)
            .then_with(|| self.node.cmp(&other.node))
    }
}
1959
1960fn select_neighbours_heuristic(
1969 candidates: &[(f32, usize)],
1970 m: usize,
1971 table: &Table,
1972 col_pos: usize,
1973) -> Vec<usize> {
1974 let mut chosen: Vec<usize> = Vec::with_capacity(m);
1975 for &(d_q, e) in candidates {
1976 if chosen.len() >= m {
1977 break;
1978 }
1979 if !matches!(
1984 table.rows.get(e).and_then(|r| r.values.get(col_pos)),
1985 Some(Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_))
1986 ) {
1987 continue;
1988 }
1989 let mut covered = false;
1990 for &r in &chosen {
1991 if cell_l2_sq(table, col_pos, e, r) < d_q {
1995 covered = true;
1996 break;
1997 }
1998 }
1999 if !covered {
2000 chosen.push(e);
2001 }
2002 }
2003 chosen
2004}
2005
2006fn connect_at_layer(
2010 table: &mut Table,
2011 idx_pos: usize,
2012 layer: u8,
2013 new_row_idx: usize,
2014 peers: &[usize],
2015) {
2016 let col_pos = table.indices[idx_pos].column_position;
2017 let cap = match &table.indices[idx_pos].kind {
2018 IndexKind::Nsw(g) => g.cap_for_layer(layer),
2019 IndexKind::BTree(_) | IndexKind::Brin { .. } => return,
2020 };
2021 let new_row_u32 = u32::try_from(new_row_idx).expect("row index fits in u32");
2026 if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
2027 let layer_v = &mut g.layers[layer as usize];
2028 if let Some(slot) = layer_v.get_mut(new_row_idx) {
2029 *slot = peers
2030 .iter()
2031 .map(|&p| u32::try_from(p).expect("row index fits in u32"))
2032 .collect();
2033 }
2034 }
2035 for &peer in peers {
2036 if !matches!(
2040 &table.rows[peer].values[col_pos],
2041 Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_)
2042 ) {
2043 continue;
2044 }
2045 if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
2047 let layer_v = &mut g.layers[layer as usize];
2048 if let Some(slot) = layer_v.get_mut(peer)
2049 && !slot.contains(&new_row_u32)
2050 {
2051 slot.push(new_row_u32);
2052 }
2053 }
2054 let needs_trim = match &table.indices[idx_pos].kind {
2058 IndexKind::Nsw(g) => g.layers[layer as usize][peer].len() > cap,
2059 IndexKind::BTree(_) | IndexKind::Brin { .. } => false,
2060 };
2061 if needs_trim {
2062 let current_peers: Vec<usize> = match &table.indices[idx_pos].kind {
2063 IndexKind::Nsw(g) => g.layers[layer as usize][peer]
2064 .iter()
2065 .map(|&n| n as usize)
2066 .collect(),
2067 IndexKind::BTree(_) | IndexKind::Brin { .. } => continue,
2068 };
2069 let mut tagged: Vec<(f32, usize)> = current_peers
2074 .iter()
2075 .map(|&p| (cell_l2_sq(table, col_pos, peer, p), p))
2076 .collect();
2077 tagged.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2078 let kept = select_neighbours_heuristic(&tagged, cap, table, col_pos);
2079 if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
2080 && let Some(slot) = g.layers[layer as usize].get_mut(peer)
2081 {
2082 *slot = kept
2083 .into_iter()
2084 .map(|p| u32::try_from(p).expect("row index fits in u32"))
2085 .collect();
2086 }
2087 }
2088 }
2089}
2090
2091fn vec_l2_sq(table: &Table, col_pos: usize, row: usize, query: &[f32]) -> f32 {
2098 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2099 Some(Value::Vector(v)) if v.len() == query.len() => l2_distance_sq(v, query),
2100 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => {
2101 quantize::sq8_l2_distance_sq_asymmetric(q, query)
2102 }
2103 Some(Value::HalfVector(h)) if h.dim() == query.len() => {
2107 halfvec::half_l2_distance_sq_asymmetric(h, query)
2108 }
2109 _ => f32::INFINITY,
2110 }
2111}
2112
2113fn cell_l2_sq(table: &Table, col_pos: usize, row_a: usize, row_b: usize) -> f32 {
2120 let Some(cell_a) = table.rows.get(row_a).and_then(|r| r.values.get(col_pos)) else {
2121 return f32::INFINITY;
2122 };
2123 let Some(cell_b) = table.rows.get(row_b).and_then(|r| r.values.get(col_pos)) else {
2124 return f32::INFINITY;
2125 };
2126 match (cell_a, cell_b) {
2127 (Value::Vector(a), Value::Vector(b)) if a.len() == b.len() => l2_distance_sq(a, b),
2128 (Value::Sq8Vector(a), Value::Sq8Vector(b)) if a.bytes.len() == b.bytes.len() => {
2129 quantize::sq8_l2_distance_sq(a, b)
2130 }
2131 (Value::HalfVector(a), Value::HalfVector(b)) if a.dim() == b.dim() => {
2136 halfvec::half_l2_distance_sq(a, b)
2137 }
2138 _ => f32::INFINITY,
2139 }
2140}
2141
2142fn cell_to_query_metric_distance(
2147 table: &Table,
2148 col_pos: usize,
2149 row: usize,
2150 query: &[f32],
2151 metric: NswMetric,
2152) -> f32 {
2153 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2154 Some(Value::Vector(v)) if v.len() == query.len() => metric_distance(metric, v, query),
2155 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => match metric {
2156 NswMetric::L2 => quantize::sq8_l2_distance_sq_asymmetric(q, query),
2157 NswMetric::InnerProduct => quantize::sq8_inner_product_asymmetric(q, query),
2158 NswMetric::Cosine => quantize::sq8_cosine_distance_asymmetric(q, query),
2159 },
2160 Some(Value::HalfVector(h)) if h.dim() == query.len() => match metric {
2163 NswMetric::L2 => halfvec::half_l2_distance_sq_asymmetric(h, query),
2164 NswMetric::InnerProduct => halfvec::half_inner_product_asymmetric(h, query),
2165 NswMetric::Cosine => halfvec::half_cosine_distance_asymmetric(h, query),
2166 },
2167 _ => f32::INFINITY,
2168 }
2169}
2170
/// Distance function used by NSW searches.
///
/// Every variant is oriented so that a smaller value means a closer match.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NswMetric {
    /// Squared Euclidean distance.
    L2,
    /// Negated dot product: a larger dot product gives a smaller distance.
    InnerProduct,
    /// `1 - cosine similarity`; for dense f32 cells a zero-norm vector is
    /// infinitely far.
    Cosine,
}
2188
2189fn nsw_search(
2195 table: &Table,
2196 idx_pos: usize,
2197 query: &[f32],
2198 k: usize,
2199 ef: usize,
2200 metric: NswMetric,
2201) -> Vec<(f32, usize)> {
2202 let (entry, entry_level) = match &table.indices[idx_pos].kind {
2203 IndexKind::Nsw(g) => (g.entry, g.entry_level),
2204 IndexKind::BTree(_) | IndexKind::Brin { .. } => return Vec::new(),
2205 };
2206 let Some(entry) = entry else {
2207 return Vec::new();
2208 };
2209 let col_pos = table.indices[idx_pos].column_position;
2210 let sq8 = matches!(
2217 table.schema.columns.get(col_pos).map(|c| c.ty),
2218 Some(DataType::Vector {
2219 encoding: VecEncoding::Sq8,
2220 ..
2221 })
2222 );
2223 let ef = if sq8 {
2224 ef.max(k).max(k * SQ8_RERANK_OVER_FETCH)
2225 } else {
2226 ef.max(k)
2227 };
2228 let entry_d = vec_l2_sq(table, col_pos, entry, query);
2230 let mut current = entry;
2231 let mut current_d = entry_d;
2232 for layer in (1..=entry_level).rev() {
2233 (current, current_d) = greedy_layer_walk(table, idx_pos, layer, current, current_d, query);
2234 }
2235 let mut results = layer_beam_search(table, idx_pos, 0, current, current_d, query, ef, metric);
2237 if sq8 {
2238 results = sq8_rerank(table, col_pos, &results, query, metric);
2239 }
2240 results.truncate(k);
2241 results
2242}
2243
2244fn sq8_rerank(
2251 table: &Table,
2252 col_pos: usize,
2253 candidates: &[(f32, usize)],
2254 query: &[f32],
2255 metric: NswMetric,
2256) -> Vec<(f32, usize)> {
2257 let mut out: Vec<(f32, usize)> = candidates
2258 .iter()
2259 .filter_map(|&(adc_d, row)| {
2260 let cell = table.rows.get(row).and_then(|r| r.values.get(col_pos))?;
2261 let Value::Sq8Vector(q) = cell else {
2262 return Some((adc_d, row));
2266 };
2267 let deq = quantize::dequantize(q);
2268 if deq.len() != query.len() {
2269 return None;
2270 }
2271 Some((metric_distance(metric, &deq, query), row))
2272 })
2273 .collect();
2274 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2275 out
2276}
2277
/// Beam-width multiplier applied to `k` when searching an SQ8-encoded column.
///
/// `nsw_search` gathers at least `k * SQ8_RERANK_OVER_FETCH` approximate
/// candidates before `sq8_rerank` re-scores them against dequantised vectors.
/// Presumably this gives the exact re-rank room to recover neighbours that the
/// quantised distances mis-ordered.
const SQ8_RERANK_OVER_FETCH: usize = 3;
2282
2283fn metric_distance(metric: NswMetric, a: &[f32], b: &[f32]) -> f32 {
2284 match metric {
2285 NswMetric::L2 => l2_distance_sq(a, b),
2286 NswMetric::InnerProduct => -inner_product_f32(a, b),
2287 NswMetric::Cosine => {
2288 let (dot, na, nb) = cosine_dot_norms_f32(a, b);
2289 if na == 0.0 || nb == 0.0 {
2290 return f32::INFINITY;
2291 }
2292 let denom = sqrt_newton_f32(na) * sqrt_newton_f32(nb);
2295 1.0 - dot / denom
2296 }
2297 }
2298}
2299
2300#[doc(hidden)]
2309#[inline]
2310pub fn inner_product_f32(a: &[f32], b: &[f32]) -> f32 {
2311 #[cfg(target_arch = "aarch64")]
2312 {
2313 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
2314 return unsafe { inner_product_neon(a, b) };
2317 }
2318 }
2319 inner_product_scalar(a, b)
2320}
2321
/// Portable dot product: sums `a[i] * b[i]` left to right over the common
/// prefix of the two slices, starting from `+0.0`.
fn inner_product_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).fold(0.0, |acc, (&x, &y)| acc + x * y)
}
2329
/// NEON dot product of `a` and `b`.
///
/// Two 4-lane FMA accumulators consume 8 elements per iteration. A tail loop
/// then consumes any remaining full 4-element chunk, and the lanes of both
/// accumulators are summed at the end. Elements past the last full
/// 4-element chunk of `a` are not read.
///
/// # Safety
///
/// - `b.len() >= a.len()` is required, because `b` is loaded at the same
///   offsets as `a`.
/// - The executing CPU must support NEON.
///
/// `inner_product_f32` only calls this with equal lengths that are a non-zero
/// multiple of 4.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
// Short operand names (`av0`, `bv0`, `n`, `i`) mirror the intrinsic operands.
#[allow(clippy::many_single_char_names)]
unsafe fn inner_product_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32,
    };
    // SAFETY: each `vld1q_f32` reads 4 floats at offset `i` (or `i + 4`).
    // The loop guards ensure `i + 8 <= n` (resp. `i + 4 <= n`) with
    // `n == a.len()`, so reads of `a` are in bounds; `b` is in bounds by this
    // function's length contract.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        // Two independent accumulators, presumably to overlap FMA latency.
        let mut acc0 = zero;
        let mut acc1 = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 8 <= n {
            let av0 = vld1q_f32(a.as_ptr().add(i));
            let bv0 = vld1q_f32(b.as_ptr().add(i));
            acc0 = vfmaq_f32(acc0, av0, bv0);
            let av1 = vld1q_f32(a.as_ptr().add(i + 4));
            let bv1 = vld1q_f32(b.as_ptr().add(i + 4));
            acc1 = vfmaq_f32(acc1, av1, bv1);
            i += 8;
        }
        // At most one 4-element chunk remains after the 8-wide loop.
        while i + 4 <= n {
            let av = vld1q_f32(a.as_ptr().add(i));
            let bv = vld1q_f32(b.as_ptr().add(i));
            acc0 = vfmaq_f32(acc0, av, bv);
            i += 4;
        }
        // Horizontal reduction of both accumulators.
        vaddvq_f32(vaddq_f32(acc0, acc1))
    }
}
2363
2364#[doc(hidden)]
2371#[inline]
2372pub fn cosine_dot_norms_f32(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
2373 #[cfg(target_arch = "aarch64")]
2374 {
2375 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
2376 return unsafe { cosine_dot_norms_neon(a, b) };
2378 }
2379 }
2380 cosine_dot_norms_scalar(a, b)
2381}
2382
/// Portable single-pass `(dot, |a|², |b|²)` over the common prefix of `a` and
/// `b`, accumulated left to right from `+0.0`.
fn cosine_dot_norms_scalar(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    a.iter()
        .zip(b)
        .fold((0.0, 0.0, 0.0), |(dot, norm_sq_a, norm_sq_b), (&x, &y)| {
            (dot + x * y, norm_sq_a + x * x, norm_sq_b + y * y)
        })
}
2394
/// NEON single-pass `(dot(a, b), |a|², |b|²)`.
///
/// Each iteration loads one 4-element chunk of `a` and `b` and updates three
/// FMA accumulators (dot, `a·a`, `b·b`). The lanes of each are summed at the
/// end. Elements past the last full 4-element chunk of `a` are not read.
///
/// # Safety
///
/// - `b.len() >= a.len()` is required, because `b` is loaded at the same
///   offsets as `a`.
/// - The executing CPU must support NEON.
///
/// `cosine_dot_norms_f32` only calls this with equal lengths that are a
/// non-zero multiple of 4.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
// Short operand names mirror the intrinsic operands; `acc_na`/`acc_nb` are
// intentionally parallel.
#[allow(clippy::many_single_char_names, clippy::similar_names)]
unsafe fn cosine_dot_norms_neon(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    use core::arch::aarch64::{float32x4_t, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32};
    // SAFETY: each load reads 4 floats at offset `i` with `i + 4 <= n` and
    // `n == a.len()`; `b` is in bounds by this function's length contract.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut acc_dot = zero;
        let mut acc_na = zero;
        let mut acc_nb = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 4 <= n {
            let av = vld1q_f32(a.as_ptr().add(i));
            let bv = vld1q_f32(b.as_ptr().add(i));
            acc_dot = vfmaq_f32(acc_dot, av, bv);
            acc_na = vfmaq_f32(acc_na, av, av);
            acc_nb = vfmaq_f32(acc_nb, bv, bv);
            i += 4;
        }
        // Horizontal reduction of each accumulator.
        (vaddvq_f32(acc_dot), vaddvq_f32(acc_na), vaddvq_f32(acc_nb))
    }
}
2418
/// Square root for `no_std` builds (`f32::sqrt` lives in `std`).
///
/// The initial estimate halves the IEEE-754 exponent by shifting the bit
/// pattern and adding a bias constant, which lands within a few percent of
/// the root for every normal input. Four Newton steps then reach full `f32`
/// precision. The previous seed (`g = x`) needed roughly one halving step per
/// bit of exponent, so ten steps left e.g. `sqrt(1e6)` near 1296 instead of
/// 1000. That corrupted cosine distances for unnormalised vectors.
///
/// Subnormal inputs are scaled by `2^24` (exact) into the normal range first
/// and the root is scaled back by `2^-12`.
///
/// Returns `0.0` for `x <= 0.0` (including `-0.0`), `+inf` for `+inf`, and NaN
/// for NaN.
fn sqrt_newton_f32(x: f32) -> f32 {
    if x <= 0.0 {
        return 0.0;
    }
    if !x.is_finite() {
        // +inf -> +inf, NaN -> NaN (negative inputs were handled above).
        return x;
    }
    // Lift subnormals into the normal range so the bit-trick seed is accurate:
    // sqrt(x * 2^24) = sqrt(x) * 2^12, and power-of-two scaling is exact here.
    let (y, unscale) = if x < f32::MIN_POSITIVE {
        (x * 16_777_216.0, 1.0 / 4096.0)
    } else {
        (x, 1.0)
    };
    // Halve the biased exponent: (bits >> 1) + bias ≈ bits of sqrt(y).
    // For finite `y`, (0x7f7f_ffff >> 1) + 0x1fbd_1df5 cannot overflow u32.
    let mut g = f32::from_bits((y.to_bits() >> 1) + 0x1fbd_1df5);
    for _ in 0..4 {
        g = 0.5 * (g + y / g);
    }
    g * unscale
}
2429
2430#[inline]
2438fn l2_distance_sq(a: &[f32], b: &[f32]) -> f32 {
2439 #[cfg(target_arch = "aarch64")]
2440 {
2441 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
2442 return unsafe { l2_distance_sq_neon(a, b) };
2446 }
2447 }
2448 l2_distance_sq_scalar(a, b)
2449}
2450
/// Portable squared Euclidean distance: sums `(a[i] - b[i])²` left to right
/// over the common prefix of the two slices, starting from `+0.0`.
fn l2_distance_sq_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).fold(0.0, |acc, (&x, &y)| {
        let diff = x - y;
        acc + diff * diff
    })
}
2459
/// NEON squared Euclidean distance between `a` and `b`.
///
/// Two 4-lane accumulators consume 8 elements per iteration via
/// subtract-then-FMA. A tail loop consumes any remaining full 4-element chunk,
/// and the lanes of both accumulators are summed at the end. Elements past the
/// last full 4-element chunk of `a` are not read.
///
/// # Safety
///
/// - `b.len() >= a.len()` is required, because `b` is loaded at the same
///   offsets as `a`.
/// - The executing CPU must support NEON.
///
/// `l2_distance_sq` only calls this with equal lengths that are a non-zero
/// multiple of 4.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
// Short operand names (`d0`, `d1`, `n`, `i`) mirror the intrinsic operands.
#[allow(clippy::many_single_char_names)]
unsafe fn l2_distance_sq_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32, vsubq_f32,
    };
    // SAFETY: each `vld1q_f32` reads 4 floats at offset `i` (or `i + 4`).
    // The loop guards ensure `i + 8 <= n` (resp. `i + 4 <= n`) with
    // `n == a.len()`; `b` is in bounds by this function's length contract.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        // Two independent accumulators, presumably to overlap FMA latency.
        let mut acc0 = zero;
        let mut acc1 = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 8 <= n {
            // acc += (a - b)² for two 4-lane chunks per iteration.
            let d0 = vsubq_f32(vld1q_f32(a.as_ptr().add(i)), vld1q_f32(b.as_ptr().add(i)));
            acc0 = vfmaq_f32(acc0, d0, d0);
            let d1 = vsubq_f32(
                vld1q_f32(a.as_ptr().add(i + 4)),
                vld1q_f32(b.as_ptr().add(i + 4)),
            );
            acc1 = vfmaq_f32(acc1, d1, d1);
            i += 8;
        }
        // At most one 4-element chunk remains after the 8-wide loop.
        while i + 4 <= n {
            let d = vsubq_f32(vld1q_f32(a.as_ptr().add(i)), vld1q_f32(b.as_ptr().add(i)));
            acc0 = vfmaq_f32(acc0, d, d);
            i += 4;
        }
        // Horizontal reduction of both accumulators.
        vaddvq_f32(vaddq_f32(acc0, acc1))
    }
}
2497
2498pub fn nsw_query(
2501 table: &Table,
2502 idx_name: &str,
2503 query: &[f32],
2504 k: usize,
2505 metric: NswMetric,
2506) -> Vec<usize> {
2507 let Some(idx_pos) = table.indices.iter().position(|i| i.name == idx_name) else {
2508 return Vec::new();
2509 };
2510 let ef = (k * 2).max(NSW_DEFAULT_M);
2511 let mut hits = nsw_search(table, idx_pos, query, k, ef, metric);
2512 hits.truncate(k);
2513 hits.into_iter().map(|(_, idx)| idx).collect()
2514}
2515
2516pub fn nsw_index_on(table: &Table, column_position: usize) -> Option<&Index> {
2520 table
2521 .indices
2522 .iter()
2523 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_)))
2524}
2525
/// Registry of tables by name, plus the catalog-wide cold-tier segment slots.
#[derive(Debug, Clone, Default)]
pub struct Catalog {
    /// Tables in creation order; `by_name` stores positions into this vector.
    tables: Vec<Table>,
    /// Table name -> position in `tables`.
    by_name: BTreeMap<String, usize>,
    /// Cold segments indexed by segment id. `None` marks a tombstoned slot or
    /// a gap padded by `load_segment_bytes_at`. The `Arc` means cloning the
    /// catalog shares segment data instead of copying it.
    cold_segments: Vec<Option<Arc<OwnedSegment>>>,
}
2566
2567impl Catalog {
2568 pub const fn new() -> Self {
2569 Self {
2570 tables: Vec::new(),
2571 by_name: BTreeMap::new(),
2572 cold_segments: Vec::new(),
2573 }
2574 }
2575
2576 pub fn create_table(&mut self, schema: TableSchema) -> Result<(), StorageError> {
2577 if self.by_name.contains_key(&schema.name) {
2578 return Err(StorageError::DuplicateTable {
2579 name: schema.name.clone(),
2580 });
2581 }
2582 let idx = self.tables.len();
2583 let name = schema.name.clone();
2584 self.tables.push(Table::new(schema));
2585 self.by_name.insert(name, idx);
2586 Ok(())
2587 }
2588
2589 pub fn get(&self, name: &str) -> Option<&Table> {
2590 let idx = *self.by_name.get(name)?;
2591 self.tables.get(idx)
2592 }
2593
2594 pub fn get_mut(&mut self, name: &str) -> Option<&mut Table> {
2595 let idx = *self.by_name.get(name)?;
2596 self.tables.get_mut(idx)
2597 }
2598
    /// Number of tables registered in the catalog.
    pub fn table_count(&self) -> usize {
        self.tables.len()
    }
2602
2603 pub fn table_names(&self) -> Vec<String> {
2606 self.tables.iter().map(|t| t.schema.name.clone()).collect()
2607 }
2608
2609 pub fn load_segment_bytes(&mut self, bytes: Vec<u8>) -> Result<u32, StorageError> {
2620 let id = u32::try_from(self.cold_segments.len()).map_err(|_| {
2621 StorageError::Corrupt("cold segment count would exceed u32::MAX".into())
2622 })?;
2623 let seg = OwnedSegment::from_bytes(bytes)
2624 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
2625 self.cold_segments.push(Some(Arc::new(seg)));
2626 Ok(id)
2627 }
2628
2629 pub fn load_segment_bytes_at(
2642 &mut self,
2643 target_id: u32,
2644 bytes: Vec<u8>,
2645 ) -> Result<(), StorageError> {
2646 let seg = OwnedSegment::from_bytes(bytes)
2647 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
2648 let idx = target_id as usize;
2649 while self.cold_segments.len() <= idx {
2650 self.cold_segments.push(None);
2651 }
2652 if self.cold_segments[idx].is_some() {
2653 return Err(StorageError::Corrupt(format!(
2654 "load_segment_bytes_at: segment_id {target_id} already occupied"
2655 )));
2656 }
2657 self.cold_segments[idx] = Some(Arc::new(seg));
2658 Ok(())
2659 }
2660
2661 pub fn tombstone_segment(&mut self, segment_id: u32) -> Result<(), StorageError> {
2671 let idx = segment_id as usize;
2672 if idx >= self.cold_segments.len() {
2673 return Err(StorageError::Corrupt(format!(
2674 "tombstone_segment: segment_id {segment_id} out of bounds (len={})",
2675 self.cold_segments.len()
2676 )));
2677 }
2678 self.cold_segments[idx] = None;
2679 Ok(())
2680 }
2681
2682 #[must_use]
2684 pub fn cold_segment_count(&self) -> usize {
2685 self.cold_segments.iter().filter(|s| s.is_some()).count()
2686 }
2687
    #[must_use]
    /// Total number of cold-segment slots, including tombstoned / padding
    /// (`None`) slots. This equals the id `load_segment_bytes` would assign next.
    pub fn cold_segment_slot_count(&self) -> usize {
        self.cold_segments.len()
    }
2694
2695 #[must_use]
2700 pub fn cold_segment_ids_global(&self) -> Vec<u32> {
2701 self.cold_segments
2702 .iter()
2703 .enumerate()
2704 .filter_map(|(i, s)| s.as_ref().map(|_| i as u32))
2705 .collect()
2706 }
2707
2708 #[must_use]
2715 pub fn hot_tier_bytes(&self) -> u64 {
2716 self.tables
2717 .iter()
2718 .map(Table::hot_bytes)
2719 .fold(0u64, u64::saturating_add)
2720 }
2721
2722 pub fn freeze_oldest_to_cold(
2767 &mut self,
2768 table_name: &str,
2769 index_name: &str,
2770 max_rows: usize,
2771 ) -> Result<FreezeReport, StorageError> {
2772 if max_rows == 0 {
2774 return Err(StorageError::Corrupt(
2775 "freeze_oldest_to_cold: max_rows must be > 0".into(),
2776 ));
2777 }
2778 let table = self.get(table_name).ok_or_else(|| {
2779 StorageError::Corrupt(format!(
2780 "freeze_oldest_to_cold: table {table_name:?} not found"
2781 ))
2782 })?;
2783 if max_rows > table.rows.len() {
2784 return Err(StorageError::Corrupt(format!(
2785 "freeze_oldest_to_cold: max_rows {max_rows} > row_count {}",
2786 table.rows.len()
2787 )));
2788 }
2789 let idx = table
2790 .indices
2791 .iter()
2792 .find(|i| i.name == index_name)
2793 .ok_or_else(|| {
2794 StorageError::Corrupt(format!(
2795 "freeze_oldest_to_cold: index {index_name:?} not found on {table_name:?}"
2796 ))
2797 })?;
2798 if !matches!(idx.kind, IndexKind::BTree(_)) {
2799 return Err(StorageError::Corrupt(format!(
2800 "freeze_oldest_to_cold: index {index_name:?} is NSW; only BTree indices may freeze"
2801 )));
2802 }
2803 let column_position = idx.column_position;
2804
2805 let schema = table.schema.clone();
2807 let mut to_freeze: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(max_rows);
2808 for row_idx in 0..max_rows {
2809 let row = table.rows.get(row_idx).expect("bounds-checked above");
2810 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
2811 StorageError::Corrupt(format!(
2812 "freeze_oldest_to_cold: row {row_idx} has NULL / non-key value in index column"
2813 ))
2814 })?;
2815 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
2816 StorageError::Corrupt(format!(
2817 "freeze_oldest_to_cold: index {index_name:?} column type is non-integer; \
2818 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
2819 ))
2820 })?;
2821 to_freeze.push((pk_u64, encode_row_body_dense(row, &schema), key));
2822 }
2823 to_freeze.sort_by_key(|(k, _, _)| *k);
2828 for w in to_freeze.windows(2) {
2832 if w[0].0 == w[1].0 {
2833 return Err(StorageError::Corrupt(format!(
2834 "freeze_oldest_to_cold: duplicate PK {} in freeze batch",
2835 w[0].0
2836 )));
2837 }
2838 }
2839 let post_swap_keys: Vec<IndexKey> = to_freeze.iter().map(|(_, _, k)| k.clone()).collect();
2843 let seg_rows: Vec<(u64, Vec<u8>)> = to_freeze
2847 .into_iter()
2848 .map(|(k, body, _)| (k, body))
2849 .collect();
2850 let frozen_rows = seg_rows.len();
2851 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
2852 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: encode: {e}")))?;
2853
2854 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
2863 let positions: Vec<usize> = (0..max_rows).collect();
2864 let t_mut = self
2865 .get_mut(table_name)
2866 .expect("just validated; still present");
2867 let removed = t_mut.delete_rows(&positions);
2868 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
2869 let bytes_after = t_mut.hot_bytes();
2870 let bytes_freed = bytes_before.saturating_sub(bytes_after);
2871
2872 let segment_id = self
2873 .load_segment_bytes(seg_bytes.clone())
2874 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: load: {e}")))?;
2875 let new_cold = post_swap_keys.into_iter().map(|k| {
2876 (
2877 k,
2878 RowLocator::Cold {
2879 segment_id,
2880 page_offset: 0,
2881 },
2882 )
2883 });
2884 let t_mut = self.get_mut(table_name).expect("still present");
2885 t_mut.register_cold_locators(index_name, new_cold)?;
2886
2887 Ok(FreezeReport {
2888 segment_id,
2889 frozen_rows,
2890 bytes_freed,
2891 segment_bytes: seg_bytes,
2892 })
2893 }
2894
2895 #[must_use]
2901 pub fn cold_segment(&self, segment_id: u32) -> Option<&OwnedSegment> {
2902 self.cold_segments
2903 .get(segment_id as usize)
2904 .and_then(|s| s.as_deref())
2905 }
2906
2907 pub fn resolve_cold_locator(
2916 &self,
2917 table_name: &str,
2918 segment_id: u32,
2919 key: &IndexKey,
2920 ) -> Option<Row> {
2921 let t = self.get(table_name)?;
2922 let u64_key = index_key_as_u64(key)?;
2923 let seg = self.cold_segments.get(segment_id as usize)?.as_ref()?;
2924 let payload = seg.lookup(u64_key)?;
2925 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
2926 Some(row)
2927 }
2928
2929 pub fn lookup_by_pk(&self, table: &str, index_name: &str, key: &IndexKey) -> Option<Row> {
2947 let t = self.get(table)?;
2948 let idx = t.indices.iter().find(|i| i.name == index_name)?;
2949 let locators = idx.lookup_eq(key);
2950 let cold_u64_key = index_key_as_u64(key);
2951 for loc in locators {
2952 match *loc {
2953 RowLocator::Hot(i) => {
2954 if let Some(row) = t.rows.get(i) {
2955 return Some(row.clone());
2956 }
2957 }
2958 RowLocator::Cold {
2959 segment_id,
2960 page_offset: _,
2961 } => {
2962 let Some(u64_key) = cold_u64_key else {
2963 continue;
2966 };
2967 let Some(seg) = self
2968 .cold_segments
2969 .get(segment_id as usize)
2970 .and_then(|s| s.as_deref())
2971 else {
2972 continue;
2983 };
2984 let Some(payload) = seg.lookup(u64_key) else {
2985 continue;
2986 };
2987 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
2988 return Some(row);
2989 }
2990 }
2991 }
2992 None
2993 }
2994
2995 pub fn promote_cold_row(
3017 &mut self,
3018 table_name: &str,
3019 index_name: &str,
3020 key: &IndexKey,
3021 ) -> Result<Option<usize>, StorageError> {
3022 let cold_loc = self.find_cold_locator(table_name, index_name, key)?;
3023 let Some((segment_id, _page_offset)) = cold_loc else {
3024 return Ok(None);
3025 };
3026 let u64_key = index_key_as_u64(key).ok_or_else(|| {
3027 StorageError::Corrupt(
3028 "promote_cold_row: key type not coercible to u64 (cold tier requires integer PK)"
3029 .into(),
3030 )
3031 })?;
3032 let schema = self
3036 .get(table_name)
3037 .ok_or_else(|| {
3038 StorageError::Corrupt(format!("promote_cold_row: table {table_name:?} not found"))
3039 })?
3040 .schema
3041 .clone();
3042 let seg = self
3043 .cold_segments
3044 .get(segment_id as usize)
3045 .and_then(|s| s.as_ref())
3046 .ok_or_else(|| {
3047 StorageError::Corrupt(format!(
3048 "promote_cold_row: segment {segment_id} not registered on catalog"
3049 ))
3050 })?;
3051 let payload = seg.lookup(u64_key).ok_or_else(|| {
3052 StorageError::Corrupt(format!(
3053 "promote_cold_row: key {u64_key} resolves to segment {segment_id} \
3054 but the segment's bloom/page lookup didn't return a row"
3055 ))
3056 })?;
3057 let (row, _consumed) = decode_row_body_dense(&payload, &schema)?;
3058 let t = self
3063 .get_mut(table_name)
3064 .expect("table existed at lookup time");
3065 t.insert(row)?;
3066 let new_hot_idx =
3067 t.rows.len().checked_sub(1).ok_or_else(|| {
3068 StorageError::Corrupt("promote_cold_row: empty after insert".into())
3069 })?;
3070 t.remove_cold_locators_for_key(index_name, key)?;
3074 Ok(Some(new_hot_idx))
3075 }
3076
3077 pub fn shadow_cold_row(
3095 &mut self,
3096 table_name: &str,
3097 index_name: &str,
3098 key: &IndexKey,
3099 ) -> Result<usize, StorageError> {
3100 let t = self.get_mut(table_name).ok_or_else(|| {
3101 StorageError::Corrupt(format!("shadow_cold_row: table {table_name:?} not found"))
3102 })?;
3103 t.remove_cold_locators_for_key(index_name, key)
3104 }
3105
3106 pub fn prepare_freeze_slice(
3124 &self,
3125 table_name: &str,
3126 index_name: &str,
3127 row_range: core::ops::Range<usize>,
3128 ) -> Result<FreezeSlice, StorageError> {
3129 let table = self.get(table_name).ok_or_else(|| {
3130 StorageError::Corrupt(format!(
3131 "prepare_freeze_slice: table {table_name:?} not found"
3132 ))
3133 })?;
3134 let idx = table
3135 .indices
3136 .iter()
3137 .find(|i| i.name == index_name)
3138 .ok_or_else(|| {
3139 StorageError::Corrupt(format!(
3140 "prepare_freeze_slice: index {index_name:?} not found on {table_name:?}"
3141 ))
3142 })?;
3143 if !matches!(idx.kind, IndexKind::BTree(_)) {
3144 return Err(StorageError::Corrupt(format!(
3145 "prepare_freeze_slice: index {index_name:?} is NSW; only BTree indices may freeze"
3146 )));
3147 }
3148 if row_range.end > table.rows.len() {
3149 return Err(StorageError::Corrupt(format!(
3150 "prepare_freeze_slice: row_range end {} > row_count {}",
3151 row_range.end,
3152 table.rows.len()
3153 )));
3154 }
3155 let column_position = idx.column_position;
3156 let schema = table.schema.clone();
3157 let mut rows: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(row_range.len());
3158 for row_idx in row_range.clone() {
3159 let row = table.rows.get(row_idx).expect("bounds-checked above");
3160 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
3161 StorageError::Corrupt(format!(
3162 "prepare_freeze_slice: row {row_idx} has NULL / non-key value in index column"
3163 ))
3164 })?;
3165 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
3166 StorageError::Corrupt(format!(
3167 "prepare_freeze_slice: index {index_name:?} column type is non-integer; \
3168 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
3169 ))
3170 })?;
3171 rows.push((pk_u64, encode_row_body_dense(row, &schema), key));
3172 }
3173 rows.sort_by_key(|(k, _, _)| *k);
3174 Ok(FreezeSlice { row_range, rows })
3175 }
3176
3177 pub fn commit_freeze_slices(
3191 &mut self,
3192 table_name: &str,
3193 index_name: &str,
3194 slices: Vec<FreezeSlice>,
3195 ) -> Result<FreezeReport, StorageError> {
3196 let table = self.get(table_name).ok_or_else(|| {
3198 StorageError::Corrupt(format!(
3199 "commit_freeze_slices: table {table_name:?} not found"
3200 ))
3201 })?;
3202 let idx = table
3203 .indices
3204 .iter()
3205 .find(|i| i.name == index_name)
3206 .ok_or_else(|| {
3207 StorageError::Corrupt(format!(
3208 "commit_freeze_slices: index {index_name:?} not found on {table_name:?}"
3209 ))
3210 })?;
3211 if !matches!(idx.kind, IndexKind::BTree(_)) {
3212 return Err(StorageError::Corrupt(format!(
3213 "commit_freeze_slices: index {index_name:?} is NSW; only BTree indices may freeze"
3214 )));
3215 }
3216 let mut ordered = slices;
3220 ordered.sort_by_key(|s| s.row_range.start);
3221 let mut expected_start = 0usize;
3225 for s in &ordered {
3226 if s.row_range.start != expected_start {
3227 return Err(StorageError::Corrupt(format!(
3228 "commit_freeze_slices: gap/overlap at row {}; expected start {}",
3229 s.row_range.start, expected_start
3230 )));
3231 }
3232 expected_start = s.row_range.end;
3233 }
3234 let max_rows = expected_start;
3235 if max_rows > table.rows.len() {
3236 return Err(StorageError::Corrupt(format!(
3237 "commit_freeze_slices: total row range {} exceeds row_count {}",
3238 max_rows,
3239 table.rows.len()
3240 )));
3241 }
3242 if max_rows == 0 {
3243 return Ok(FreezeReport {
3244 segment_id: u32::MAX,
3245 frozen_rows: 0,
3246 bytes_freed: 0,
3247 segment_bytes: Vec::new(),
3248 });
3249 }
3250
3251 let total_rows: usize = ordered.iter().map(|s| s.rows.len()).sum();
3256 if total_rows != max_rows {
3257 return Err(StorageError::Corrupt(format!(
3258 "commit_freeze_slices: total slice rows {total_rows} ≠ row_range coverage {max_rows}"
3259 )));
3260 }
3261 let mut cursors: Vec<usize> = alloc::vec![0; ordered.len()];
3262 let mut merged: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(total_rows);
3263 loop {
3264 let mut pick: Option<usize> = None;
3267 for (i, c) in cursors.iter().enumerate() {
3268 let slice = &ordered[i];
3269 if *c >= slice.rows.len() {
3270 continue;
3271 }
3272 match pick {
3273 None => pick = Some(i),
3274 Some(j) => {
3275 if slice.rows[*c].0 < ordered[j].rows[cursors[j]].0 {
3276 pick = Some(i);
3277 }
3278 }
3279 }
3280 }
3281 let Some(i) = pick else { break };
3282 let row = ordered[i].rows[cursors[i]].clone();
3283 cursors[i] += 1;
3284 merged.push(row);
3285 }
3286 for w in merged.windows(2) {
3289 if w[0].0 == w[1].0 {
3290 return Err(StorageError::Corrupt(format!(
3291 "commit_freeze_slices: duplicate PK {} across slices",
3292 w[0].0
3293 )));
3294 }
3295 }
3296 let post_swap_keys: Vec<IndexKey> = merged.iter().map(|(_, _, k)| k.clone()).collect();
3297 let seg_rows: Vec<(u64, Vec<u8>)> = merged
3298 .into_iter()
3299 .map(|(k, body, _)| (k, body))
3300 .collect();
3301 let frozen_rows = seg_rows.len();
3302 let (seg_bytes, _meta) =
3303 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).map_err(|e| {
3304 StorageError::Corrupt(format!("commit_freeze_slices: encode: {e}"))
3305 })?;
3306
3307 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
3309 let positions: Vec<usize> = (0..max_rows).collect();
3310 let t_mut = self
3311 .get_mut(table_name)
3312 .expect("just validated; still present");
3313 let removed = t_mut.delete_rows(&positions);
3314 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
3315 let bytes_after = t_mut.hot_bytes();
3316 let bytes_freed = bytes_before.saturating_sub(bytes_after);
3317
3318 let segment_id = self
3319 .load_segment_bytes(seg_bytes.clone())
3320 .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: load: {e}")))?;
3321 let new_cold = post_swap_keys.into_iter().map(|k| {
3322 (
3323 k,
3324 RowLocator::Cold {
3325 segment_id,
3326 page_offset: 0,
3327 },
3328 )
3329 });
3330 let t_mut = self.get_mut(table_name).expect("still present");
3331 t_mut.register_cold_locators(index_name, new_cold)?;
3332
3333 Ok(FreezeReport {
3334 segment_id,
3335 frozen_rows,
3336 bytes_freed,
3337 segment_bytes: seg_bytes,
3338 })
3339 }
3340
3341 pub fn compact_cold_segments(
3384 &mut self,
3385 table_name: &str,
3386 index_name: &str,
3387 target_segment_bytes: u64,
3388 ) -> Result<CompactReport, StorageError> {
3389 let t = self.get(table_name).ok_or_else(|| {
3391 StorageError::Corrupt(format!(
3392 "compact_cold_segments: table {table_name:?} not found"
3393 ))
3394 })?;
3395 let idx = t
3396 .indices
3397 .iter()
3398 .find(|i| i.name == index_name)
3399 .ok_or_else(|| {
3400 StorageError::Corrupt(format!(
3401 "compact_cold_segments: index {index_name:?} not found on {table_name:?}"
3402 ))
3403 })?;
3404 let map = match &idx.kind {
3405 IndexKind::BTree(m) => m,
3406 IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
3407 return Err(StorageError::Corrupt(format!(
3408 "compact_cold_segments: index {index_name:?} is not BTree; \
3409 compaction applies only to BTree cold-tier indices"
3410 )));
3411 }
3412 };
3413
3414 let mut referenced_ids: BTreeSet<u32> = BTreeSet::new();
3417 for (_key, locators) in map.iter() {
3418 for loc in locators {
3419 if let RowLocator::Cold { segment_id, .. } = loc {
3420 referenced_ids.insert(*segment_id);
3421 }
3422 }
3423 }
3424 let candidate_set: BTreeSet<u32> = referenced_ids
3426 .into_iter()
3427 .filter(|id| {
3428 self.cold_segments
3429 .get(*id as usize)
3430 .and_then(|s| s.as_deref())
3431 .is_some_and(|s| (s.bytes().len() as u64) < target_segment_bytes)
3432 })
3433 .collect();
3434 if candidate_set.len() < 2 {
3435 return Ok(CompactReport {
3436 sources: Vec::new(),
3437 merged_segment_id: None,
3438 merged_segment_bytes: Vec::new(),
3439 merged_rows: 0,
3440 deleted_rows_pruned: 0,
3441 bytes_reclaimed_estimate: 0,
3442 });
3443 }
3444 let mut source_row_count: usize = 0;
3446 let mut source_byte_total: u64 = 0;
3447 for &id in &candidate_set {
3448 let seg = self.cold_segments[id as usize]
3449 .as_ref()
3450 .expect("candidate selected only when slot is Some");
3451 source_row_count = source_row_count.saturating_add(seg.meta().num_rows as usize);
3452 source_byte_total =
3453 source_byte_total.saturating_add(seg.bytes().len() as u64);
3454 }
3455 let mut collected: BTreeMap<u64, (Vec<u8>, IndexKey)> = BTreeMap::new();
3461 for (key, locators) in map.iter() {
3462 for loc in locators {
3463 let RowLocator::Cold { segment_id, .. } = loc else {
3464 continue;
3465 };
3466 if !candidate_set.contains(segment_id) {
3467 continue;
3468 }
3469 let u64_key = index_key_as_u64(key).ok_or_else(|| {
3470 StorageError::Corrupt(format!(
3471 "compact_cold_segments: index {index_name:?} has non-integer Cold key; \
3472 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
3473 ))
3474 })?;
3475 let seg = self.cold_segments[*segment_id as usize]
3476 .as_ref()
3477 .expect("candidate slot guaranteed Some above");
3478 let payload = seg.lookup(u64_key).ok_or_else(|| {
3479 StorageError::Corrupt(format!(
3480 "compact_cold_segments: BTree {index_name:?} points key={u64_key} \
3481 at segment {segment_id} but the segment lookup missed"
3482 ))
3483 })?;
3484 collected.insert(u64_key, (payload, key.clone()));
3485 break;
3486 }
3487 }
3488 let merged_rows = collected.len();
3489 let deleted_rows_pruned = source_row_count.saturating_sub(merged_rows);
3490
3491 let seg_rows: Vec<(u64, Vec<u8>)> = collected
3495 .iter()
3496 .map(|(k, (body, _))| (*k, body.clone()))
3497 .collect();
3498 let (seg_bytes, _meta) =
3499 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).map_err(|e| {
3500 StorageError::Corrupt(format!("compact_cold_segments: encode: {e}"))
3501 })?;
3502 let merged_bytes_len = seg_bytes.len() as u64;
3503
3504 let merged_segment_id = self
3506 .load_segment_bytes(seg_bytes.clone())
3507 .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: load: {e}")))?;
3508
3509 let entries: Vec<(IndexKey, Vec<RowLocator>)> = {
3515 let t = self
3516 .get(table_name)
3517 .expect("table existed at the start of this fn");
3518 let idx = t
3519 .indices
3520 .iter()
3521 .find(|i| i.name == index_name)
3522 .expect("index existed at the start of this fn");
3523 let IndexKind::BTree(map) = &idx.kind else {
3524 unreachable!("validated above");
3525 };
3526 map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
3527 };
3528 let t_mut = self
3529 .get_mut(table_name)
3530 .expect("table existed at the start of this fn");
3531 let idx_mut = t_mut
3532 .indices
3533 .iter_mut()
3534 .find(|i| i.name == index_name)
3535 .expect("index existed at the start of this fn");
3536 let IndexKind::BTree(map_mut) = &mut idx_mut.kind else {
3537 unreachable!("validated above");
3538 };
3539 for (key, locators) in entries {
3540 let mut new_locs: Vec<RowLocator> = Vec::with_capacity(locators.len());
3541 let mut changed = false;
3542 for loc in &locators {
3543 match *loc {
3544 RowLocator::Cold {
3545 segment_id,
3546 page_offset: _,
3547 } if candidate_set.contains(&segment_id) => {
3548 let replacement = RowLocator::Cold {
3549 segment_id: merged_segment_id,
3550 page_offset: 0,
3551 };
3552 if !new_locs.contains(&replacement) {
3553 new_locs.push(replacement);
3554 }
3555 changed = true;
3556 }
3557 other => new_locs.push(other),
3558 }
3559 }
3560 if changed {
3561 map_mut.insert_mut(key, new_locs);
3562 }
3563 }
3564
3565 for &id in &candidate_set {
3570 self.tombstone_segment(id)?;
3571 }
3572
3573 let bytes_reclaimed_estimate = source_byte_total.saturating_sub(merged_bytes_len);
3574 Ok(CompactReport {
3575 sources: candidate_set.into_iter().collect(),
3576 merged_segment_id: Some(merged_segment_id),
3577 merged_segment_bytes: seg_bytes,
3578 merged_rows,
3579 deleted_rows_pruned,
3580 bytes_reclaimed_estimate,
3581 })
3582 }
3583
3584 fn find_cold_locator(
3590 &self,
3591 table_name: &str,
3592 index_name: &str,
3593 key: &IndexKey,
3594 ) -> Result<Option<(u32, u32)>, StorageError> {
3595 let t = self.get(table_name).ok_or_else(|| {
3596 StorageError::Corrupt(format!("find_cold_locator: table {table_name:?} not found"))
3597 })?;
3598 let idx = t
3599 .indices
3600 .iter()
3601 .find(|i| i.name == index_name)
3602 .ok_or_else(|| {
3603 StorageError::Corrupt(format!(
3604 "find_cold_locator: index {index_name:?} not found on {table_name:?}"
3605 ))
3606 })?;
3607 if !matches!(idx.kind, IndexKind::BTree(_)) {
3608 return Err(StorageError::Corrupt(format!(
3609 "find_cold_locator: index {index_name:?} is NSW; promote-on-write only applies to BTree indices"
3610 )));
3611 }
3612 for loc in idx.lookup_eq(key) {
3613 if let RowLocator::Cold {
3614 segment_id,
3615 page_offset,
3616 } = *loc
3617 {
3618 return Ok(Some((segment_id, page_offset)));
3619 }
3620 }
3621 Ok(None)
3622 }
3623}
3624
3625fn index_key_as_u64(key: &IndexKey) -> Option<u64> {
3631 match key {
3632 IndexKey::Int(n) => Some(n.cast_unsigned()),
3638 IndexKey::Text(_) | IndexKey::Bool(_) => None,
3639 }
3640}
3641
3642#[derive(Debug, Clone, PartialEq, Eq)]
3643#[non_exhaustive]
3644pub enum StorageError {
3645 DuplicateTable {
3646 name: String,
3647 },
3648 TableNotFound {
3649 name: String,
3650 },
3651 ArityMismatch {
3652 expected: usize,
3653 actual: usize,
3654 },
3655 TypeMismatch {
3656 column: String,
3657 expected: DataType,
3658 actual: DataType,
3659 position: usize,
3660 },
3661 NullInNotNull {
3662 column: String,
3663 },
3664 DuplicateIndex {
3666 name: String,
3667 },
3668 ColumnNotFound {
3670 column: String,
3671 },
3672 Corrupt(String),
3675 IndexNotFound {
3678 name: String,
3679 },
3680 Unsupported(String),
3684}
3685
3686impl fmt::Display for StorageError {
3687 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3688 match self {
3689 Self::DuplicateTable { name } => write!(f, "table already exists: {name}"),
3690 Self::TableNotFound { name } => write!(f, "table not found: {name}"),
3691 Self::ArityMismatch { expected, actual } => write!(
3692 f,
3693 "row arity mismatch: expected {expected} columns, got {actual}"
3694 ),
3695 Self::TypeMismatch {
3696 column,
3697 expected,
3698 actual,
3699 position,
3700 } => write!(
3701 f,
3702 "type mismatch in column {column:?} (position {position}): expected {expected}, got {actual}"
3703 ),
3704 Self::NullInNotNull { column } => {
3705 write!(f, "NULL value in NOT NULL column {column:?}")
3706 }
3707 Self::DuplicateIndex { name } => write!(f, "index already exists: {name}"),
3708 Self::ColumnNotFound { column } => write!(f, "column not found: {column}"),
3709 Self::Corrupt(detail) => write!(f, "corrupt on-disk format: {detail}"),
3710 Self::IndexNotFound { name } => write!(f, "index not found: {name}"),
3711 Self::Unsupported(detail) => write!(f, "unsupported: {detail}"),
3712 }
3713 }
3714}
3715
3716impl ColumnSchema {
3717 pub fn new(name: impl Into<String>, ty: DataType, nullable: bool) -> Self {
3718 Self {
3719 name: name.into(),
3720 ty,
3721 nullable,
3722 default: None,
3723 auto_increment: false,
3724 }
3725 }
3726
3727 #[must_use]
3731 pub fn with_default(mut self, default: Value) -> Self {
3732 self.default = Some(default);
3733 self
3734 }
3735
3736 #[must_use]
3738 pub const fn with_auto_increment(mut self) -> Self {
3739 self.auto_increment = true;
3740 self
3741 }
3742}
3743
3744impl TableSchema {
3745 pub fn new(name: impl Into<String>, columns: Vec<ColumnSchema>) -> Self {
3746 Self {
3747 name: name.into(),
3748 columns,
3749 hot_tier_bytes: None,
3750 foreign_keys: Vec::new(),
3751 }
3752 }
3753}
3754
/// Magic bytes at the start of every serialized catalog.
/// `Catalog::deserialize` rejects buffers that do not begin with them.
const FILE_MAGIC: &[u8; 8] = b"SPGDB001";
/// Catalog format version written by `Catalog::serialize`.
///
/// The reader gates these sections on the version:
///
/// - v9+: BTree index maps are persisted (older files re-create the index
///   via `add_index`).
/// - v11+: adds the per-table `hot_tier_bytes` appendix.
/// - v12+: adds INCLUDE columns, partial predicate and expression per index.
/// - v13+: adds the foreign-key appendix.
const FILE_VERSION: u8 = 13;
/// Oldest format version `Catalog::deserialize` still accepts.
const MIN_SUPPORTED_FILE_VERSION: u8 = 8;

// Tag bytes for the `IndexKey` variants in the on-disk index-key encoding.
// NOTE(review): presumably consumed by `write_index_key` / `read_index_key`,
// which are not visible in this chunk; confirm.
const INDEX_KEY_TAG_INT: u8 = 0;
const INDEX_KEY_TAG_TEXT: u8 = 1;
const INDEX_KEY_TAG_BOOL: u8 = 2;
3840
impl Catalog {
    /// Serializes the full catalog into the current on-disk format
    /// (`FILE_VERSION`).
    ///
    /// The header is `FILE_MAGIC`, one version byte and a `u32` table count.
    /// Each table then contributes, in order:
    ///
    /// 1. its name;
    /// 2. a `u16` column count and the per-column schema (name, type,
    ///    nullable byte, default tag and value, auto_increment byte);
    /// 3. a `u32` row count and the dense row bodies;
    /// 4. a `u16` index count and the index records;
    /// 5. the `hot_tier_bytes` appendix;
    /// 6. the foreign-key appendix.
    ///
    /// # Panics
    ///
    /// If any count or position exceeds the integer width it is written with
    /// (see the `expect` messages).
    pub fn serialize(&self) -> Vec<u8> {
        let mut out = Vec::with_capacity(64);
        out.extend_from_slice(FILE_MAGIC);
        out.push(FILE_VERSION);
        write_u32(
            &mut out,
            u32::try_from(self.tables.len()).expect("≤ 4G tables"),
        );
        for t in &self.tables {
            write_str(&mut out, &t.schema.name);
            write_u16(
                &mut out,
                u16::try_from(t.schema.columns.len()).expect("≤ 65k columns/table"),
            );
            for c in &t.schema.columns {
                write_str(&mut out, &c.name);
                write_data_type(&mut out, c.ty);
                out.push(u8::from(c.nullable));
                // Default value: tag 0 = none, tag 1 = a tagged value follows.
                match &c.default {
                    None => out.push(0),
                    Some(v) => {
                        out.push(1);
                        write_value(&mut out, v);
                    }
                }
                out.push(u8::from(c.auto_increment));
            }
            write_u32(
                &mut out,
                u32::try_from(t.rows.len()).expect("≤ 4G rows/table"),
            );
            // Rows use the schema-driven dense encoding (NULL bitmap + bodies)
            // and carry no per-row length prefix.
            for row in &t.rows {
                out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
            }
            write_u16(
                &mut out,
                u16::try_from(t.indices.len()).expect("≤ 65k indices/table"),
            );
            for idx in &t.indices {
                write_str(&mut out, &idx.name);
                write_u16(
                    &mut out,
                    u16::try_from(idx.column_position).expect("≤ 65k columns/table"),
                );
                // Index kind tag: 0 = BTree, 1 = NSW, 2 = BRIN.
                match &idx.kind {
                    IndexKind::BTree(map) => {
                        out.push(0);
                        // The whole key -> locators map is written, every
                        // locator included, so v9+ readers restore it directly.
                        write_u32(
                            &mut out,
                            u32::try_from(map.len()).expect("≤ 4G index entries/index"),
                        );
                        for (key, locators) in map {
                            write_index_key(&mut out, key);
                            write_u32(
                                &mut out,
                                u32::try_from(locators.len()).expect("≤ 4G locators/key"),
                            );
                            for loc in locators {
                                loc.write_le(&mut out);
                            }
                        }
                    }
                    IndexKind::Nsw(g) => {
                        out.push(1);
                        write_u16(&mut out, u16::try_from(g.m).expect("≤ 65k NSW neighbours"));
                        write_nsw_graph(&mut out, g);
                    }
                    IndexKind::Brin { column_type } => {
                        out.push(2);
                        // Only the column type is persisted for a BRIN index.
                        write_data_type(&mut out, *column_type);
                    }
                }
                // Per-index extras (read by v12+): INCLUDE column positions,
                // then the partial predicate and expression as optional strings
                // with a 0/1 presence tag.
                write_u16(
                    &mut out,
                    u16::try_from(idx.included_columns.len())
                        .expect("≤ 65k INCLUDE columns/index"),
                );
                for col_pos in &idx.included_columns {
                    write_u16(
                        &mut out,
                        u16::try_from(*col_pos).expect("≤ 65k columns/table"),
                    );
                }
                match &idx.partial_predicate {
                    None => out.push(0),
                    Some(pred) => {
                        out.push(1);
                        write_str(&mut out, pred);
                    }
                }
                match &idx.expression {
                    None => out.push(0),
                    Some(expr) => {
                        out.push(1);
                        write_str(&mut out, expr);
                    }
                }
            }
            // hot_tier_bytes appendix (read by v11+): a 0/1 presence tag, then
            // the value as little-endian bytes.
            match t.schema.hot_tier_bytes {
                None => out.push(0),
                Some(n) => {
                    out.push(1);
                    out.extend_from_slice(&n.to_le_bytes());
                }
            }
            // Foreign-key appendix (read by v13+). Each FK is written as:
            // optional name, local column positions, parent table name, parent
            // column positions, then the ON DELETE and ON UPDATE action tags.
            write_u16(
                &mut out,
                u16::try_from(t.schema.foreign_keys.len()).expect("≤ 65k FKs/table"),
            );
            for fk in &t.schema.foreign_keys {
                match &fk.name {
                    None => out.push(0),
                    Some(n) => {
                        out.push(1);
                        write_str(&mut out, n);
                    }
                }
                write_u16(
                    &mut out,
                    u16::try_from(fk.local_columns.len()).expect("≤ 65k FK columns"),
                );
                for &p in &fk.local_columns {
                    write_u16(
                        &mut out,
                        u16::try_from(p).expect("≤ 65k columns/table"),
                    );
                }
                write_str(&mut out, &fk.parent_table);
                write_u16(
                    &mut out,
                    u16::try_from(fk.parent_columns.len()).expect("≤ 65k FK parent columns"),
                );
                for &p in &fk.parent_columns {
                    write_u16(
                        &mut out,
                        u16::try_from(p).expect("≤ 65k columns/table"),
                    );
                }
                out.push(fk.on_delete.tag());
                out.push(fk.on_update.tag());
            }
        }
        out
    }

    /// Parses a buffer produced by `Catalog::serialize`.
    ///
    /// Accepts any format version in
    /// `MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION`.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::Corrupt` on a wrong magic, an unsupported
    /// version, or unread trailing bytes. Errors from decoding individual
    /// tables are propagated.
    pub fn deserialize(buf: &[u8]) -> Result<Self, StorageError> {
        let mut cur = Cursor::new(buf);
        let magic = cur.take(8)?;
        if magic != FILE_MAGIC {
            return Err(StorageError::Corrupt(format!(
                "bad magic: expected SPGDB001, got {magic:?}"
            )));
        }
        let version = cur.read_u8()?;
        if !(MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION).contains(&version) {
            return Err(StorageError::Corrupt(format!(
                "unsupported file version: {version} (supported: {MIN_SUPPORTED_FILE_VERSION}..={FILE_VERSION})"
            )));
        }
        let table_count = cur.read_u32()? as usize;
        let mut cat = Self::new();
        for _ in 0..table_count {
            deserialize_table(&mut cur, &mut cat, version)?;
        }
        // The whole buffer must be consumed; leftover bytes indicate corruption.
        if cur.pos < buf.len() {
            return Err(StorageError::Corrupt(format!(
                "trailing bytes: {} unread",
                buf.len() - cur.pos
            )));
        }
        Ok(cat)
    }
}
4066
4067fn deserialize_table(
4072 cur: &mut Cursor<'_>,
4073 cat: &mut Catalog,
4074 version: u8,
4075) -> Result<(), StorageError> {
4076 let table_name = cur.read_str()?;
4077 let name = table_name.clone();
4078 let col_count = cur.read_u16()? as usize;
4079 let mut cols = Vec::with_capacity(col_count);
4080 for _ in 0..col_count {
4081 let c_name = cur.read_str()?;
4082 let ty = cur.read_data_type()?;
4083 let nullable = cur.read_u8()? != 0;
4084 let default = match cur.read_u8()? {
4085 0 => None,
4086 1 => Some(cur.read_value()?),
4087 other => {
4088 return Err(StorageError::Corrupt(format!(
4089 "unknown default tag: {other}"
4090 )));
4091 }
4092 };
4093 let auto_increment = cur.read_u8()? != 0;
4094 cols.push(ColumnSchema {
4095 name: c_name,
4096 ty,
4097 nullable,
4098 default,
4099 auto_increment,
4100 });
4101 }
4102 let n_cols = cols.len();
4103 cat.create_table(TableSchema::new(name, cols))?;
4104 let t = cat.tables.last_mut().expect("create_table just pushed");
4108 deserialize_rows(cur, t, n_cols)?;
4109 deserialize_indices(cur, t, version)?;
4110 if version >= 11 {
4116 let has = cur.read_u8()?;
4117 let hot_tier_bytes = match has {
4118 0 => None,
4119 1 => Some(cur.read_u64()?),
4120 other => {
4121 return Err(StorageError::Corrupt(format!(
4122 "hot_tier_bytes appendix: unknown has-value byte {other}"
4123 )));
4124 }
4125 };
4126 t.schema_mut().hot_tier_bytes = hot_tier_bytes;
4127 }
4128 if version >= 13 {
4131 let fk_count = cur.read_u16()? as usize;
4132 let mut fks = Vec::with_capacity(fk_count);
4133 for _ in 0..fk_count {
4134 let name = match cur.read_u8()? {
4135 0 => None,
4136 1 => Some(cur.read_str()?),
4137 other => {
4138 return Err(StorageError::Corrupt(format!(
4139 "FK appendix: unknown has-name byte {other}"
4140 )));
4141 }
4142 };
4143 let local_arity = cur.read_u16()? as usize;
4144 let mut local_columns = Vec::with_capacity(local_arity);
4145 for _ in 0..local_arity {
4146 local_columns.push(cur.read_u16()? as usize);
4147 }
4148 let parent_table = cur.read_str()?;
4149 let parent_arity = cur.read_u16()? as usize;
4150 if parent_arity != local_arity {
4151 return Err(StorageError::Corrupt(format!(
4152 "FK arity mismatch in catalog: local {local_arity} vs parent {parent_arity}"
4153 )));
4154 }
4155 let mut parent_columns = Vec::with_capacity(parent_arity);
4156 for _ in 0..parent_arity {
4157 parent_columns.push(cur.read_u16()? as usize);
4158 }
4159 let on_delete = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
4160 StorageError::Corrupt("FK appendix: unknown on_delete tag".into())
4161 })?;
4162 let on_update = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
4163 StorageError::Corrupt("FK appendix: unknown on_update tag".into())
4164 })?;
4165 fks.push(ForeignKeyConstraint {
4166 name,
4167 local_columns,
4168 parent_table,
4169 parent_columns,
4170 on_delete,
4171 on_update,
4172 });
4173 }
4174 t.schema_mut().foreign_keys = fks;
4175 }
4176 let _ = table_name;
4177 Ok(())
4178}
4179
4180fn deserialize_rows(
4181 cur: &mut Cursor<'_>,
4182 t: &mut Table,
4183 _n_cols: usize,
4184) -> Result<(), StorageError> {
4185 let row_count = cur.read_u32()? as usize;
4186 let mut hot_bytes: u64 = 0;
4191 for _ in 0..row_count {
4192 let tail = &cur.buf[cur.pos..];
4193 let (row, consumed) = decode_row_body_dense(tail, &t.schema)?;
4194 cur.pos += consumed;
4195 hot_bytes = hot_bytes.saturating_add(row_body_encoded_len(&row, &t.schema) as u64);
4201 t.rows.push_mut(row);
4202 }
4203 t.hot_bytes = hot_bytes;
4204 Ok(())
4205}
4206
4207fn deserialize_indices(
4208 cur: &mut Cursor<'_>,
4209 t: &mut Table,
4210 version: u8,
4211) -> Result<(), StorageError> {
4212 let index_count = cur.read_u16()? as usize;
4213 for _ in 0..index_count {
4214 let idx_name = cur.read_str()?;
4215 let col_pos = cur.read_u16()? as usize;
4216 let column_name = t
4217 .schema
4218 .columns
4219 .get(col_pos)
4220 .ok_or_else(|| {
4221 StorageError::Corrupt(format!(
4222 "index {idx_name:?} points at non-existent column position {col_pos}"
4223 ))
4224 })?
4225 .name
4226 .clone();
4227 let kind_tag = cur.read_u8()?;
4228 match kind_tag {
4229 0 => {
4230 if version >= 9 {
4231 let map = read_btree_map(cur)?;
4236 t.restore_btree_index(idx_name, &column_name, map)?;
4237 } else {
4238 t.add_index(idx_name, &column_name)?;
4243 }
4244 }
4245 1 => {
4246 let m = cur.read_u16()? as usize;
4247 let graph = cur.read_nsw_graph(m)?;
4248 t.restore_nsw_index(idx_name, &column_name, graph)?;
4249 }
4250 2 => {
4251 let column_type = cur.read_data_type()?;
4255 t.restore_brin_index(idx_name, &column_name, column_type)?;
4256 }
4257 other => {
4258 return Err(StorageError::Corrupt(format!(
4259 "unknown index kind tag: {other}"
4260 )));
4261 }
4262 }
4263 if version >= 12 {
4266 let num_included = cur.read_u16()? as usize;
4267 if num_included > 0 {
4268 let mut included: Vec<usize> = Vec::with_capacity(num_included);
4269 for _ in 0..num_included {
4270 let cp = cur.read_u16()? as usize;
4271 if cp >= t.schema.columns.len() {
4272 return Err(StorageError::Corrupt(format!(
4273 "INCLUDE column position {cp} out of range \
4274 ({} schema columns)",
4275 t.schema.columns.len()
4276 )));
4277 }
4278 included.push(cp);
4279 }
4280 if let Some(last) = t.indices.last_mut() {
4281 last.included_columns = included;
4282 }
4283 }
4284 match cur.read_u8()? {
4286 0 => {}
4287 1 => {
4288 let pred = cur.read_str()?;
4289 if let Some(last) = t.indices.last_mut() {
4290 last.partial_predicate = Some(pred);
4291 }
4292 }
4293 other => {
4294 return Err(StorageError::Corrupt(format!(
4295 "partial_predicate tag: unknown byte {other}"
4296 )));
4297 }
4298 }
4299 match cur.read_u8()? {
4301 0 => {}
4302 1 => {
4303 let expr = cur.read_str()?;
4304 if let Some(last) = t.indices.last_mut() {
4305 last.expression = Some(expr);
4306 }
4307 }
4308 other => {
4309 return Err(StorageError::Corrupt(format!(
4310 "expression tag: unknown byte {other}"
4311 )));
4312 }
4313 }
4314 }
4315 }
4316 Ok(())
4317}
4318
4319fn read_btree_map(
4323 cur: &mut Cursor<'_>,
4324) -> Result<PersistentBTreeMap<IndexKey, Vec<RowLocator>>, StorageError> {
4325 let entry_count = cur.read_u32()? as usize;
4326 let mut map = PersistentBTreeMap::new();
4327 for _ in 0..entry_count {
4328 let key = cur.read_index_key()?;
4329 let locator_count = cur.read_u32()? as usize;
4330 let mut locators = Vec::with_capacity(locator_count);
4331 for _ in 0..locator_count {
4332 let tail = &cur.buf[cur.pos..];
4333 let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
4334 StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
4335 })?;
4336 cur.pos += consumed;
4337 locators.push(loc);
4338 }
4339 map.insert_mut(key, locators);
4340 }
4341 Ok(map)
4342}
4343
4344fn write_nsw_graph(out: &mut Vec<u8>, g: &NswGraph) {
4360 let entry = g.entry.map_or(u32::MAX, |e| {
4361 u32::try_from(e).expect("NSW entry fits in u32")
4362 });
4363 write_u16(
4364 out,
4365 u16::try_from(g.m_max_0).expect("HNSW m_max_0 fits in u16"),
4366 );
4367 out.extend_from_slice(&entry.to_le_bytes());
4368 out.push(g.entry_level);
4369 let node_count = g.levels.len();
4370 write_u32(
4371 out,
4372 u32::try_from(node_count).expect("HNSW node count fits in u32"),
4373 );
4374 for &lvl in &g.levels {
4375 out.push(lvl);
4376 }
4377 let layer_count = u8::try_from(g.layers.len()).expect("HNSW layer count ≤ 255");
4378 out.push(layer_count);
4379 for layer in &g.layers {
4380 write_u32(
4381 out,
4382 u32::try_from(layer.len()).expect("HNSW per-layer node count fits in u32"),
4383 );
4384 for neighbors in layer {
4385 write_u16(
4386 out,
4387 u16::try_from(neighbors.len()).expect("HNSW neighbour list fits in u16"),
4388 );
4389 for &peer in neighbors {
4393 write_u32(out, peer);
4394 }
4395 }
4396 }
4397}
4398
4399fn write_data_type(out: &mut Vec<u8>, t: DataType) {
4400 match t {
4401 DataType::Int => out.push(1),
4402 DataType::BigInt => out.push(2),
4403 DataType::Float => out.push(3),
4404 DataType::Text => out.push(4),
4405 DataType::Bool => out.push(5),
4406 DataType::Vector { dim, encoding } => match encoding {
4407 VecEncoding::F32 => {
4411 out.push(6);
4412 out.extend_from_slice(&dim.to_le_bytes());
4413 }
4414 VecEncoding::F16 => {
4417 out.push(15);
4418 out.extend_from_slice(&dim.to_le_bytes());
4419 }
4420 VecEncoding::Sq8 => {
4426 out.push(14);
4427 out.extend_from_slice(&dim.to_le_bytes());
4428 }
4429 },
4430 DataType::SmallInt => out.push(7),
4431 DataType::Varchar(max) => {
4432 out.push(8);
4433 out.extend_from_slice(&max.to_le_bytes());
4434 }
4435 DataType::Char(size) => {
4436 out.push(9);
4437 out.extend_from_slice(&size.to_le_bytes());
4438 }
4439 DataType::Numeric { precision, scale } => {
4440 out.push(10);
4441 out.push(precision);
4442 out.push(scale);
4443 }
4444 DataType::Date => out.push(11),
4445 DataType::Timestamp => out.push(12),
4446 DataType::Interval => {
4451 unreachable!("DataType::Interval has no on-disk encoding in v2.11")
4452 }
4453 DataType::Json => out.push(13),
4454 }
4455}
4456
4457impl Cursor<'_> {
4458 fn read_data_type(&mut self) -> Result<DataType, StorageError> {
4459 let tag = self.read_u8()?;
4460 match tag {
4461 1 => Ok(DataType::Int),
4462 2 => Ok(DataType::BigInt),
4463 3 => Ok(DataType::Float),
4464 4 => Ok(DataType::Text),
4465 5 => Ok(DataType::Bool),
4466 6 => Ok(DataType::Vector {
4467 dim: self.read_u32()?,
4468 encoding: VecEncoding::F32,
4469 }),
4470 7 => Ok(DataType::SmallInt),
4471 8 => Ok(DataType::Varchar(self.read_u32()?)),
4472 9 => Ok(DataType::Char(self.read_u32()?)),
4473 10 => {
4474 let precision = self.read_u8()?;
4475 let scale = self.read_u8()?;
4476 Ok(DataType::Numeric { precision, scale })
4477 }
4478 11 => Ok(DataType::Date),
4479 12 => Ok(DataType::Timestamp),
4480 13 => Ok(DataType::Json),
4481 14 => Ok(DataType::Vector {
4482 dim: self.read_u32()?,
4483 encoding: VecEncoding::Sq8,
4484 }),
4485 15 => Ok(DataType::Vector {
4489 dim: self.read_u32()?,
4490 encoding: VecEncoding::F16,
4491 }),
4492 other => Err(StorageError::Corrupt(format!(
4493 "unknown data type tag: {other}"
4494 ))),
4495 }
4496 }
4497}
4498
4499pub fn row_body_encoded_len(row: &Row, schema: &TableSchema) -> usize {
4505 debug_assert_eq!(
4506 row.values.len(),
4507 schema.columns.len(),
4508 "row_body_encoded_len: row arity must match schema"
4509 );
4510 let bitmap_bytes = schema.columns.len().div_ceil(8);
4511 let mut n = bitmap_bytes;
4512 for (col_idx, v) in row.values.iter().enumerate() {
4513 if matches!(v, Value::Null) {
4514 continue;
4515 }
4516 n += value_body_encoded_len(v, schema.columns[col_idx].ty);
4517 }
4518 n
4519}
4520
4521fn value_body_encoded_len(v: &Value, _ty: DataType) -> usize {
4527 match v {
4528 Value::SmallInt(_) => 2,
4529 Value::Int(_) | Value::Date(_) => 4,
4531 Value::BigInt(_) | Value::Float(_) | Value::Timestamp(_) => 8,
4533 Value::Bool(_) => 1,
4534 Value::Text(s) | Value::Json(s) => 2 + s.len(),
4536 Value::Vector(vec) => 4 + 4 * vec.len(),
4538 Value::Sq8Vector(q) => 4 + 4 + 4 + q.bytes.len(),
4545 Value::HalfVector(h) => 4 + h.bytes.len(),
4548 Value::Numeric { .. } => 16 + 1,
4550 Value::Null => 0,
4552 Value::Interval { .. } => {
4554 unreachable!("Value::Interval has no on-disk encoding")
4555 }
4556 }
4557}
4558
4559pub fn encode_row_body_dense(row: &Row, schema: &TableSchema) -> Vec<u8> {
4570 debug_assert_eq!(
4571 row.values.len(),
4572 schema.columns.len(),
4573 "dense encode: row arity must match schema"
4574 );
4575 let bitmap_bytes = schema.columns.len().div_ceil(8);
4576 let mut out = Vec::with_capacity(bitmap_bytes + schema.columns.len() * 8);
4579 let bitmap_offset = out.len();
4580 out.resize(bitmap_offset + bitmap_bytes, 0);
4581 for (i, v) in row.values.iter().enumerate() {
4582 if matches!(v, Value::Null) {
4583 out[bitmap_offset + i / 8] |= 1 << (i % 8);
4584 }
4585 }
4586 for (col_idx, v) in row.values.iter().enumerate() {
4587 if matches!(v, Value::Null) {
4588 continue;
4589 }
4590 write_value_body(&mut out, v, schema.columns[col_idx].ty);
4591 }
4592 out
4593}
4594
4595pub fn decode_row_body_dense(
4601 bytes: &[u8],
4602 schema: &TableSchema,
4603) -> Result<(Row, usize), StorageError> {
4604 let mut cur = Cursor::new(bytes);
4605 let bitmap_bytes = schema.columns.len().div_ceil(8);
4606 let mut bitmap_buf = [0u8; 32];
4607 if bitmap_bytes > bitmap_buf.len() {
4608 return Err(StorageError::Corrupt(format!(
4609 "row NULL bitmap {bitmap_bytes} B exceeds 32 B cap"
4610 )));
4611 }
4612 let slice = cur.take(bitmap_bytes)?;
4613 bitmap_buf[..bitmap_bytes].copy_from_slice(slice);
4614 let mut values = Vec::with_capacity(schema.columns.len());
4615 for (col_idx, col) in schema.columns.iter().enumerate() {
4616 if (bitmap_buf[col_idx / 8] >> (col_idx % 8)) & 1 == 1 {
4617 values.push(Value::Null);
4618 } else {
4619 values.push(cur.read_value_body(col.ty)?);
4620 }
4621 }
4622 Ok((Row { values }, cur.pos))
4623}
4624
4625fn write_value_body(out: &mut Vec<u8>, v: &Value, ty: DataType) {
4634 match (v, ty) {
4635 (Value::SmallInt(n), DataType::SmallInt) => out.extend_from_slice(&n.to_le_bytes()),
4636 (Value::Int(n), DataType::Int) => out.extend_from_slice(&n.to_le_bytes()),
4637 (Value::BigInt(n), DataType::BigInt) => out.extend_from_slice(&n.to_le_bytes()),
4638 (Value::Float(x), DataType::Float) => out.extend_from_slice(&x.to_le_bytes()),
4639 (Value::Bool(b), DataType::Bool) => out.push(u8::from(*b)),
4640 (Value::Text(s), DataType::Text | DataType::Varchar(_) | DataType::Char(_)) => {
4641 write_str(out, s);
4642 }
4643 (
4644 Value::Vector(v),
4645 DataType::Vector {
4646 encoding: VecEncoding::F32,
4647 ..
4648 },
4649 ) => {
4650 let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
4651 out.extend_from_slice(&dim.to_le_bytes());
4652 for x in v {
4653 out.extend_from_slice(&x.to_le_bytes());
4654 }
4655 }
4656 (
4662 Value::Sq8Vector(q),
4663 DataType::Vector {
4664 encoding: VecEncoding::Sq8,
4665 ..
4666 },
4667 ) => {
4668 let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
4669 out.extend_from_slice(&dim.to_le_bytes());
4670 out.extend_from_slice(&q.min.to_le_bytes());
4671 out.extend_from_slice(&q.max.to_le_bytes());
4672 out.extend_from_slice(&q.bytes);
4673 }
4674 (
4678 Value::HalfVector(h),
4679 DataType::Vector {
4680 encoding: VecEncoding::F16,
4681 ..
4682 },
4683 ) => {
4684 let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
4685 out.extend_from_slice(&dim.to_le_bytes());
4686 out.extend_from_slice(&h.bytes);
4687 }
4688 (Value::Numeric { scaled, .. }, DataType::Numeric { scale, .. }) => {
4689 out.extend_from_slice(&scaled.to_le_bytes());
4690 out.push(scale);
4691 }
4692 (Value::Date(d), DataType::Date) => out.extend_from_slice(&d.to_le_bytes()),
4693 (Value::Timestamp(t), DataType::Timestamp) => out.extend_from_slice(&t.to_le_bytes()),
4694 (Value::Json(s), DataType::Json) => write_str(out, s),
4698 (other, ty) => unreachable!(
4702 "schema-driven encode received mismatched value/type pair: \
4703 value tag={:?}, column type={:?}",
4704 other.data_type(),
4705 ty
4706 ),
4707 }
4708}
4709
4710fn write_value(out: &mut Vec<u8>, v: &Value) {
4711 match v {
4712 Value::Null => out.push(0),
4713 Value::SmallInt(n) => {
4714 out.push(7);
4715 out.extend_from_slice(&n.to_le_bytes());
4716 }
4717 Value::Int(n) => {
4718 out.push(1);
4719 out.extend_from_slice(&n.to_le_bytes());
4720 }
4721 Value::BigInt(n) => {
4722 out.push(2);
4723 out.extend_from_slice(&n.to_le_bytes());
4724 }
4725 Value::Float(x) => {
4726 out.push(3);
4727 out.extend_from_slice(&x.to_le_bytes());
4728 }
4729 Value::Text(s) | Value::Json(s) => {
4734 out.push(4);
4735 write_str(out, s);
4736 }
4737 Value::Bool(b) => {
4738 out.push(5);
4739 out.push(u8::from(*b));
4740 }
4741 Value::Vector(v) => {
4742 out.push(6);
4743 let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
4744 out.extend_from_slice(&dim.to_le_bytes());
4745 for x in v {
4746 out.extend_from_slice(&x.to_le_bytes());
4747 }
4748 }
4749 Value::Sq8Vector(q) => {
4754 out.push(11);
4755 let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
4756 out.extend_from_slice(&dim.to_le_bytes());
4757 out.extend_from_slice(&q.min.to_le_bytes());
4758 out.extend_from_slice(&q.max.to_le_bytes());
4759 out.extend_from_slice(&q.bytes);
4760 }
4761 Value::HalfVector(h) => {
4766 out.push(12);
4767 let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
4768 out.extend_from_slice(&dim.to_le_bytes());
4769 out.extend_from_slice(&h.bytes);
4770 }
4771 Value::Numeric { scaled, scale } => {
4772 out.push(8);
4773 out.extend_from_slice(&scaled.to_le_bytes());
4774 out.push(*scale);
4775 }
4776 Value::Date(d) => {
4777 out.push(9);
4778 out.extend_from_slice(&d.to_le_bytes());
4779 }
4780 Value::Timestamp(t) => {
4781 out.push(10);
4782 out.extend_from_slice(&t.to_le_bytes());
4783 }
4784 Value::Interval { .. } => {
4788 unreachable!(
4789 "Value::Interval has no on-disk encoding; engine must reject it before write"
4790 )
4791 }
4792 }
4793}
4794
/// Appends `n` to `out` as two little-endian bytes.
fn write_u16(out: &mut Vec<u8>, n: u16) {
    out.extend(n.to_le_bytes());
}
/// Appends `n` to `out` as four little-endian bytes.
fn write_u32(out: &mut Vec<u8>, n: u32) {
    out.extend(n.to_le_bytes());
}
4801fn write_str(out: &mut Vec<u8>, s: &str) {
4802 let len = u16::try_from(s.len()).expect("identifier / text fits in u16");
4803 write_u16(out, len);
4804 out.extend_from_slice(s.as_bytes());
4805}
4806
4807fn write_index_key(out: &mut Vec<u8>, key: &IndexKey) {
4811 match key {
4812 IndexKey::Int(n) => {
4813 out.push(INDEX_KEY_TAG_INT);
4814 out.extend_from_slice(&n.to_le_bytes());
4815 }
4816 IndexKey::Text(s) => {
4817 out.push(INDEX_KEY_TAG_TEXT);
4818 write_str(out, s);
4819 }
4820 IndexKey::Bool(b) => {
4821 out.push(INDEX_KEY_TAG_BOOL);
4822 out.push(u8::from(*b));
4823 }
4824 }
4825}
4826
4827struct Cursor<'a> {
4828 buf: &'a [u8],
4829 pos: usize,
4830}
4831
4832impl<'a> Cursor<'a> {
4833 const fn new(buf: &'a [u8]) -> Self {
4834 Self { buf, pos: 0 }
4835 }
4836
4837 fn take(&mut self, n: usize) -> Result<&'a [u8], StorageError> {
4838 let end = self
4839 .pos
4840 .checked_add(n)
4841 .ok_or_else(|| StorageError::Corrupt(format!("length overflow taking {n} bytes")))?;
4842 if end > self.buf.len() {
4843 return Err(StorageError::Corrupt(format!(
4844 "unexpected EOF at offset {} (wanted {n} more bytes)",
4845 self.pos
4846 )));
4847 }
4848 let s = &self.buf[self.pos..end];
4849 self.pos = end;
4850 Ok(s)
4851 }
4852
4853 fn read_u8(&mut self) -> Result<u8, StorageError> {
4854 Ok(self.take(1)?[0])
4855 }
4856 fn read_u16(&mut self) -> Result<u16, StorageError> {
4857 let s = self.take(2)?;
4858 Ok(u16::from_le_bytes([s[0], s[1]]))
4859 }
4860 fn read_u32(&mut self) -> Result<u32, StorageError> {
4861 let s = self.take(4)?;
4862 Ok(u32::from_le_bytes([s[0], s[1], s[2], s[3]]))
4863 }
4864 fn read_i32(&mut self) -> Result<i32, StorageError> {
4865 let s = self.take(4)?;
4866 Ok(i32::from_le_bytes([s[0], s[1], s[2], s[3]]))
4867 }
4868 fn read_u64(&mut self) -> Result<u64, StorageError> {
4871 let s = self.take(8)?;
4872 Ok(u64::from_le_bytes([
4873 s[0], s[1], s[2], s[3], s[4], s[5], s[6], s[7],
4874 ]))
4875 }
4876 fn read_i64(&mut self) -> Result<i64, StorageError> {
4877 let s = self.take(8)?;
4878 let arr: [u8; 8] = s.try_into().expect("checked");
4879 Ok(i64::from_le_bytes(arr))
4880 }
4881 fn read_f64(&mut self) -> Result<f64, StorageError> {
4882 let s = self.take(8)?;
4883 let arr: [u8; 8] = s.try_into().expect("checked");
4884 Ok(f64::from_le_bytes(arr))
4885 }
4886 fn read_f32(&mut self) -> Result<f32, StorageError> {
4887 let s = self.take(4)?;
4888 Ok(f32::from_le_bytes([s[0], s[1], s[2], s[3]]))
4889 }
4890 fn read_str(&mut self) -> Result<String, StorageError> {
4891 let len = self.read_u16()? as usize;
4892 let bytes = self.take(len)?;
4893 core::str::from_utf8(bytes)
4894 .map(String::from)
4895 .map_err(|_| StorageError::Corrupt("invalid UTF-8 in identifier or text".into()))
4896 }
4897
4898 fn read_index_key(&mut self) -> Result<IndexKey, StorageError> {
4902 let tag = self.read_u8()?;
4903 match tag {
4904 INDEX_KEY_TAG_INT => Ok(IndexKey::Int(self.read_i64()?)),
4905 INDEX_KEY_TAG_TEXT => Ok(IndexKey::Text(self.read_str()?)),
4906 INDEX_KEY_TAG_BOOL => Ok(IndexKey::Bool(self.read_u8()? != 0)),
4907 other => Err(StorageError::Corrupt(format!(
4908 "unknown index key tag: {other}"
4909 ))),
4910 }
4911 }
4912 fn read_value_body(&mut self, ty: DataType) -> Result<Value, StorageError> {
4918 match ty {
4919 DataType::SmallInt => {
4920 let s = self.take(2)?;
4921 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
4922 }
4923 DataType::Int => Ok(Value::Int(self.read_i32()?)),
4924 DataType::BigInt => Ok(Value::BigInt(self.read_i64()?)),
4925 DataType::Float => Ok(Value::Float(self.read_f64()?)),
4926 DataType::Bool => Ok(Value::Bool(self.read_u8()? != 0)),
4927 DataType::Text | DataType::Varchar(_) | DataType::Char(_) => {
4928 Ok(Value::Text(self.read_str()?))
4929 }
4930 DataType::Vector {
4931 encoding: VecEncoding::F32,
4932 ..
4933 } => {
4934 let dim = self.read_u32()? as usize;
4935 let mut v = Vec::with_capacity(dim);
4936 for _ in 0..dim {
4937 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
4938 v.push(f32::from_le_bytes(bytes));
4939 }
4940 Ok(Value::Vector(v))
4941 }
4942 DataType::Vector {
4943 encoding: VecEncoding::Sq8,
4944 ..
4945 } => {
4946 let dim = self.read_u32()? as usize;
4947 let min = self.read_f32()?;
4948 let max = self.read_f32()?;
4949 let bytes = self.take(dim)?.to_vec();
4950 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
4951 }
4952 DataType::Vector {
4953 encoding: VecEncoding::F16,
4954 ..
4955 } => {
4956 let dim = self.read_u32()? as usize;
4957 let bytes = self.take(dim * 2)?.to_vec();
4958 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
4959 }
4960 DataType::Numeric { .. } => {
4961 let s = self.take(16)?;
4962 let arr: [u8; 16] = s.try_into().expect("checked");
4963 let scaled = i128::from_le_bytes(arr);
4964 let scale = self.read_u8()?;
4965 Ok(Value::Numeric { scaled, scale })
4966 }
4967 DataType::Date => Ok(Value::Date(self.read_i32()?)),
4968 DataType::Timestamp => Ok(Value::Timestamp(self.read_i64()?)),
4969 DataType::Interval => {
4970 Err(StorageError::Corrupt(
4975 "INTERVAL column found on disk — runtime-only type, v3.0.2 rejects it".into(),
4976 ))
4977 }
4978 DataType::Json => Ok(Value::Json(self.read_str()?)),
4979 }
4980 }
4981
4982 fn read_value(&mut self) -> Result<Value, StorageError> {
4983 let tag = self.read_u8()?;
4984 match tag {
4985 0 => Ok(Value::Null),
4986 1 => Ok(Value::Int(self.read_i32()?)),
4987 2 => Ok(Value::BigInt(self.read_i64()?)),
4988 3 => Ok(Value::Float(self.read_f64()?)),
4989 4 => Ok(Value::Text(self.read_str()?)),
4990 5 => Ok(Value::Bool(self.read_u8()? != 0)),
4991 6 => {
4992 let dim = self.read_u32()? as usize;
4993 let mut v = Vec::with_capacity(dim);
4994 for _ in 0..dim {
4995 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
4996 v.push(f32::from_le_bytes(bytes));
4997 }
4998 Ok(Value::Vector(v))
4999 }
5000 7 => {
5001 let s = self.take(2)?;
5002 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
5003 }
5004 8 => {
5005 let s = self.take(16)?;
5006 let arr: [u8; 16] = s.try_into().expect("checked");
5007 let scaled = i128::from_le_bytes(arr);
5008 let scale = self.read_u8()?;
5009 Ok(Value::Numeric { scaled, scale })
5010 }
5011 9 => Ok(Value::Date(self.read_i32()?)),
5012 10 => Ok(Value::Timestamp(self.read_i64()?)),
5013 11 => {
5018 let dim = self.read_u32()? as usize;
5019 let min = self.read_f32()?;
5020 let max = self.read_f32()?;
5021 let bytes = self.take(dim)?.to_vec();
5022 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
5023 }
5024 12 => {
5027 let dim = self.read_u32()? as usize;
5028 let bytes = self.take(dim * 2)?.to_vec();
5029 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
5030 }
5031 other => Err(StorageError::Corrupt(format!("unknown value tag: {other}"))),
5032 }
5033 }
5034
5035 fn read_nsw_graph(&mut self, m: usize) -> Result<NswGraph, StorageError> {
5039 let m_max_0 = self.read_u16()? as usize;
5040 let entry_raw = self.read_u32()?;
5041 let entry = if entry_raw == u32::MAX {
5042 None
5043 } else {
5044 Some(entry_raw as usize)
5045 };
5046 let entry_level = self.read_u8()?;
5047 let node_count = self.read_u32()? as usize;
5048 let mut levels: PersistentVec<u8> = PersistentVec::new();
5053 for _ in 0..node_count {
5054 levels.push_mut(self.read_u8()?);
5055 }
5056 let layer_count = self.read_u8()? as usize;
5057 let mut layers: Vec<PersistentVec<Vec<u32>>> = Vec::with_capacity(layer_count);
5058 for _ in 0..layer_count {
5059 let n = self.read_u32()? as usize;
5060 let mut per_layer: PersistentVec<Vec<u32>> = PersistentVec::new();
5061 for _ in 0..n {
5062 let cnt = self.read_u16()? as usize;
5063 let mut row: Vec<u32> = Vec::with_capacity(cnt);
5064 for _ in 0..cnt {
5065 row.push(self.read_u32()?);
5066 }
5067 per_layer.push_mut(row);
5068 }
5069 layers.push(per_layer);
5070 }
5071 Ok(NswGraph {
5072 m,
5073 m_max_0,
5074 entry,
5075 entry_level,
5076 levels,
5077 layers,
5078 })
5079 }
5080}
5081
5082#[cfg(test)]
5083mod tests {
5084 use super::*;
5085 use alloc::string::ToString;
5086 use alloc::vec;
5087
5088 #[cfg(target_arch = "aarch64")]
5089 #[test]
5090 fn neon_l2_matches_scalar() {
5091 let dims = [4usize, 8, 12, 16, 64, 128, 256, 384, 512, 768, 1024, 1536];
5096 for &d in &dims {
5097 let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
5098 let mut a = Vec::with_capacity(d);
5099 let mut b = Vec::with_capacity(d);
5100 for _ in 0..d {
5101 state = state
5102 .wrapping_mul(6_364_136_223_846_793_005)
5103 .wrapping_add(1);
5104 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5105 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5106 state = state
5107 .wrapping_mul(6_364_136_223_846_793_005)
5108 .wrapping_add(1);
5109 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5110 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5111 a.push(x);
5112 b.push(y);
5113 }
5114 let scalar = l2_distance_sq_scalar(&a, &b);
5115 let neon = unsafe { l2_distance_sq_neon(&a, &b) };
5116 let tol = (scalar.abs().max(1e-6)) * 1e-4;
5117 assert!(
5118 (scalar - neon).abs() <= tol,
5119 "dim={d}: scalar={scalar} neon={neon} diff={}",
5120 (scalar - neon).abs()
5121 );
5122 }
5123 }
5124
5125 #[cfg(target_arch = "aarch64")]
5126 #[test]
5127 fn neon_inner_product_matches_scalar() {
5128 let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
5132 for &d in &dims {
5133 let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
5134 let mut a = Vec::with_capacity(d);
5135 let mut b = Vec::with_capacity(d);
5136 for _ in 0..d {
5137 state = state
5138 .wrapping_mul(6_364_136_223_846_793_005)
5139 .wrapping_add(1);
5140 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5141 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5142 state = state
5143 .wrapping_mul(6_364_136_223_846_793_005)
5144 .wrapping_add(1);
5145 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5146 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5147 a.push(x);
5148 b.push(y);
5149 }
5150 let scalar = inner_product_scalar(&a, &b);
5151 let neon = unsafe { inner_product_neon(&a, &b) };
5152 #[allow(clippy::cast_precision_loss)]
5153 let tol = (scalar.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
5154 assert!(
5155 (scalar - neon).abs() <= tol,
5156 "IP dim={d}: scalar={scalar} neon={neon} diff={}",
5157 (scalar - neon).abs()
5158 );
5159 }
5160 }
5161
5162 #[cfg(target_arch = "aarch64")]
5163 #[allow(clippy::similar_names)]
5164 #[test]
5165 fn neon_cosine_dot_norms_matches_scalar() {
5166 let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
5167 for &d in &dims {
5168 let mut state: u64 = (d as u64).wrapping_mul(0xBF58_476D_1CE4_E5B9);
5169 let mut a = Vec::with_capacity(d);
5170 let mut b = Vec::with_capacity(d);
5171 for _ in 0..d {
5172 state = state
5173 .wrapping_mul(6_364_136_223_846_793_005)
5174 .wrapping_add(1);
5175 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5176 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5177 state = state
5178 .wrapping_mul(6_364_136_223_846_793_005)
5179 .wrapping_add(1);
5180 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5181 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5182 a.push(x);
5183 b.push(y);
5184 }
5185 let (dot_s, na_s, nb_s) = cosine_dot_norms_scalar(&a, &b);
5186 let (dot_n, na_n, nb_n) = unsafe { cosine_dot_norms_neon(&a, &b) };
5187 #[allow(clippy::cast_precision_loss)]
5188 let tol_d = (dot_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
5189 #[allow(clippy::cast_precision_loss)]
5190 let tol_n = (na_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
5191 assert!(
5192 (dot_s - dot_n).abs() <= tol_d,
5193 "cosine dot dim={d}: scalar={dot_s} neon={dot_n}"
5194 );
5195 assert!(
5196 (na_s - na_n).abs() <= tol_n,
5197 "cosine na dim={d}: scalar={na_s} neon={na_n}"
5198 );
5199 assert!(
5200 (nb_s - nb_n).abs() <= tol_n,
5201 "cosine nb dim={d}: scalar={nb_s} neon={nb_n}"
5202 );
5203 }
5204 }
5205
5206 fn make_users_schema() -> TableSchema {
5207 TableSchema::new(
5208 "users",
5209 vec![
5210 ColumnSchema::new("id", DataType::Int, false),
5211 ColumnSchema::new("name", DataType::Text, false),
5212 ColumnSchema::new("score", DataType::Float, true),
5213 ],
5214 )
5215 }
5216
5217 #[test]
5218 fn value_type_tag_matches_variant() {
5219 assert_eq!(Value::Int(1).data_type(), Some(DataType::Int));
5220 assert_eq!(Value::BigInt(1).data_type(), Some(DataType::BigInt));
5221 assert_eq!(Value::Float(1.0).data_type(), Some(DataType::Float));
5222 assert_eq!(Value::Text("x".into()).data_type(), Some(DataType::Text));
5223 assert_eq!(Value::Bool(true).data_type(), Some(DataType::Bool));
5224 assert_eq!(Value::Null.data_type(), None);
5225 assert!(Value::Null.is_null());
5226 assert!(!Value::Int(0).is_null());
5227 }
5228
5229 #[test]
5230 fn sq8_value_reports_sq8_data_type() {
5231 let q = crate::quantize::quantize(&[0.0, 0.25, 0.5, 0.75, 1.0]);
5236 let v = Value::Sq8Vector(q);
5237 assert_eq!(
5238 v.data_type(),
5239 Some(DataType::Vector {
5240 dim: 5,
5241 encoding: VecEncoding::Sq8,
5242 }),
5243 );
5244 }
5245
5246 #[test]
5247 fn datatype_display_matches_pg_keyword() {
5248 assert_eq!(DataType::Int.to_string(), "INT");
5249 assert_eq!(DataType::BigInt.to_string(), "BIGINT");
5250 assert_eq!(DataType::Float.to_string(), "FLOAT");
5251 assert_eq!(DataType::Text.to_string(), "TEXT");
5252 assert_eq!(DataType::Bool.to_string(), "BOOL");
5253 }
5254
5255 #[test]
5256 fn row_len_and_emptiness() {
5257 let r = Row::new(vec![Value::Int(1), Value::Null]);
5258 assert_eq!(r.len(), 2);
5259 assert!(!r.is_empty());
5260 assert!(Row::new(Vec::new()).is_empty());
5261 }
5262
5263 #[test]
5264 fn table_schema_column_position() {
5265 let s = make_users_schema();
5266 assert_eq!(s.column_position("id"), Some(0));
5267 assert_eq!(s.column_position("score"), Some(2));
5268 assert_eq!(s.column_position("missing"), None);
5269 }
5270
5271 #[test]
5272 fn catalog_create_table_then_lookup() {
5273 let mut cat = Catalog::new();
5274 cat.create_table(make_users_schema()).unwrap();
5275 assert_eq!(cat.table_count(), 1);
5276 assert!(cat.get("users").is_some());
5277 assert!(cat.get("nope").is_none());
5278 }
5279
5280 #[test]
5281 fn catalog_duplicate_table_is_rejected() {
5282 let mut cat = Catalog::new();
5283 cat.create_table(make_users_schema()).unwrap();
5284 let err = cat.create_table(make_users_schema()).unwrap_err();
5285 assert!(matches!(err, StorageError::DuplicateTable { ref name } if name == "users"));
5286 }
5287
5288 #[test]
5289 fn table_insert_happy_path_appends_row() {
5290 let mut cat = Catalog::new();
5291 cat.create_table(make_users_schema()).unwrap();
5292 let t = cat.get_mut("users").unwrap();
5293 t.insert(Row::new(vec![
5294 Value::Int(1),
5295 Value::Text("alice".into()),
5296 Value::Float(99.5),
5297 ]))
5298 .unwrap();
5299 assert_eq!(t.row_count(), 1);
5300 assert_eq!(t.rows()[0].values[1], Value::Text("alice".into()));
5301 }
5302
5303 #[test]
5304 fn table_insert_arity_mismatch() {
5305 let mut cat = Catalog::new();
5306 cat.create_table(make_users_schema()).unwrap();
5307 let t = cat.get_mut("users").unwrap();
5308 let err = t.insert(Row::new(vec![Value::Int(1)])).unwrap_err();
5309 assert!(matches!(
5310 err,
5311 StorageError::ArityMismatch {
5312 expected: 3,
5313 actual: 1
5314 }
5315 ));
5316 assert_eq!(t.row_count(), 0);
5317 }
5318
5319 #[test]
5320 fn table_insert_type_mismatch_reports_column() {
5321 let mut cat = Catalog::new();
5322 cat.create_table(make_users_schema()).unwrap();
5323 let t = cat.get_mut("users").unwrap();
5324 let err = t
5325 .insert(Row::new(vec![
5326 Value::Int(1),
5327 Value::Int(42), Value::Float(0.0),
5329 ]))
5330 .unwrap_err();
5331 match err {
5332 StorageError::TypeMismatch {
5333 ref column,
5334 expected,
5335 actual,
5336 position,
5337 } => {
5338 assert_eq!(column, "name");
5339 assert_eq!(expected, DataType::Text);
5340 assert_eq!(actual, DataType::Int);
5341 assert_eq!(position, 1);
5342 }
5343 other => panic!("unexpected: {other:?}"),
5344 }
5345 assert_eq!(t.row_count(), 0);
5346 }
5347
5348 #[test]
5349 fn table_insert_null_into_not_null_rejected() {
5350 let mut cat = Catalog::new();
5351 cat.create_table(make_users_schema()).unwrap();
5352 let t = cat.get_mut("users").unwrap();
5353 let err = t
5354 .insert(Row::new(vec![
5355 Value::Int(1),
5356 Value::Null, Value::Float(1.0),
5358 ]))
5359 .unwrap_err();
5360 assert!(matches!(err, StorageError::NullInNotNull { ref column } if column == "name"));
5361 }
5362
5363 #[test]
5364 fn table_insert_null_into_nullable_ok() {
5365 let mut cat = Catalog::new();
5366 cat.create_table(make_users_schema()).unwrap();
5367 let t = cat.get_mut("users").unwrap();
5368 t.insert(Row::new(vec![
5369 Value::Int(1),
5370 Value::Text("bob".into()),
5371 Value::Null,
5372 ]))
5373 .unwrap();
5374 assert_eq!(t.row_count(), 1);
5375 }
5376
5377 #[test]
5378 fn catalog_get_mut_independent_per_table() {
5379 let mut cat = Catalog::new();
5380 cat.create_table(TableSchema::new(
5381 "a",
5382 vec![ColumnSchema::new("v", DataType::Int, false)],
5383 ))
5384 .unwrap();
5385 cat.create_table(TableSchema::new(
5386 "b",
5387 vec![ColumnSchema::new("v", DataType::Int, false)],
5388 ))
5389 .unwrap();
5390 cat.get_mut("a")
5391 .unwrap()
5392 .insert(Row::new(vec![Value::Int(1)]))
5393 .unwrap();
5394 assert_eq!(cat.get("a").unwrap().row_count(), 1);
5395 assert_eq!(cat.get("b").unwrap().row_count(), 0);
5396 }
5397
5398 fn assert_round_trip(cat: &Catalog) {
5401 let bytes = cat.serialize();
5402 let restored = Catalog::deserialize(&bytes).expect("deserialize");
5403 assert_eq!(restored.table_count(), cat.table_count());
5406 for (a, b) in cat.tables.iter().zip(restored.tables.iter()) {
5407 assert_eq!(a.schema, b.schema);
5408 assert_eq!(a.rows, b.rows);
5409 }
5410 }
5411
5412 #[test]
5413 fn serialize_empty_catalog_round_trips() {
5414 assert_round_trip(&Catalog::new());
5415 }
5416
5417 #[test]
5418 fn serialize_single_empty_table_round_trips() {
5419 let mut cat = Catalog::new();
5420 cat.create_table(make_users_schema()).unwrap();
5421 assert_round_trip(&cat);
5422 }
5423
    /// Cloning an `NswGraph` must not copy elements: after `clone()`, the
    /// `levels` vector and every layer must still share storage with the
    /// original (checked via `shares_storage_with`).
    #[test]
    fn nsw_clone_is_o1() {
        let mut cat = Catalog::new();
        // 1500 rows of 3-dim F32 vectors; the test asserts below that this
        // is enough to populate at least two HNSW layers.
        cat.create_table(TableSchema::new(
            "docs",
            alloc::vec![
                ColumnSchema::new("id", DataType::Int, false),
                ColumnSchema::new(
                    "v",
                    DataType::Vector {
                        dim: 3,
                        encoding: VecEncoding::F32
                    },
                    true
                ),
            ],
        ))
        .unwrap();
        let t = cat.get_mut("docs").unwrap();
        for i in 0..1500_i32 {
            // i < 1500 is exactly representable in f32.
            #[allow(clippy::cast_precision_loss)]
            let base = (i as f32) * 0.01;
            t.insert(Row::new(alloc::vec![
                Value::Int(i),
                Value::Vector(alloc::vec![base, base + 0.05, base + 0.1]),
            ]))
            .unwrap();
        }
        t.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
            .unwrap();
        let g = match &cat.get("docs").unwrap().indices()[0].kind {
            IndexKind::Nsw(g) => g,
            IndexKind::BTree(_) | IndexKind::Brin { .. } => panic!("expected NSW"),
        };
        // Sanity-check the graph shape so the sharing checks below are
        // meaningful (multiple layers, one level entry per row).
        assert_eq!(g.levels.len(), 1500, "one level slot per inserted row");
        assert!(
            g.layers.len() >= 2,
            "1500 nodes should populate at least two HNSW layers, got {}",
            g.layers.len()
        );

        let cloned = g.clone();

        // Per the assertion messages, a false `shares_storage_with` means the
        // clone copied elements instead of sharing the backing storage.
        assert!(
            g.levels.shares_storage_with(&cloned.levels),
            "levels PV not shared after clone — clone copied elements (O(N))"
        );
        assert_eq!(g.layers.len(), cloned.layers.len());
        for (l, (orig, cl)) in g.layers.iter().zip(cloned.layers.iter()).enumerate() {
            assert!(
                orig.shares_storage_with(cl),
                "layer {l} PV not shared after clone — clone copied elements (O(N))"
            );
        }
    }
5489
    /// SQ8 round trip: after serialize/deserialize, the column type, a sample
    /// quantized cell, and the top-5 NSW query results must all be unchanged.
    #[test]
    fn sq8_catalog_serialise_roundtrip_preserves_cells_and_index() {
        let mut cat = Catalog::new();
        // 32 rows of 8-dim SQ8 vectors with an NSW index on `v`.
        cat.create_table(TableSchema::new(
            "vecs",
            alloc::vec![
                ColumnSchema::new("id", DataType::Int, false),
                ColumnSchema::new(
                    "v",
                    DataType::Vector {
                        dim: 8,
                        encoding: VecEncoding::Sq8,
                    },
                    false,
                ),
            ],
        ))
        .unwrap();
        let t = cat.get_mut("vecs").unwrap();
        for i in 0..32_i32 {
            #[allow(clippy::cast_precision_loss)]
            let base = (i as f32) * 0.03;
            let v: Vec<f32> = (0..8_i32)
                .map(|j| {
                    #[allow(clippy::cast_precision_loss)]
                    let off = (j as f32) * 0.01;
                    base + off
                })
                .collect();
            t.insert(Row::new(alloc::vec![
                Value::Int(i),
                Value::Sq8Vector(quantize::quantize(&v)),
            ]))
            .unwrap();
        }
        t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
        let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];
        // Snapshot the pre-serialization state to compare after the round trip.
        let (before_cell, before_ty, before_hits) = {
            let t_ref = cat.get("vecs").unwrap();
            (
                t_ref.rows()[5].values[1].clone(),
                t_ref.schema().columns[1].ty,
                nsw_query(t_ref, "v_idx", &query, 5, NswMetric::L2),
            )
        };

        let bytes = cat.serialize();
        let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
        let rt = restored.get("vecs").unwrap();
        assert_eq!(rt.schema().columns[1].ty, before_ty);
        assert_eq!(rt.rows()[5].values[1], before_cell);
        let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
        assert_eq!(before_hits, after_hits);
    }
5552
    /// HALF (F16) round trip: after serialize/deserialize, the column type, a
    /// sample half-precision cell, and the top-5 NSW query results must all be
    /// unchanged.
    #[test]
    fn half_catalog_serialise_roundtrip_preserves_cells_and_index() {
        use crate::halfvec;
        // 32 rows of 8-dim F16 vectors with an NSW index on `v`.
        let mut cat = Catalog::new();
        cat.create_table(TableSchema::new(
            "vecs",
            alloc::vec![
                ColumnSchema::new("id", DataType::Int, false),
                ColumnSchema::new(
                    "v",
                    DataType::Vector {
                        dim: 8,
                        encoding: VecEncoding::F16,
                    },
                    false,
                ),
            ],
        ))
        .unwrap();
        let t = cat.get_mut("vecs").unwrap();
        for i in 0..32_i32 {
            #[allow(clippy::cast_precision_loss)]
            let base = (i as f32) * 0.03;
            let v: Vec<f32> = (0..8_i32)
                .map(|j| {
                    #[allow(clippy::cast_precision_loss)]
                    let off = (j as f32) * 0.01;
                    base + off
                })
                .collect();
            t.insert(Row::new(alloc::vec![
                Value::Int(i),
                Value::HalfVector(halfvec::HalfVector::from_f32_slice(&v)),
            ]))
            .unwrap();
        }
        t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
        let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];
        // Snapshot the pre-serialization state to compare after the round trip.
        let (before_cell, before_ty, before_hits) = {
            let t_ref = cat.get("vecs").unwrap();
            (
                t_ref.rows()[5].values[1].clone(),
                t_ref.schema().columns[1].ty,
                nsw_query(t_ref, "v_idx", &query, 5, NswMetric::L2),
            )
        };
        let bytes = cat.serialize();
        let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
        let rt = restored.get("vecs").unwrap();
        assert_eq!(rt.schema().columns[1].ty, before_ty);
        assert_eq!(rt.rows()[5].values[1], before_cell);
        let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
        assert_eq!(before_hits, after_hits);
    }
5613
    /// HNSW over HALF (F16) cells must reach recall@10 >= 0.95 against exact
    /// brute-force L2 ground truth computed on the original f32 vectors
    /// (512 x 32-dim corpus, 32 queries).
    #[test]
    #[allow(clippy::similar_names)]
    fn hnsw_half_recall_at_10_matches_f32_groundtruth() {
        use crate::halfvec;
        // Deterministic generator: advances `state` and maps its high 32 bits
        // onto [-1, 1].
        fn next(state: &mut u64) -> f32 {
            *state = state
                .wrapping_add(0x9E37_79B9_7F4A_7C15)
                .wrapping_mul(0xBF58_476D_1CE4_E5B9);
            #[allow(clippy::cast_precision_loss)]
            let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
            2.0 * u - 1.0
        }
        let dim: u32 = 32;
        let n: usize = 512;
        let dim_us = dim as usize;
        // Corpus and queries come from one seed stream, so generation order
        // matters for reproducibility.
        let mut seed: u64 = 0xF16_F16_F16_F16_u64;
        let corpus: Vec<Vec<f32>> = (0..n)
            .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
            .collect();
        let queries: Vec<Vec<f32>> = (0..32)
            .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
            .collect();
        // Exact top-10 corpus positions per query by brute-force squared L2.
        let exact_top10: Vec<Vec<usize>> = queries
            .iter()
            .map(|q| {
                let mut scored: Vec<(f32, usize)> = corpus
                    .iter()
                    .enumerate()
                    .map(|(i, v)| (l2_distance_sq(v, q), i))
                    .collect();
                scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
                scored.into_iter().take(10).map(|(_, i)| i).collect()
            })
            .collect();
        let mut cat = Catalog::new();
        cat.create_table(TableSchema::new(
            "vecs",
            alloc::vec![
                ColumnSchema::new("id", DataType::Int, false),
                ColumnSchema::new(
                    "v",
                    DataType::Vector {
                        dim,
                        encoding: VecEncoding::F16,
                    },
                    false,
                ),
            ],
        ))
        .unwrap();
        let t = cat.get_mut("vecs").unwrap();
        for (i, v) in corpus.iter().enumerate() {
            t.insert(Row::new(alloc::vec![
                Value::Int(i32::try_from(i).unwrap()),
                Value::HalfVector(halfvec::HalfVector::from_f32_slice(v)),
            ]))
            .unwrap();
        }
        t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
        let table = cat.get("vecs").unwrap();
        // Count index hits that also appear in the exact top-10. Hits are
        // compared directly against corpus positions, so `nsw_query` is
        // presumably returning row positions — TODO confirm.
        let mut total_overlap = 0_usize;
        for (q, exact) in queries.iter().zip(exact_top10.iter()) {
            let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
            for h in &hits {
                if exact.contains(h) {
                    total_overlap += 1;
                }
            }
        }
        #[allow(clippy::cast_precision_loss)]
        let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
        assert!(
            recall >= 0.95,
            "HALF HNSW recall@10 = {recall:.3}, below floor 0.95 — \
             check halfvec dispatch in `cell_to_query_metric_distance`"
        );
    }
5697
    /// HNSW over SQ8 cells must reach recall@10 >= 0.95 against exact
    /// brute-force L2 ground truth computed on the unquantized f32 vectors
    /// (512 x 32-dim corpus, 32 queries).
    #[test]
    fn hnsw_sq8_recall_at_10_above_0_95_vs_f32_groundtruth() {
        use crate::quantize;
        // Deterministic generator: advances `state` and maps its high 32 bits
        // onto [-1, 1].
        fn next(state: &mut u64) -> f32 {
            *state = state
                .wrapping_add(0x9E37_79B9_7F4A_7C15)
                .wrapping_mul(0xBF58_476D_1CE4_E5B9);
            #[allow(clippy::cast_precision_loss)]
            let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
            2.0 * u - 1.0
        }
        let dim: u32 = 32;
        let n: usize = 512;
        let dim_us = dim as usize;
        // Corpus and queries come from one seed stream, so generation order
        // matters for reproducibility.
        let mut seed: u64 = 0xCAFE_BABE_DEAD_BEEFu64;
        let corpus: Vec<Vec<f32>> = (0..n)
            .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
            .collect();
        let queries: Vec<Vec<f32>> = (0..32)
            .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
            .collect();
        // Exact top-10 corpus positions per query by brute-force squared L2.
        let exact_top10: Vec<Vec<usize>> = queries
            .iter()
            .map(|q| {
                let mut scored: Vec<(f32, usize)> = corpus
                    .iter()
                    .enumerate()
                    .map(|(i, v)| (l2_distance_sq(v, q), i))
                    .collect();
                scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
                scored.into_iter().take(10).map(|(_, i)| i).collect()
            })
            .collect();
        let mut cat = Catalog::new();
        cat.create_table(TableSchema::new(
            "vecs",
            alloc::vec![
                ColumnSchema::new("id", DataType::Int, false),
                ColumnSchema::new(
                    "v",
                    DataType::Vector {
                        dim,
                        encoding: VecEncoding::Sq8,
                    },
                    false,
                ),
            ],
        ))
        .unwrap();
        let t = cat.get_mut("vecs").unwrap();
        for (i, v) in corpus.iter().enumerate() {
            t.insert(Row::new(alloc::vec![
                Value::Int(i32::try_from(i).unwrap()),
                Value::Sq8Vector(quantize::quantize(v)),
            ]))
            .unwrap();
        }
        t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
        let table = cat.get("vecs").unwrap();
        // Count index hits that also appear in the exact top-10. Hits are
        // compared directly against corpus positions, so `nsw_query` is
        // presumably returning row positions — TODO confirm.
        let mut total_overlap = 0_usize;
        for (q, exact) in queries.iter().zip(exact_top10.iter()) {
            let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
            for h in &hits {
                if exact.contains(h) {
                    total_overlap += 1;
                }
            }
        }
        #[allow(clippy::cast_precision_loss)]
        let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
        assert!(
            recall >= 0.95,
            "SQ8 HNSW recall@10 = {recall:.3}, below floor 0.95 — \
             check `sq8_rerank` is wired in `nsw_search` for SQ8 columns"
        );
    }
5786
    /// The full HNSW topology (`m`, `m_max_0`, entry point, entry level,
    /// per-node levels and per-layer adjacency) must survive a catalog
    /// serialize/deserialize round trip exactly.
    #[test]
    fn nsw_index_topology_persists_through_round_trip() {
        let mut cat = Catalog::new();
        // Six 3-dim F32 rows with an NSW index on `v`.
        cat.create_table(TableSchema::new(
            "docs",
            alloc::vec![
                ColumnSchema::new("id", DataType::Int, false),
                ColumnSchema::new(
                    "v",
                    DataType::Vector {
                        dim: 3,
                        encoding: VecEncoding::F32
                    },
                    true
                ),
            ],
        ))
        .unwrap();
        let t = cat.get_mut("docs").unwrap();
        for i in 0..6_i32 {
            // Small integers are exactly representable in f32.
            #[allow(clippy::cast_precision_loss)]
            let base = (i as f32) * 0.1;
            let row = Row::new(alloc::vec![
                Value::Int(i),
                Value::Vector(alloc::vec![base, base + 0.05, base + 0.1]),
            ]);
            t.insert(row).unwrap();
        }
        t.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
            .unwrap();
        let original = match &cat.get("docs").unwrap().indices()[0].kind {
            IndexKind::Nsw(g) => g.clone(),
            IndexKind::BTree(_) | IndexKind::Brin { .. } => panic!("expected NSW"),
        };
        let bytes = cat.serialize();
        let restored = Catalog::deserialize(&bytes).expect("deserialize");
        let restored_graph = match &restored.get("docs").unwrap().indices()[0].kind {
            IndexKind::Nsw(g) => g.clone(),
            IndexKind::BTree(_) | IndexKind::Brin { .. } => panic!("expected NSW"),
        };
        // Compare field by field so a failure pinpoints what was lost.
        assert_eq!(restored_graph.m, original.m);
        assert_eq!(restored_graph.m_max_0, original.m_max_0);
        assert_eq!(restored_graph.entry, original.entry);
        assert_eq!(restored_graph.entry_level, original.entry_level);
        assert_eq!(restored_graph.levels, original.levels);
        assert_eq!(restored_graph.layers, original.layers);
    }
5839
#[test]
fn hnsw_level_assignment_is_deterministic() {
    // The level assigned to a node id must be a pure function of that id:
    // asking twice yields the same answer.
    (0..32usize).for_each(|node| {
        let first = nsw_assign_level(node);
        let second = nsw_assign_level(node);
        assert_eq!(first, second);
    });
}
5848
#[test]
fn hnsw_layer_0_dominates_population() {
    // Of the first 200 node ids, more than 150 are expected to live on layer 0
    // only.
    let mut on_zero = 0usize;
    for node in 0..200usize {
        if nsw_assign_level(node) == 0 {
            on_zero += 1;
        }
    }
    assert!(on_zero > 150, "level-0 nodes too few: {on_zero}");
}
5858
#[test]
fn hnsw_search_matches_brute_force_for_l2_top1() {
    // For a small, hand-picked 3-d dataset, the HNSW top-1 under L2 must equal
    // the exact nearest neighbour found by a linear scan.
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 3,
                    encoding: VecEncoding::F32
                },
                true
            ),
        ],
    ))
    .unwrap();
    let t = cat.get_mut("vecs").unwrap();
    let dataset: alloc::vec::Vec<(i32, [f32; 3])> = alloc::vec![
        (1, [0.0, 0.0, 0.0]),
        (2, [1.0, 0.0, 0.0]),
        (3, [0.0, 1.0, 0.0]),
        (4, [0.0, 0.0, 1.0]),
        (5, [1.0, 1.0, 0.0]),
        (6, [1.0, 0.0, 1.0]),
        (7, [0.0, 1.0, 1.0]),
        (8, [1.0, 1.0, 1.0]),
        (9, [0.5, 0.5, 0.5]),
        (10, [0.2, 0.8, 0.5]),
    ];
    for &(id, v) in &dataset {
        t.insert(Row::new(alloc::vec![
            Value::Int(id),
            Value::Vector(alloc::vec![v[0], v[1], v[2]]),
        ]))
        .unwrap();
    }
    t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    // The catalog is not mutated below, so resolve the table once.
    let table = cat.get("vecs").unwrap();
    let idx_pos = table
        .indices()
        .iter()
        .position(|i| i.name == "v_idx")
        .unwrap();
    for query in [[0.4, 0.4, 0.4], [0.9, 0.1, 0.0], [0.0, 0.9, 0.9]] {
        let hnsw_top = nsw_search(table, idx_pos, &query, 1, 16, NswMetric::L2);
        // Exact oracle. `total_cmp` is a genuine total order (NaN cannot make
        // the comparison inconsistent), and `min_by` returns the FIRST minimum,
        // i.e. the same element a stable ascending sort would put at the head.
        let brute_best = (0..table.rows.len())
            .map(|i| {
                let Value::Vector(v) = &table.rows[i].values[1] else {
                    return (f32::INFINITY, i);
                };
                (l2_distance_sq(v, &query), i)
            })
            .min_by(|a, b| a.0.total_cmp(&b.0))
            .expect("dataset is non-empty");
        assert!(!hnsw_top.is_empty(), "HNSW returned no results");
        assert_eq!(
            hnsw_top[0].1, brute_best.1,
            "HNSW top-1 != brute-force top-1 for {query:?}"
        );
    }
}
5927
#[test]
fn serialize_table_with_rows_round_trips() {
    // One fully populated row plus one with a NULL score must both survive.
    let rows = [
        Row::new(vec![
            Value::Int(1),
            Value::Text("alice".into()),
            Value::Float(95.5),
        ]),
        Row::new(vec![
            Value::Int(2),
            Value::Text("bob".into()),
            Value::Null,
        ]),
    ];
    let mut cat = Catalog::new();
    cat.create_table(make_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for row in rows {
        users.insert(row).unwrap();
    }
    assert_round_trip(&cat);
}
5947
#[test]
fn serialize_multiple_tables_round_trips() {
    // A catalog holding two tables (only `flags` has data) round-trips intact.
    let flags_schema = TableSchema::new(
        "flags",
        vec![
            ColumnSchema::new("id", DataType::BigInt, false),
            ColumnSchema::new("active", DataType::Bool, false),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(make_users_schema()).unwrap();
    cat.create_table(flags_schema).unwrap();
    let flags = cat.get_mut("flags").unwrap();
    flags
        .insert(Row::new(vec![Value::BigInt(7), Value::Bool(true)]))
        .unwrap();
    assert_round_trip(&cat);
}
5966
#[test]
fn deserialize_rejects_bad_magic() {
    // Wrong magic prefix, followed by the current version byte and a zeroed u32.
    let buf: Vec<u8> = b"BADMAGIC"
        .iter()
        .copied()
        .chain([FILE_VERSION])
        .chain(0u32.to_le_bytes())
        .collect();
    let outcome = Catalog::deserialize(&buf);
    assert!(matches!(outcome, Err(StorageError::Corrupt(_))));
}
5975
#[test]
fn deserialize_rejects_unsupported_version() {
    // Valid magic, then a version byte the reader does not understand.
    let unsupported_version: u8 = 99;
    let mut buf = FILE_MAGIC.to_vec();
    buf.push(unsupported_version);
    buf.extend(0u32.to_le_bytes());
    let outcome = Catalog::deserialize(&buf);
    assert!(matches!(
        outcome,
        Err(StorageError::Corrupt(ref s)) if s.contains("version")
    ));
}
5984
#[test]
fn deserialize_rejects_truncated_file() {
    // Chop the final byte off an otherwise valid stream.
    let mut cat = Catalog::new();
    cat.create_table(make_users_schema()).unwrap();
    let full = cat.serialize();
    let (truncated, _dropped) = full.split_at(full.len() - 1);
    let outcome = Catalog::deserialize(truncated);
    assert!(matches!(outcome, Err(StorageError::Corrupt(_))));
}
5997
#[test]
fn deserialize_rejects_trailing_garbage() {
    // An empty catalog followed by one stray byte must be refused as "trailing".
    let mut bytes = Catalog::new().serialize();
    bytes.extend_from_slice(&[0xFF]);
    let outcome = Catalog::deserialize(&bytes);
    assert!(matches!(
        outcome,
        Err(StorageError::Corrupt(ref s)) if s.contains("trailing")
    ));
}
6008
6009 fn populated_users() -> Catalog {
6012 let mut cat = Catalog::new();
6013 cat.create_table(make_users_schema()).unwrap();
6014 let t = cat.get_mut("users").unwrap();
6015 for (id, name, score) in [
6016 (1, "alice", Some(90.0)),
6017 (2, "bob", None),
6018 (3, "alice", Some(70.0)), ] {
6020 t.insert(Row::new(vec![
6021 Value::Int(id),
6022 Value::Text(name.into()),
6023 score.map_or(Value::Null, Value::Float),
6024 ]))
6025 .unwrap();
6026 }
6027 cat
6028 }
6029
#[test]
fn add_index_builds_from_existing_rows() {
    // An index created after rows exist must be backfilled from those rows.
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    let by_id = cat.get("users").unwrap().index_on(0).expect("index_on(0)");
    let nothing: &[RowLocator] = &[];
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(99)), nothing);
}
6042
#[test]
fn add_index_dup_name_rejected() {
    // Index names must be unique per table, even when they target different
    // columns.
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("ix".into(), "id").unwrap();
    let second = users.add_index("ix".into(), "name");
    assert!(matches!(
        second,
        Err(StorageError::DuplicateIndex { ref name }) if name == "ix"
    ));
}
6051
#[test]
fn add_index_unknown_column_rejected() {
    // Indexing a column that is not in the schema reports that column back.
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    let outcome = users.add_index("ix".into(), "ghost");
    assert!(matches!(
        outcome,
        Err(StorageError::ColumnNotFound { ref column }) if column == "ghost"
    ));
}
6062
#[test]
fn insert_after_create_index_updates_it() {
    // Rows inserted after index creation must be visible through the index.
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();
    let dave = Row::new(vec![
        Value::Int(4),
        Value::Text("dave".into()),
        Value::Null,
    ]);
    users.insert(dave).unwrap();

    let by_name = users.index_on(1).unwrap();
    assert_eq!(
        by_name.lookup_eq(&IndexKey::Text("dave".into())),
        &[RowLocator::Hot(3)]
    );
    // The two pre-existing "alice" rows (hot rows 0 and 2) are still indexed.
    assert_eq!(
        by_name.lookup_eq(&IndexKey::Text("alice".into())),
        &[RowLocator::Hot(0), RowLocator::Hot(2)]
    );
}
6085
#[test]
fn null_or_float_values_are_not_indexed() {
    // The `score` column holds floats / NULLs; an integer key must find nothing.
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_score".into(), "score").unwrap();
    let by_score = users.index_on(2).unwrap();
    let nothing: &[RowLocator] = &[];
    assert_eq!(by_score.lookup_eq(&IndexKey::Int(90)), nothing);
}
6098
#[test]
fn vector_value_data_type_carries_dim() {
    // A bare vector value reports its length as `dim` with the default F32
    // encoding.
    let expected = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let value = Value::Vector(vec![1.0, 2.0, 3.0]);
    assert_eq!(value.data_type(), Some(expected));
}
6112
#[test]
fn vector_column_insert_matching_dim_ok() {
    // A 3-element vector fits a 3-dimensional vector column.
    let emb_schema = TableSchema::new(
        "emb",
        vec![ColumnSchema::new(
            "v",
            DataType::Vector {
                dim: 3,
                encoding: VecEncoding::F32,
            },
            false,
        )],
    );
    let mut cat = Catalog::new();
    cat.create_table(emb_schema).unwrap();
    let emb = cat.get_mut("emb").unwrap();
    let row = Row::new(vec![Value::Vector(vec![1.0, 2.0, 3.0])]);
    emb.insert(row).unwrap();
}
6133
#[test]
fn vector_column_insert_dim_mismatch_rejected() {
    // A 2-element vector must be refused by a 3-dimensional vector column, and
    // the refused row must not be left behind in the table.
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "emb",
        vec![ColumnSchema::new(
            "v",
            DataType::Vector {
                dim: 3,
                encoding: VecEncoding::F32,
            },
            false,
        )],
    ))
    .unwrap();
    let emb = cat.get_mut("emb").unwrap();
    let err = emb
        .insert(Row::new(vec![Value::Vector(vec![1.0, 2.0])]))
        .unwrap_err();
    assert!(matches!(err, StorageError::TypeMismatch { .. }));
    // A failed insert must not partially mutate the table.
    assert_eq!(emb.row_count(), 0, "rejected insert must not store a row");
}
6156
#[test]
fn vector_value_survives_catalog_round_trip() {
    // Both the VECTOR column type and a stored vector payload must round-trip.
    let vec_ty = DataType::Vector {
        dim: 4,
        encoding: VecEncoding::F32,
    };
    let payload = vec![0.5, -1.25, 3.0, 7.0];

    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "emb",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vec_ty, false),
        ],
    ))
    .unwrap();
    let emb = cat.get_mut("emb").unwrap();
    emb.insert(Row::new(vec![
        Value::Int(1),
        Value::Vector(payload.clone()),
    ]))
    .unwrap();

    let encoded = cat.serialize();
    let restored = Catalog::deserialize(&encoded).expect("round-trip");
    let emb = restored.get("emb").unwrap();
    assert_eq!(emb.schema().columns[1].ty, vec_ty);
    assert_eq!(emb.rows()[0].values[1], Value::Vector(payload));
}
6196
#[test]
fn index_survives_serialize_deserialize_round_trip() {
    // A secondary index keeps its name and its entries across a round trip.
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let bytes = cat.serialize();
    let restored = Catalog::deserialize(&bytes).unwrap();
    let restored_users = restored.get("users").unwrap();
    let by_name = restored_users
        .index_on(1)
        .expect("index_on(1) after restore");
    assert_eq!(by_name.name, "by_name");
    // Both "alice" rows come back at their original hot positions.
    assert_eq!(
        by_name.lookup_eq(&IndexKey::Text("alice".into())),
        &[RowLocator::Hot(0), RowLocator::Hot(2)]
    );
}
6217
6218 fn bigint_pk_users_schema() -> TableSchema {
6223 TableSchema::new(
6224 "users",
6225 vec![
6226 ColumnSchema::new("id", DataType::BigInt, false),
6227 ColumnSchema::new("name", DataType::Text, false),
6228 ],
6229 )
6230 }
6231
6232 fn make_user_row(id: i64, name: &str) -> Row {
6233 Row::new(vec![Value::BigInt(id), Value::Text(name.into())])
6234 }
6235
#[test]
fn lookup_by_pk_finds_row_via_hot_index() {
    // With no segments loaded, a PK lookup is served entirely from hot rows.
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    let seed = [(1i64, "alice"), (2, "bob"), (3, "carol")];
    for (id, name) in seed {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let found = cat
        .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
        .unwrap();
    assert_eq!(found, make_user_row(2, "bob"));
    assert_eq!(cat.cold_segment_count(), 0);
}
6252
#[test]
fn lookup_by_pk_returns_none_when_key_missing() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    // Missing key, unknown table and unknown index all resolve to `None`.
    let misses = [
        ("users", "by_id", 999),
        ("other_table", "by_id", 1),
        ("users", "no_such_index", 1),
    ];
    for (table, index, key) in misses {
        assert!(
            cat.lookup_by_pk(table, index, &IndexKey::Int(key))
                .is_none()
        );
    }
}
6274
#[test]
fn lookup_by_pk_resolves_cold_locator_via_loaded_segment() {
    // Rows that exist only in a loaded segment must be reachable through Cold
    // locators registered on the BTree PK index.
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let schema = {
        let users = cat.get_mut("users").unwrap();
        users.add_index("by_id".into(), "id").unwrap();
        users.schema.clone()
    };

    // Encode four rows directly into a segment, never touching the hot tier.
    let cold_rows: Vec<(i64, &str)> =
        vec![(100, "ivy"), (200, "joe"), (300, "kim"), (400, "lin")];
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(cold_rows.len());
    for &(id, name) in &cold_rows {
        let body = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), body));
    }
    let (seg_bytes, _meta) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();
    assert_eq!(seg_id, 0);
    assert_eq!(cat.cold_segment_count(), 1);

    // Point every cold key at page 0 of the freshly loaded segment.
    let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
        .iter()
        .map(|&(id, _)| {
            let locator = RowLocator::Cold {
                segment_id: seg_id,
                page_offset: 0,
            };
            (IndexKey::Int(id), locator)
        })
        .collect();
    let users = cat.get_mut("users").unwrap();
    let registered = users.register_cold_locators("by_id", pairs).unwrap();
    assert_eq!(registered, 4);

    for &(id, name) in &cold_rows {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("cold key {id} not found"));
        assert_eq!(got, make_user_row(id, name));
    }
    // A key in neither tier still misses.
    assert!(
        cat.lookup_by_pk("users", "by_id", &IndexKey::Int(999))
            .is_none()
    );
}
6332
#[test]
fn lookup_by_pk_mixes_hot_and_cold_tiers() {
    // One index, two tiers: keys 1/2 are hot rows, keys 100/200 live only in a
    // segment. Every key must resolve through the same `lookup_by_pk`.
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let schema = {
        let users = cat.get_mut("users").unwrap();
        for (id, name) in [(1i64, "alice"), (2, "bob")] {
            users.insert(make_user_row(id, name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
        users.schema.clone()
    };

    let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe")];
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::new();
    for &(id, name) in &cold_rows {
        let encoded = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), encoded));
    }
    let (seg_bytes, _) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();

    let mut pairs: Vec<(IndexKey, RowLocator)> = Vec::new();
    for &(id, _) in &cold_rows {
        pairs.push((
            IndexKey::Int(id),
            RowLocator::Cold {
                segment_id: seg_id,
                page_offset: 0,
            },
        ));
    }
    cat.get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();

    let expected = [(1i64, "alice"), (2, "bob"), (100, "ivy"), (200, "joe")];
    for (id, name) in expected {
        assert_eq!(
            cat.lookup_by_pk("users", "by_id", &IndexKey::Int(id))
                .unwrap(),
            make_user_row(id, name)
        );
    }
    // 50 sits between the hot and cold key ranges and exists in neither tier.
    assert!(
        cat.lookup_by_pk("users", "by_id", &IndexKey::Int(50))
            .is_none()
    );
}
6403
#[test]
fn register_cold_locators_rejects_nsw_index() {
    // Cold locators may only be attached to BTree indices; an NSW index refuses.
    let vecs_schema = TableSchema::new(
        "vecs",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 4,
                    encoding: VecEncoding::F32,
                },
                false,
            ),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(vecs_schema).unwrap();
    let vecs = cat.get_mut("vecs").unwrap();
    let row = Row::new(vec![
        Value::Int(1),
        Value::Vector(vec![1.0, 0.0, 0.0, 0.0]),
    ]);
    vecs.insert(row).unwrap();
    vecs.add_nsw_index("by_v".into(), "v", NSW_DEFAULT_M).unwrap();

    let bogus = vec![(
        IndexKey::Int(1),
        RowLocator::Cold {
            segment_id: 0,
            page_offset: 0,
        },
    )];
    let outcome = vecs.register_cold_locators("by_v", bogus);
    assert!(matches!(
        outcome,
        Err(StorageError::Corrupt(ref s)) if s.contains("not BTree")
    ));
}
6445
#[test]
fn load_segment_bytes_rejects_garbage() {
    // Ten zero bytes are not a segment, and nothing may be registered.
    let mut cat = Catalog::new();
    let garbage = vec![0u8; 10];
    let outcome = cat.load_segment_bytes(garbage);
    assert!(matches!(
        outcome,
        Err(StorageError::Corrupt(ref s)) if s.contains("segment")
    ));
    assert_eq!(cat.cold_segment_count(), 0);
}
6454
#[test]
fn load_segment_bytes_returns_sequential_ids() {
    // Segment ids are handed out 0, 1, 2, ... in load order.
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let schema = cat.get("users").unwrap().schema.clone();
    for batch in 0u32..3 {
        // Batches cover disjoint key ranges: 0..4, 100..104, 200..204.
        let first_id = u64::from(batch) * 100;
        let rows: Vec<(u64, Vec<u8>)> = (first_id..first_id + 4)
            .map(|id| {
                let body =
                    encode_row_body_dense(&make_user_row(id.cast_signed(), "x"), &schema);
                (id, body)
            })
            .collect();
        let (bytes, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
        let assigned = cat.load_segment_bytes(bytes).unwrap();
        assert_eq!(assigned, batch);
    }
    assert_eq!(cat.cold_segment_count(), 3);
}
6473
#[test]
fn v8_catalog_decodes_as_all_hot_under_v9_reader() {
    // A legacy v8 stream (written by `encode_as_v8`) must still decode, and
    // every index entry it yields must be a Hot locator.
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let legacy = encode_as_v8(&cat);
    let version_byte = legacy[FILE_MAGIC.len()];
    assert_eq!(version_byte, 8, "version byte must be 8");

    let restored = Catalog::deserialize(&legacy).expect("v9 reader accepts v8 stream");
    let by_name = restored
        .get("users")
        .unwrap()
        .index_on(1)
        .expect("index_on(1) after restore");
    let alice = by_name.lookup_eq(&IndexKey::Text("alice".into()));
    assert_eq!(alice, &[RowLocator::Hot(0), RowLocator::Hot(2)]);
    assert!(
        alice.iter().all(|loc| loc.is_hot()),
        "v8 → v9 read must yield Hot only"
    );
}
6518
/// Serialises `cat` in the legacy v8 catalog layout, so tests can exercise
/// the current reader's backward-compatibility path.
///
/// Byte order of the output:
/// 1. `FILE_MAGIC`, then the version byte `8`.
/// 2. Table count (`write_u32`).
/// 3. Per table:
///    - name (`write_str`);
///    - column count (`write_u16`);
///    - per column: name, data type, nullable byte, default-present byte
///      (0/1) plus value, auto-increment byte;
///    - row count (`write_u32`) followed by each row's dense body;
///    - index count (`write_u16`);
///    - per index: name, column position (`write_u16`), kind tag byte.
///
/// # Panics
/// Panics if a count or position does not fit its fixed-width field
/// (`try_from(..).unwrap()`), or if any table carries a BRIN index, which v8
/// cannot encode.
fn encode_as_v8(cat: &Catalog) -> Vec<u8> {
    let mut out = Vec::with_capacity(64);
    out.extend_from_slice(FILE_MAGIC);
    // Version byte is pinned to 8, independent of the current FILE_VERSION.
    out.push(8u8);
    write_u32(&mut out, u32::try_from(cat.tables.len()).unwrap());
    for t in &cat.tables {
        write_str(&mut out, &t.schema.name);
        write_u16(&mut out, u16::try_from(t.schema.columns.len()).unwrap());
        for c in &t.schema.columns {
            write_str(&mut out, &c.name);
            write_data_type(&mut out, c.ty);
            out.push(u8::from(c.nullable));
            // Default value: presence flag, then the value only when present.
            match &c.default {
                None => out.push(0),
                Some(v) => {
                    out.push(1);
                    write_value(&mut out, v);
                }
            }
            out.push(u8::from(c.auto_increment));
        }
        write_u32(&mut out, u32::try_from(t.rows.len()).unwrap());
        // Row bodies are written back to back; no per-row length prefix is
        // emitted here.
        for row in &t.rows {
            out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
        }
        write_u16(&mut out, u16::try_from(t.indices.len()).unwrap());
        for idx in &t.indices {
            write_str(&mut out, &idx.name);
            write_u16(&mut out, u16::try_from(idx.column_position).unwrap());
            match &idx.kind {
                // BTree: tag only, no entries are written. Presumably the
                // reader rebuilds them from the rows (the v8 decode test
                // relies on that). TODO confirm against the reader.
                IndexKind::BTree(_) => out.push(0),
                // NSW: tag, the `m` parameter, then the full graph.
                IndexKind::Nsw(g) => {
                    out.push(1);
                    write_u16(&mut out, u16::try_from(g.m).unwrap());
                    write_nsw_graph(&mut out, g);
                }
                IndexKind::Brin { .. } => panic!(
                    "v8 catalog writer cannot serialise BRIN — \
                     tests with BRIN indices must use the current writer"
                ),
            }
        }
    }
    out
}
6572
#[test]
fn v9_catalog_round_trip_preserves_cold_locators() {
    // The current format must persist Cold locators alongside Hot ones, so
    // that re-loading the same segment restores full PK resolution.
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let schema = {
        let users = cat.get_mut("users").unwrap();
        for (id, name) in [(1i64, "alice"), (2, "bob")] {
            users.insert(make_user_row(id, name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
        users.schema.clone()
    };

    let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe"), (300, "kim")];
    let seg_rows: Vec<(u64, Vec<u8>)> = cold_rows
        .iter()
        .map(|&(id, name)| {
            let body = encode_row_body_dense(&make_user_row(id, name), &schema);
            (id.cast_unsigned(), body)
        })
        .collect();
    let (seg_bytes, _) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    // Keep a copy: the restored catalog has to load the same segment again.
    let seg_id = cat.load_segment_bytes(seg_bytes.clone()).unwrap();
    let cold_locator = |segment_id| RowLocator::Cold {
        segment_id,
        page_offset: 0,
    };
    let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
        .iter()
        .map(|&(id, _)| (IndexKey::Int(id), cold_locator(seg_id)))
        .collect();
    cat.get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();

    let bytes = cat.serialize();
    assert_eq!(bytes[FILE_MAGIC.len()], FILE_VERSION);
    let mut restored = Catalog::deserialize(&bytes).expect("v9 round-trip parses");

    // Segment ids follow load order, so the first load reproduces `seg_id`.
    let restored_seg_id = restored.load_segment_bytes(seg_bytes).unwrap();
    assert_eq!(restored_seg_id, seg_id);

    let by_id = restored.get("users").unwrap().index_on(0).unwrap();
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(1)), &[RowLocator::Hot(0)]);
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
    for &(id, _) in &cold_rows {
        assert_eq!(by_id.lookup_eq(&IndexKey::Int(id)), &[cold_locator(seg_id)]);
    }

    let hot = restored
        .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
        .unwrap();
    assert_eq!(hot, make_user_row(2, "bob"));
    for &(id, name) in &cold_rows {
        assert_eq!(
            restored
                .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
                .unwrap(),
            make_user_row(id, name)
        );
    }
}
6663
#[test]
fn row_body_encoded_len_matches_actual_encode_for_all_types() {
    // `row_body_encoded_len` must agree exactly with the length of the bytes
    // produced by `encode_row_body_dense`, for every column type listed here.
    let mut columns = vec![ColumnSchema::new("a", DataType::SmallInt, true)];
    for (name, ty) in [
        ("b", DataType::Int),
        ("c", DataType::BigInt),
        ("d", DataType::Float),
        ("e", DataType::Bool),
        ("f", DataType::Text),
        (
            "g",
            DataType::Vector {
                dim: 3,
                encoding: VecEncoding::F32,
            },
        ),
        (
            "h",
            DataType::Numeric {
                precision: 18,
                scale: 2,
            },
        ),
        ("i", DataType::Date),
        ("j", DataType::Timestamp),
    ] {
        columns.push(ColumnSchema::new(name, ty, false));
    }
    let schema = TableSchema::new("wide", columns);

    let cases = [
        // Typical non-zero values.
        Row::new(vec![
            Value::SmallInt(7),
            Value::Int(42),
            Value::BigInt(1_000_000),
            Value::Float(1.5),
            Value::Bool(true),
            Value::Text("hello".into()),
            Value::Vector(vec![1.0, 2.0, 3.0]),
            Value::Numeric {
                scaled: 12345,
                scale: 2,
            },
            Value::Date(20_000),
            Value::Timestamp(1_700_000_000_000_000),
        ]),
        // NULL in the nullable column, zeros, and empty variable-width values.
        Row::new(vec![
            Value::Null,
            Value::Int(0),
            Value::BigInt(0),
            Value::Float(0.0),
            Value::Bool(false),
            Value::Text(String::new()),
            Value::Vector(vec![]),
            Value::Numeric {
                scaled: 0,
                scale: 2,
            },
            Value::Date(0),
            Value::Timestamp(0),
        ]),
        // Negative values and a longer text payload.
        Row::new(vec![
            Value::SmallInt(-1),
            Value::Int(-1),
            Value::BigInt(-1),
            Value::Float(-0.5),
            Value::Bool(true),
            Value::Text("a much longer payload here".into()),
            Value::Vector(vec![0.1, 0.2, 0.3]),
            Value::Numeric {
                scaled: -999_999_999,
                scale: 2,
            },
            Value::Date(-1),
            Value::Timestamp(-1),
        ]),
    ];
    for row in &cases {
        let exact = encode_row_body_dense(row, &schema).len();
        let estimated = row_body_encoded_len(row, &schema);
        assert_eq!(exact, estimated, "row {row:?}");
    }
}
6755
#[test]
fn hot_bytes_grows_on_insert_and_matches_encoded_sum() {
    // Per-table and catalog-wide hot byte counters both equal the sum of the
    // dense encodings of the inserted rows.
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    assert_eq!(users.hot_bytes(), 0);

    let rows = [
        make_user_row(1, "alice"),
        make_user_row(2, "bob"),
        make_user_row(3, "carol"),
    ];
    let expected: u64 = rows
        .iter()
        .map(|r| encode_row_body_dense(r, &users.schema).len() as u64)
        .sum();
    for row in rows {
        users.insert(row).unwrap();
    }

    assert_eq!(users.hot_bytes(), expected);
    assert_eq!(cat.hot_tier_bytes(), expected);
}
6771
#[test]
fn hot_bytes_shrinks_on_delete() {
    // Deleting a row releases exactly that row's encoded size.
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for (id, name) in [(1i64, "alice"), (2, "bob"), (3, "carol")] {
        users.insert(make_user_row(id, name)).unwrap();
    }
    // "bob" was inserted second, i.e. hot row 1.
    let bob_bytes =
        encode_row_body_dense(&make_user_row(2, "bob"), &users.schema).len() as u64;
    let before = users.hot_bytes();
    assert_eq!(users.delete_rows(&[1]), 1);
    assert_eq!(users.hot_bytes(), before - bob_bytes);
}
6788
#[test]
fn hot_bytes_diffs_on_update_for_variable_width_columns() {
    // Updating a row adjusts the counter by (new size - old size), so a longer
    // TEXT value makes it grow.
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();

    let original = make_user_row(1, "alice");
    let replacement = make_user_row(1, "alice-the-longer-name");
    let old_len = encode_row_body_dense(&original, &users.schema).len() as u64;
    let new_len = encode_row_body_dense(&replacement, &users.schema).len() as u64;

    users.insert(original).unwrap();
    let after_insert = users.hot_bytes();
    users.update_row(0, replacement.values).unwrap();

    assert_eq!(users.hot_bytes(), after_insert - old_len + new_len);
    assert!(users.hot_bytes() > after_insert, "longer text grew the counter");
}
6805
#[test]
fn hot_bytes_round_trips_through_serialize_deserialize() {
    // The hot byte counters are reproduced identically after a round trip.
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = cat.get_mut("users").unwrap();
        for i in 0..10 {
            let name = alloc::format!("name-{i}");
            users.insert(make_user_row(i, &name)).unwrap();
        }
    }
    let before = cat.hot_tier_bytes();
    let encoded = cat.serialize();
    let restored = Catalog::deserialize(&encoded).unwrap();
    assert_eq!(restored.hot_tier_bytes(), before);
    assert_eq!(restored.get("users").unwrap().hot_bytes(), before);
}
6820
#[test]
fn freeze_oldest_to_cold_moves_rows_and_keeps_lookups_working() {
    // Freezing 6 of 10 rows moves them into one new segment, shrinks the hot
    // counters by exactly the reported amount, and leaves every PK resolvable.
    let user_row = |id: i64| make_user_row(id, &alloc::format!("u-{id}"));

    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..10i64 {
        users.insert(user_row(id)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let total_bytes_before = users.hot_bytes();

    let report = cat
        .freeze_oldest_to_cold("users", "by_id", 6)
        .expect("freeze succeeds");
    assert_eq!(report.frozen_rows, 6);
    assert_eq!(report.segment_id, 0);
    assert!(report.bytes_freed > 0);
    assert!(!report.segment_bytes.is_empty());

    let users = cat.get("users").unwrap();
    assert_eq!(users.row_count(), 4, "4 hot rows remain (10 - 6 frozen)");
    assert_eq!(cat.cold_segment_count(), 1);
    assert_eq!(
        users.hot_bytes(),
        total_bytes_before - report.bytes_freed,
        "hot_bytes accounting matches FreezeReport"
    );

    // Every key, whether frozen or still hot, must still resolve.
    for id in 0..10i64 {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("PK {id} disappeared after freeze"));
        assert_eq!(got, user_row(id));
    }
}
6866
#[test]
fn freeze_twice_preserves_prior_cold_locators() {
    // A second freeze must not clobber the Cold locators written by the first.
    let user_row = |id: i64| make_user_row(id, &alloc::format!("u-{id}"));

    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..12i64 {
        users.insert(user_row(id)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    for failure_msg in ["first freeze ok", "second freeze ok"] {
        cat.freeze_oldest_to_cold("users", "by_id", 4)
            .expect(failure_msg);
    }

    assert_eq!(cat.get("users").unwrap().row_count(), 4);
    assert_eq!(cat.cold_segment_count(), 2);
    for id in 0..12i64 {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("PK {id} not resolvable after two freezes"));
        assert_eq!(got, user_row(id));
    }
}
6898
#[test]
fn freeze_oldest_to_cold_rejects_invalid_input() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..3i64 {
        users
            .insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let bad_calls = [
        // zero rows requested
        ("users", "by_id", 0),
        // unknown table
        ("missing", "by_id", 1),
        // unknown index
        ("users", "no_such_index", 1),
        // more rows than the table holds
        ("users", "by_id", 999),
    ];
    for (table, index, count) in bad_calls {
        assert!(matches!(
            cat.freeze_oldest_to_cold(table, index, count),
            Err(StorageError::Corrupt(_))
        ));
    }
    // Every rejected call left the table and the cold tier untouched.
    assert_eq!(cat.get("users").unwrap().row_count(), 3);
    assert_eq!(cat.cold_segment_count(), 0);
}
6936
#[test]
fn freeze_oldest_to_cold_rejects_non_integer_pk() {
    // Freezing keys cold rows by an integer PK, so a TEXT-keyed index is refused
    // without side effects.
    let schema = TableSchema::new(
        "by_name",
        vec![
            ColumnSchema::new("name", DataType::Text, false),
            ColumnSchema::new("payload", DataType::BigInt, false),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();
    let table = cat.get_mut("by_name").unwrap();
    table
        .insert(Row::new(vec![Value::Text("a".into()), Value::BigInt(1)]))
        .unwrap();
    table.add_index("by_n".into(), "name").unwrap();

    let err = cat
        .freeze_oldest_to_cold("by_name", "by_n", 1)
        .expect_err("non-integer PK rejected");
    if let StorageError::Corrupt(s) = &err {
        assert!(
            s.contains("non-integer"),
            "error message names the constraint: {s}"
        );
    } else {
        panic!("expected Corrupt, got {err:?}");
    }
    assert_eq!(cat.get("by_name").unwrap().row_count(), 1);
    assert_eq!(cat.cold_segment_count(), 0);
}
6968
#[test]
fn freeze_keeps_remaining_hot_rows_addressable_via_secondary_index() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    // "u-4" was hot row 4 before the freeze. With three rows frozen, the
    // secondary index must report it at hot position 1.
    let by_name = cat.get("users").unwrap().index_on(1).unwrap();
    let hits = by_name.lookup_eq(&IndexKey::Text("u-4".into()));
    assert_eq!(hits.len(), 1);
    assert!(hits[0].is_hot(), "kept-hot rows still surface as Hot");
    if let RowLocator::Hot(pos) = hits[0] {
        assert_eq!(pos, 1);
    } else {
        unreachable!()
    }
}
7003
#[test]
fn promote_cold_row_pulls_frozen_row_back_to_hot_tier() {
    // Promoting a frozen key appends it to the hot tier and replaces its Cold
    // locator with a single Hot one.
    let user_row = |id: i64| make_user_row(id, &alloc::format!("u-{id}"));

    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..6i64 {
        users.insert(user_row(id)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    cat.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
    let hot_bytes_before = cat.get("users").unwrap().hot_bytes();

    let promoted_at = cat
        .promote_cold_row("users", "by_id", &IndexKey::Int(2))
        .expect("promote ok")
        .expect("PK 2 was cold");
    assert_eq!(
        promoted_at, 2,
        "promoted row appended after the 2 surviving hot rows"
    );

    let users = cat.get("users").unwrap();
    assert_eq!(users.row_count(), 3, "hot tier grew from 2 to 3");
    let row = make_user_row(2, "u-2");
    let row_len = encode_row_body_dense(&row, &users.schema).len() as u64;
    assert_eq!(users.hot_bytes(), hot_bytes_before + row_len);

    let entries = users.index_on(0).unwrap().lookup_eq(&IndexKey::Int(2));
    assert_eq!(entries.len(), 1, "exactly one locator per key");
    assert!(entries[0].is_hot(), "promote retired the Cold locator");

    // The promoted key resolves from the hot tier; an untouched frozen key
    // still resolves from its segment.
    assert_eq!(
        cat.lookup_by_pk("users", "by_id", &IndexKey::Int(2))
            .unwrap(),
        row
    );
    assert_eq!(
        cat.lookup_by_pk("users", "by_id", &IndexKey::Int(0))
            .unwrap(),
        user_row(0)
    );
}
7062
#[test]
/// `promote_cold_row` yields `Ok(None)` both for a key that is only hot and
/// for a key that does not exist, and changes neither tier.
fn promote_cold_row_returns_none_when_key_is_not_cold() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.insert(make_user_row(7, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    // PK 7 is hot-only; PK 99 was never inserted.
    for pk in [7, 99] {
        let outcome = cat
            .promote_cold_row("users", "by_id", &IndexKey::Int(pk))
            .unwrap();
        assert!(outcome.is_none());
    }
    assert_eq!(cat.get("users").unwrap().row_count(), 1);
    assert_eq!(cat.cold_segment_count(), 0);
}
7090
#[test]
/// Shadowing a frozen PK retires exactly its one Cold locator, so the key
/// stops resolving, while its neighbours in the same segment are untouched.
fn shadow_cold_row_removes_cold_locators_and_drops_lookup() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = cat.get_mut("users").unwrap();
        for row in (0..5i64).map(|id| make_user_row(id, &alloc::format!("u-{id}"))) {
            users.insert(row).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }
    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    let victim = IndexKey::Int(1);
    assert!(
        cat.lookup_by_pk("users", "by_id", &victim).is_some(),
        "frozen PK resolves before shadow"
    );
    let retired = cat.shadow_cold_row("users", "by_id", &victim).unwrap();
    assert_eq!(retired, 1, "exactly one cold locator retired");
    assert!(
        cat.lookup_by_pk("users", "by_id", &victim).is_none(),
        "shadowed key no longer resolves"
    );

    for pk in [0i64, 2] {
        assert_eq!(
            cat.lookup_by_pk("users", "by_id", &IndexKey::Int(pk)).unwrap(),
            make_user_row(pk, &alloc::format!("u-{pk}"))
        );
    }
}
7137
#[test]
/// Shadowing a key that has no Cold locator is a no-op. It reports zero
/// retired locators for both a hot-only key and an absent key. It also
/// leaves the hot row in place and still resolvable by PK.
fn shadow_cold_row_returns_zero_when_key_is_not_cold() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let t = cat.get_mut("users").unwrap();
    t.insert(make_user_row(1, "alice")).unwrap();
    t.add_index("by_id".into(), "id").unwrap();
    assert_eq!(
        cat.shadow_cold_row("users", "by_id", &IndexKey::Int(1))
            .unwrap(),
        0,
        "hot-only key drops no cold locators"
    );
    assert_eq!(
        cat.shadow_cold_row("users", "by_id", &IndexKey::Int(999))
            .unwrap(),
        0,
        "absent key drops no cold locators"
    );
    assert_eq!(cat.get("users").unwrap().row_count(), 1);
    // Shadowing only retires Cold locators. A row count alone would not catch
    // a shadow that wrongly dropped the Hot locator, so resolve the row too.
    assert_eq!(
        cat.lookup_by_pk("users", "by_id", &IndexKey::Int(1))
            .unwrap(),
        make_user_row(1, "alice"),
        "hot row survives shadowing a hot-only key"
    );
}
7163
#[test]
/// Both `promote_cold_row` and `shadow_cold_row` report
/// `StorageError::Corrupt` for an unknown table or an unknown index name.
fn promote_and_shadow_reject_invalid_inputs() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    let key = IndexKey::Int(1);
    for (table, index) in [("missing", "by_id"), ("users", "no_such_index")] {
        assert!(matches!(
            cat.promote_cold_row(table, index, &key),
            Err(StorageError::Corrupt(_))
        ));
        assert!(matches!(
            cat.shadow_cold_row(table, index, &key),
            Err(StorageError::Corrupt(_))
        ));
    }
}
7192
#[test]
/// Committing one prepared slice `0..6` produces the same freeze report and
/// the same PK lookups as a one-shot `freeze_oldest_to_cold(.., 6)` on an
/// identically populated catalog.
fn commit_freeze_slices_single_slice_matches_freeze_oldest() {
    let mut a = Catalog::new();
    let mut b = Catalog::new();
    for cat in [&mut a, &mut b] {
        cat.create_table(bigint_pk_users_schema()).unwrap();
        let users = cat.get_mut("users").unwrap();
        for row in (0..10i64).map(|id| make_user_row(id, &alloc::format!("u-{id}"))) {
            users.insert(row).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }

    let via_oneshot = a.freeze_oldest_to_cold("users", "by_id", 6).unwrap();
    let prepared = b
        .prepare_freeze_slice("users", "by_id", 0..6)
        .expect("prepare");
    let via_slices = b
        .commit_freeze_slices("users", "by_id", alloc::vec![prepared])
        .expect("commit");

    assert_eq!(via_oneshot.segment_id, via_slices.segment_id);
    assert_eq!(via_oneshot.frozen_rows, via_slices.frozen_rows);
    assert_eq!(via_oneshot.bytes_freed, via_slices.bytes_freed);
    assert_eq!(via_oneshot.segment_bytes, via_slices.segment_bytes);

    for id in 0..10i64 {
        let key = IndexKey::Int(id);
        assert_eq!(
            a.lookup_by_pk("users", "by_id", &key),
            b.lookup_by_pk("users", "by_id", &key),
            "PK {id} differs after single vs slice freeze"
        );
    }
}
7232
#[test]
/// Splitting the slice range `0..8` into `0..4` + `4..8` yields the same
/// segment bytes, frozen-row count, and PK lookups as committing `0..8` as a
/// single slice. Rows are inserted in shuffled PK order.
fn commit_freeze_slices_two_slices_match_single_slice() {
    let mut a = Catalog::new();
    let mut b = Catalog::new();
    for cat in [&mut a, &mut b] {
        cat.create_table(bigint_pk_users_schema()).unwrap();
        let users = cat.get_mut("users").unwrap();
        for id in [3i64, 7, 1, 9, 5, 0, 8, 4, 2, 6] {
            users
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }

    let whole = a
        .prepare_freeze_slice("users", "by_id", 0..8)
        .expect("prepare");
    let one = a
        .commit_freeze_slices("users", "by_id", alloc::vec![whole])
        .expect("commit one");

    let first_half = b
        .prepare_freeze_slice("users", "by_id", 0..4)
        .expect("prepare s1");
    let second_half = b
        .prepare_freeze_slice("users", "by_id", 4..8)
        .expect("prepare s2");
    let two = b
        .commit_freeze_slices("users", "by_id", alloc::vec![first_half, second_half])
        .expect("commit two");

    assert_eq!(one.segment_bytes, two.segment_bytes);
    assert_eq!(one.frozen_rows, two.frozen_rows);
    for id in 0..10i64 {
        let key = IndexKey::Int(id);
        assert_eq!(
            a.lookup_by_pk("users", "by_id", &key),
            b.lookup_by_pk("users", "by_id", &key),
            "PK {id} differs after one-slice vs two-slice freeze"
        );
    }
}
7279
#[test]
/// Slices that leave a hole (`0..2` then `3..5`) are rejected with
/// `StorageError::Corrupt`, and the rejected commit freezes nothing.
fn commit_freeze_slices_rejects_gap() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = cat.get_mut("users").unwrap();
        for row in (0..6i64).map(|id| make_user_row(id, &alloc::format!("u-{id}"))) {
            users.insert(row).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }

    let gapped = alloc::vec![
        cat.prepare_freeze_slice("users", "by_id", 0..2).unwrap(),
        cat.prepare_freeze_slice("users", "by_id", 3..5).unwrap(),
    ];
    let outcome = cat.commit_freeze_slices("users", "by_id", gapped);
    assert!(matches!(outcome, Err(StorageError::Corrupt(_))));

    // Both tiers are exactly as they were before the failed commit.
    assert_eq!(cat.cold_segment_count(), 0);
    assert_eq!(cat.get("users").unwrap().row_count(), 6);
}
7301
#[test]
/// Committing an empty slice list succeeds, reports zero frozen rows, and
/// leaves the hot tier and the cold segment set unchanged.
fn commit_freeze_slices_empty_is_noop() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = cat.get_mut("users").unwrap();
        for row in (0..3i64).map(|id| make_user_row(id, &alloc::format!("u-{id}"))) {
            users.insert(row).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }

    let no_slices = Vec::new();
    let report = cat
        .commit_freeze_slices("users", "by_id", no_slices)
        .unwrap();

    assert_eq!(report.frozen_rows, 0);
    assert_eq!(cat.cold_segment_count(), 0);
    assert_eq!(cat.get("users").unwrap().row_count(), 3);
}
7320
#[test]
/// Two small cold segments below the size target are merged into one.
/// Afterwards only the merged segment is live, but three slots are in use.
/// Every PK (cold and hot) still resolves to its original row.
fn compact_merges_small_segments_storage_unit() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = cat.get_mut("users").unwrap();
        for row in (0..8i64).map(|id| make_user_row(id, &alloc::format!("u-{id}"))) {
            users.insert(row).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }
    // Two freezes of three rows each: two cold segments, two hot rows left.
    for _ in 0..2 {
        cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    }
    assert_eq!(cat.cold_segment_count(), 2);
    assert_eq!(cat.cold_segment_slot_count(), 2);

    // One byte above the largest segment keeps both under the target.
    let largest = cat
        .cold_segment_ids_global()
        .iter()
        .copied()
        .map(|seg| cat.cold_segment(seg).unwrap().bytes().len() as u64)
        .max()
        .unwrap();

    let report = cat
        .compact_cold_segments("users", "by_id", largest + 1)
        .expect("compact succeeds");
    assert_eq!(report.sources.len(), 2);
    let merged_id = report.merged_segment_id.expect("merge happened");
    assert_eq!(report.merged_rows, 6);
    assert_eq!(report.deleted_rows_pruned, 0);
    assert!(!report.merged_segment_bytes.is_empty());

    // One live segment, but three slots: the two sources plus the merge.
    assert_eq!(cat.cold_segment_count(), 1);
    assert_eq!(cat.cold_segment_slot_count(), 3);
    assert_eq!(cat.cold_segment_ids_global(), alloc::vec![merged_id]);

    for id in 0..8i64 {
        let want = make_user_row(id, &alloc::format!("u-{id}"));
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("PK {id} lost after compaction"));
        assert_eq!(got, want);
    }
}
7377
#[test]
/// Compaction prunes cold rows whose locators were shadowed. Shadowed PKs
/// stay invisible, the merge leaves a single live cold segment, and every
/// surviving PK still resolves to its original row contents.
fn compact_drops_shadowed_cold_rows() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let t = cat.get_mut("users").unwrap();
    for id in 0..6i64 {
        t.insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    t.add_index("by_id".into(), "id").unwrap();
    // Two freezes of three rows each: all six rows end up cold in two segments.
    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    assert_eq!(
        cat.shadow_cold_row("users", "by_id", &IndexKey::Int(1))
            .unwrap(),
        1
    );
    assert_eq!(
        cat.shadow_cold_row("users", "by_id", &IndexKey::Int(4))
            .unwrap(),
        1
    );

    let max_seg_bytes = cat
        .cold_segment_ids_global()
        .iter()
        .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let report = cat
        .compact_cold_segments("users", "by_id", max_seg_bytes + 1)
        .expect("compact succeeds");
    assert_eq!(report.sources.len(), 2);
    assert_eq!(report.merged_rows, 4, "6 frozen − 2 shadowed = 4 live");
    assert_eq!(report.deleted_rows_pruned, 2);
    assert_eq!(
        cat.cold_segment_count(),
        1,
        "both sources were replaced by the merged segment"
    );

    for shadowed in [1i64, 4i64] {
        assert!(
            cat.lookup_by_pk("users", "by_id", &IndexKey::Int(shadowed))
                .is_none(),
            "shadowed PK {shadowed} must remain invisible after compact"
        );
    }
    // Resolving is not enough: the merged segment must carry the exact rows.
    for live in [0i64, 2, 3, 5] {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(live))
            .unwrap_or_else(|| panic!("live PK {live} lost after compact"));
        assert_eq!(got, make_user_row(live, &alloc::format!("u-{live}")));
    }
}
7432
#[test]
/// `compact_cold_segments` merges nothing with zero cold segments or with a
/// single one. A generous target does not change this, and neither does a
/// one-byte target.
fn compact_is_noop_below_two_candidates() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = cat.get_mut("users").unwrap();
        for row in (0..6i64).map(|id| make_user_row(id, &alloc::format!("u-{id}"))) {
            users.insert(row).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }

    // Nothing cold yet.
    let empty = cat
        .compact_cold_segments("users", "by_id", 1 << 30)
        .expect("noop ok");
    assert!(empty.merged_segment_id.is_none());
    assert!(empty.sources.is_empty());

    // Exactly one cold segment.
    cat.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
    let lone = cat
        .compact_cold_segments("users", "by_id", 1 << 30)
        .expect("noop ok");
    assert!(lone.merged_segment_id.is_none());
    assert_eq!(cat.cold_segment_count(), 1);

    // Still one segment, now with a one-byte target.
    let tiny = cat
        .compact_cold_segments("users", "by_id", 1)
        .expect("noop ok");
    assert!(tiny.merged_segment_id.is_none());
    assert_eq!(cat.cold_segment_count(), 1);
}
7468
#[test]
/// After a compaction, the catalog is serialized and then deserialized. The
/// merged segment's bytes are then loaded back at the merged id. The
/// restored catalog resolves every PK and has one live cold segment.
fn compact_swap_survives_catalog_roundtrip_via_load_at() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = cat.get_mut("users").unwrap();
        for row in (0..6i64).map(|id| make_user_row(id, &alloc::format!("u-{id}"))) {
            users.insert(row).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }
    for _ in 0..2 {
        cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    }
    let largest = cat
        .cold_segment_ids_global()
        .iter()
        .copied()
        .map(|seg| cat.cold_segment(seg).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let report = cat
        .compact_cold_segments("users", "by_id", largest + 1)
        .expect("compact ok");
    let merged_id = report.merged_segment_id.unwrap();

    let snapshot = cat.serialize();
    // NOTE(review): the test re-attaches the merged bytes explicitly, which
    // suggests `serialize` does not embed segment payloads. Confirm this.
    let merged_bytes = report.merged_segment_bytes.clone();

    let mut restored = Catalog::deserialize(&snapshot).expect("deserialize ok");
    restored
        .load_segment_bytes_at(merged_id, merged_bytes)
        .expect("reload merged ok");

    for id in 0..6i64 {
        let got = restored
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("PK {id} lost across roundtrip"));
        assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
    }
    assert_eq!(restored.cold_segment_count(), 1);
}
7522
#[test]
/// Loading segment bytes at a slot past the end pads the slot table up to
/// that id. Loading into an occupied slot is rejected with
/// `StorageError::Corrupt`; this covers the padded-to slot and slot 0.
fn load_segment_bytes_at_pads_and_rejects_collision() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = cat.get_mut("users").unwrap();
        for row in (0..4i64).map(|id| make_user_row(id, &alloc::format!("u-{id}"))) {
            users.insert(row).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }
    let seg0_bytes = cat
        .freeze_oldest_to_cold("users", "by_id", 2)
        .unwrap()
        .segment_bytes
        .clone();

    // Slot 5 is loaded; slots 1..=4 exist only as padding.
    cat.load_segment_bytes_at(5, seg0_bytes.clone())
        .expect("pad + load ok");
    assert_eq!(cat.cold_segment_slot_count(), 6);
    assert_eq!(cat.cold_segment_count(), 2);

    for occupied in [5, 0] {
        assert!(matches!(
            cat.load_segment_bytes_at(occupied, seg0_bytes.clone()),
            Err(StorageError::Corrupt(_))
        ));
    }
}
7557
#[test]
/// A PK is frozen, promoted back to hot, and then frozen again. Throughout,
/// it keeps exactly one locator: Hot after the promote, Cold after the
/// refreeze. No stale locator into the first segment may survive, and every
/// PK resolves to its original row at the end.
fn promote_then_refreeze_does_not_leave_orphan_locators() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let t = cat.get_mut("users").unwrap();
    for id in 0..4i64 {
        t.insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    t.add_index("by_id".into(), "id").unwrap();

    // Freeze the two oldest rows (PKs 0 and 1); PKs 2 and 3 stay hot.
    cat.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
    let promoted = cat
        .promote_cold_row("users", "by_id", &IndexKey::Int(0))
        .unwrap();
    assert!(promoted.is_some());
    let entries_after_promote = cat
        .get("users")
        .unwrap()
        .index_on(0)
        .unwrap()
        .lookup_eq(&IndexKey::Int(0))
        .to_vec();
    assert_eq!(entries_after_promote.len(), 1);
    assert!(entries_after_promote[0].is_hot());

    for id in [2i64, 3] {
        assert_eq!(
            cat.lookup_by_pk("users", "by_id", &IndexKey::Int(id))
                .unwrap(),
            make_user_row(id, &alloc::format!("u-{id}"))
        );
    }

    // Refreeze: the hot tier now holds PKs 2, 3 and the promoted 0.
    // Freezing all three pushes PK 0 back into a new cold segment. The
    // promote already retired its original Cold locator, so PK 0 must end
    // up with one locator, not two.
    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    assert_eq!(cat.cold_segment_count(), 2);
    let entries_after_refreeze = cat
        .get("users")
        .unwrap()
        .index_on(0)
        .unwrap()
        .lookup_eq(&IndexKey::Int(0))
        .to_vec();
    assert_eq!(
        entries_after_refreeze.len(),
        1,
        "refreeze must not leave an orphan locator for PK 0"
    );
    assert!(
        !entries_after_refreeze[0].is_hot(),
        "refrozen PK 0 now lives in the cold tier"
    );

    for id in 0..4i64 {
        assert_eq!(
            cat.lookup_by_pk("users", "by_id", &IndexKey::Int(id))
                .unwrap_or_else(|| panic!("PK {id} lost after refreeze")),
            make_user_row(id, &alloc::format!("u-{id}"))
        );
    }
}
7602}