1#![no_std]
7#![cfg_attr(target_arch = "aarch64", allow(unsafe_code))]
11
12extern crate alloc;
13
14pub mod bloom;
15pub mod halfvec;
16pub mod persistent;
17pub mod persistent_btree;
18pub mod quantize;
19pub mod row_locator;
20pub mod segment;
21
22pub use self::bloom::{BloomError, BloomFilter};
23pub use self::row_locator::{RowLocator, RowLocatorError};
24pub use self::segment::{
25 BRIN_SIDECAR_MAGIC, BrinSummary, OwnedSegment, SEGMENT_COMPRESS_ALGO_LZSS,
26 SEGMENT_COMPRESS_ALGO_NONE, SEGMENT_MAGIC, SEGMENT_MAGIC_V2, SEGMENT_PAGE_BYTES, SegmentError,
27 SegmentMeta, SegmentReader, derive_brin_summaries, encode_segment, wrap_v2_envelope,
28 wrap_v2_envelope_with_brin,
29};
30
31use alloc::collections::{BTreeMap, BTreeSet};
32use alloc::format;
33use alloc::string::String;
34use alloc::sync::Arc;
35use alloc::vec::Vec;
36use core::fmt;
37
38use self::persistent::PersistentVec;
39use self::persistent_btree::PersistentBTreeMap;
40
/// Physical encoding of the elements of a `VECTOR` column.
///
/// `F32` is the default. `Sq8` stores one byte per dimension (its `dim` is the byte length,
/// see `Value::data_type`); `F16` stores half-precision elements.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum VecEncoding {
    /// Uncompressed 32-bit floats (`Value::Vector`).
    #[default]
    F32,
    /// SQ8 encoding (`Value::Sq8Vector`).
    Sq8,
    /// Half precision (`Value::HalfVector`).
    F16,
}

impl fmt::Display for VecEncoding {
    /// Writes the SQL spelling used after `USING` (`F32`, `SQ8`, `HALF`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            VecEncoding::F32 => "F32",
            VecEncoding::Sq8 => "SQ8",
            VecEncoding::F16 => "HALF",
        };
        f.write_str(label)
    }
}

/// Column type as tracked by the storage layer.
///
/// Variant order is part of the type's identity (derived traits); append new variants at
/// the end.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataType {
    SmallInt,
    Int,
    BigInt,
    Float,
    Text,
    /// `VARCHAR(n)`; the length is carried but not enforced by `Table::insert`.
    Varchar(u32),
    /// `CHAR(n)`; the length is carried but not enforced by `Table::insert`.
    Char(u32),
    Bool,
    /// Fixed-dimension vector stored with the given element encoding.
    Vector {
        dim: u32,
        encoding: VecEncoding,
    },
    /// Fixed-point decimal. `Value::data_type` reports `precision: 0`, and column checks
    /// compare only `scale`.
    Numeric {
        precision: u8,
        scale: u8,
    },
    Date,
    Timestamp,
    Timestamptz,
    Interval,
    Json,
    Jsonb,
}

impl fmt::Display for DataType {
    /// Writes the SQL spelling of the type, e.g. `VARCHAR(10)`, `NUMERIC(10, 2)`,
    /// `VECTOR(3) USING SQ8`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match *self {
            DataType::SmallInt => "SMALLINT",
            DataType::Int => "INT",
            DataType::BigInt => "BIGINT",
            DataType::Float => "FLOAT",
            DataType::Text => "TEXT",
            DataType::Bool => "BOOL",
            DataType::Date => "DATE",
            DataType::Timestamp => "TIMESTAMP",
            DataType::Timestamptz => "TIMESTAMPTZ",
            DataType::Interval => "INTERVAL",
            DataType::Json => "JSON",
            DataType::Jsonb => "JSONB",
            // Parameterised types write their arguments themselves.
            DataType::Varchar(n) => return write!(f, "VARCHAR({n})"),
            DataType::Char(n) => return write!(f, "CHAR({n})"),
            DataType::Numeric {
                precision,
                scale: 0,
            } => return write!(f, "NUMERIC({precision})"),
            DataType::Numeric { precision, scale } => {
                return write!(f, "NUMERIC({precision}, {scale})");
            }
            DataType::Vector { dim, encoding } => {
                write!(f, "VECTOR({dim})")?;
                // F32 is the default encoding and is never spelled out.
                if encoding != VecEncoding::F32 {
                    write!(f, " USING {encoding}")?;
                }
                return Ok(());
            }
        };
        f.write_str(keyword)
    }
}
172
173#[derive(Debug, Clone, PartialEq)]
177#[non_exhaustive]
178pub enum Value {
179 SmallInt(i16),
180 Int(i32),
181 BigInt(i64),
182 Float(f64),
183 Text(String),
184 Bool(bool),
185 Vector(Vec<f32>),
186 Sq8Vector(crate::quantize::Sq8Vector),
193 HalfVector(crate::halfvec::HalfVector),
199 Numeric {
203 scaled: i128,
204 scale: u8,
205 },
206 Date(i32),
208 Timestamp(i64),
210 Interval {
213 months: i32,
214 micros: i64,
215 },
216 Json(String),
220 Null,
221}
222
223impl Value {
224 pub fn data_type(&self) -> Option<DataType> {
226 match self {
227 Self::SmallInt(_) => Some(DataType::SmallInt),
228 Self::Int(_) => Some(DataType::Int),
229 Self::BigInt(_) => Some(DataType::BigInt),
230 Self::Float(_) => Some(DataType::Float),
231 Self::Text(_) => Some(DataType::Text),
234 Self::Bool(_) => Some(DataType::Bool),
235 Self::Vector(v) => Some(DataType::Vector {
236 dim: u32::try_from(v.len()).expect("vector dim ≤ u32"),
237 encoding: VecEncoding::F32,
238 }),
239 Self::Sq8Vector(q) => Some(DataType::Vector {
240 dim: u32::try_from(q.bytes.len()).expect("vector dim ≤ u32"),
241 encoding: VecEncoding::Sq8,
242 }),
243 Self::HalfVector(h) => Some(DataType::Vector {
244 dim: u32::try_from(h.dim()).expect("vector dim ≤ u32"),
245 encoding: VecEncoding::F16,
246 }),
247 Self::Numeric { scale, .. } => Some(DataType::Numeric {
252 precision: 0,
253 scale: *scale,
254 }),
255 Self::Date(_) => Some(DataType::Date),
256 Self::Timestamp(_) => Some(DataType::Timestamp),
257 Self::Interval { .. } => Some(DataType::Interval),
258 Self::Json(_) => Some(DataType::Json),
259 Self::Null => None,
260 }
261 }
262
263 pub const fn is_null(&self) -> bool {
264 matches!(self, Self::Null)
265 }
266}
267
268#[derive(Debug, Clone, PartialEq)]
271pub struct Row {
272 pub values: Vec<Value>,
273}
274
275impl Row {
276 pub const fn new(values: Vec<Value>) -> Self {
277 Self { values }
278 }
279
280 pub fn len(&self) -> usize {
281 self.values.len()
282 }
283
284 pub fn is_empty(&self) -> bool {
285 self.values.is_empty()
286 }
287}
288
/// Definition of one table column.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnSchema {
    /// Column name. `TableSchema::column_position` matches it exactly (case-sensitive).
    pub name: String,
    /// Declared type. `Table::insert` / `Table::update_row` require values compatible with it.
    pub ty: DataType,
    /// Whether `Value::Null` is accepted in this column.
    pub nullable: bool,
    /// Constant default value. NOTE(review): not applied by the storage code shown here;
    /// presumably filled in by the caller before insert — confirm.
    pub default: Option<Value>,
    /// Default kept as text. NOTE(review): presumably an expression evaluated at insert
    /// time — inferred from the name, confirm.
    pub runtime_default: Option<String>,
    /// Auto-increment flag. NOTE(review): presumably paired with `Table::next_auto_value`
    /// — confirm at call sites.
    pub auto_increment: bool,
}

/// Schema of one table.
#[derive(Debug, Clone, PartialEq)]
pub struct TableSchema {
    /// Table name.
    pub name: String,
    /// Columns in row order: `Row::values[i]` is validated against `columns[i]`.
    pub columns: Vec<ColumnSchema>,
    /// NOTE(review): presumably a per-table hot-tier size budget in bytes, comparable to
    /// `Table::hot_bytes` — confirm.
    pub hot_tier_bytes: Option<u64>,
    /// FOREIGN KEY constraints. NOTE(review): not enforced by the code shown here.
    pub foreign_keys: Vec<ForeignKeyConstraint>,
    /// PRIMARY KEY / UNIQUE constraints. NOTE(review): not enforced by the code shown here.
    pub uniqueness_constraints: Vec<UniquenessConstraint>,
}

/// A PRIMARY KEY or UNIQUE constraint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UniquenessConstraint {
    /// `true` for PRIMARY KEY, `false` for UNIQUE (inferred from the name).
    pub is_primary_key: bool,
    /// Constrained columns, presumably as positions into `TableSchema::columns` — confirm.
    pub columns: Vec<usize>,
}

/// A FOREIGN KEY constraint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForeignKeyConstraint {
    /// Constraint name, if one was given.
    pub name: Option<String>,
    /// Referencing columns in this table, presumably as column positions — confirm.
    pub local_columns: Vec<usize>,
    /// Name of the referenced (parent) table.
    pub parent_table: String,
    /// Referenced columns, presumably positions in the parent schema, paired element-wise
    /// with `local_columns` — confirm.
    pub parent_columns: Vec<usize>,
    /// `ON DELETE` action.
    pub on_delete: FkAction,
    /// `ON UPDATE` action.
    pub on_update: FkAction,
}
382
/// Referential action for a foreign key's `ON DELETE` / `ON UPDATE` clause.
///
/// Each action has a fixed one-byte tag: 0..=4 in variant order (see [`FkAction::tag`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FkAction {
    Restrict,
    Cascade,
    SetNull,
    SetDefault,
    NoAction,
}

impl FkAction {
    /// Every action, indexed by its tag. Must agree with [`FkAction::tag`].
    const BY_TAG: [Self; 5] = [
        Self::Restrict,
        Self::Cascade,
        Self::SetNull,
        Self::SetDefault,
        Self::NoAction,
    ];

    /// Stable one-byte tag for this action.
    pub const fn tag(self) -> u8 {
        match self {
            FkAction::Restrict => 0,
            FkAction::Cascade => 1,
            FkAction::SetNull => 2,
            FkAction::SetDefault => 3,
            FkAction::NoAction => 4,
        }
    }

    /// Inverse of [`FkAction::tag`]. Returns `None` for any byte above 4.
    pub const fn from_tag(b: u8) -> Option<Self> {
        let slot = b as usize;
        if slot < Self::BY_TAG.len() {
            Some(Self::BY_TAG[slot])
        } else {
            None
        }
    }
}
415
416impl TableSchema {
417 pub fn column_position(&self, name: &str) -> Option<usize> {
418 self.columns.iter().position(|c| c.name == name)
419 }
420}
421
422#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
427pub enum IndexKey {
428 Int(i64),
429 Text(String),
430 Bool(bool),
431}
432
433impl IndexKey {
434 pub fn from_value(v: &Value) -> Option<Self> {
435 match v {
436 Value::SmallInt(n) => Some(Self::Int(i64::from(*n))),
437 Value::Int(n) => Some(Self::Int(i64::from(*n))),
438 Value::BigInt(n) => Some(Self::Int(*n)),
439 Value::Text(s) => Some(Self::Text(s.clone())),
440 Value::Bool(b) => Some(Self::Bool(*b)),
441 Value::Date(d) => Some(Self::Int(i64::from(*d))),
444 Value::Timestamp(t) => Some(Self::Int(*t)),
445 Value::Null
450 | Value::Float(_)
451 | Value::Vector(_)
452 | Value::Sq8Vector(_)
453 | Value::HalfVector(_)
454 | Value::Numeric { .. }
455 | Value::Interval { .. }
456 | Value::Json(_) => None,
457 }
458 }
459}
460
/// A named secondary index on one column of a [`Table`].
#[derive(Debug, Clone)]
pub struct Index {
    /// Index name; the `Table` index constructors reject duplicates.
    pub name: String,
    /// Position of the indexed column in `TableSchema::columns`.
    pub column_position: usize,
    /// Index structure (BTree map, NSW graph or BRIN descriptor).
    pub kind: IndexKind,
    /// Extra column positions attached to the index. NOTE(review): presumably the
    /// `INCLUDE (...)` columns of a covering index; nothing shown here reads them — confirm.
    pub included_columns: Vec<usize>,
    /// Predicate text of a partial index. Only stored: `Table::insert` indexes every row
    /// regardless, so filtering presumably happens in the query layer — confirm.
    pub partial_predicate: Option<String>,
    /// Source text of an expression index. Only stored: keys are still derived from the
    /// raw cell at `column_position` — TODO confirm how the query layer uses it.
    pub expression: Option<String>,
}

/// Default NSW neighbour parameter `m`. NOTE(review): presumably used when an index
/// definition gives no explicit value — confirm at call sites.
pub const NSW_DEFAULT_M: usize = 16;
497
/// Outcome of freezing hot rows into a segment.
///
/// NOTE(review): not constructed in the code shown; the field notes below are inferred
/// from names — confirm against the freeze path.
#[derive(Debug, Clone)]
pub struct FreezeReport {
    /// Id assigned to the new segment.
    pub segment_id: u32,
    /// Number of rows moved out of the hot tier.
    pub frozen_rows: usize,
    /// Hot-tier bytes released (presumably in `Table::hot_bytes` units).
    pub bytes_freed: u64,
    /// Encoded segment image.
    pub segment_bytes: Vec<u8>,
}

/// A contiguous slice of hot rows prepared for freezing.
///
/// NOTE(review): field meanings inferred from names — confirm.
#[derive(Debug, Clone)]
pub struct FreezeSlice {
    /// Hot-row positions covered by this slice.
    pub row_range: core::ops::Range<usize>,
    /// Per-row payload; presumably `(row id, encoded row body, index key)` — confirm.
    pub rows: Vec<(u64, Vec<u8>, IndexKey)>,
}

/// Outcome of merging segments.
///
/// NOTE(review): field meanings inferred from names — confirm against the compaction path.
#[derive(Debug, Clone)]
pub struct CompactReport {
    /// Ids of the segments that were merged.
    pub sources: Vec<u32>,
    /// Id of the merged segment; presumably `None` when nothing survived the merge.
    pub merged_segment_id: Option<u32>,
    /// Encoded image of the merged segment.
    pub merged_segment_bytes: Vec<u8>,
    /// Rows written to the merged segment.
    pub merged_rows: usize,
    /// Deleted rows dropped during the merge.
    pub deleted_rows_pruned: usize,
    /// Estimated number of bytes reclaimed.
    pub bytes_reclaimed_estimate: u64,
}
582
/// Storage structure behind an [`Index`].
#[derive(Debug, Clone)]
pub enum IndexKind {
    /// Ordered map from key to every row carrying it. Holds `RowLocator::Hot(pos)` for
    /// rows in `Table::rows`, plus cold locators added via `Table::register_cold_locators`.
    BTree(PersistentBTreeMap<IndexKey, Vec<RowLocator>>),
    /// Layered proximity graph over a vector column.
    Nsw(NswGraph),
    /// BRIN index. Only the column type is kept here, and `Index::lookup_eq` returns
    /// nothing for it. NOTE(review): the block summaries presumably live with segments
    /// (`BrinSummary` / `derive_brin_summaries` come from `segment`) — confirm.
    Brin {
        /// Type of the indexed column when the index was created.
        column_type: DataType,
    },
}
617
618#[derive(Debug, Clone)]
627pub struct NswGraph {
628 pub m: usize,
630 pub m_max_0: usize,
633 pub entry: Option<usize>,
636 pub entry_level: u8,
638 pub levels: PersistentVec<u8>,
645 pub layers: Vec<PersistentVec<Vec<u32>>>,
661}
662
663impl NswGraph {
664 fn new(m: usize) -> Self {
665 Self {
666 m,
667 m_max_0: m.saturating_mul(2),
668 entry: None,
669 entry_level: 0,
670 levels: PersistentVec::new(),
671 layers: alloc::vec![PersistentVec::new()],
672 }
673 }
674
675 pub const fn cap_for_layer(&self, layer: u8) -> usize {
677 if layer == 0 { self.m_max_0 } else { self.m }
678 }
679}
680
/// Deterministically picks the top NSW layer for `row_idx`.
///
/// The row index is mixed with SplitMix64's multiply/xor-shift constants. The level is the
/// number of trailing zero nibbles (4-bit groups) in the hash, capped at 7, so for a
/// well-mixed hash each extra level is about 16× rarer than the one below it. An all-zero
/// hash (e.g. `row_idx == 0`) yields 7.
pub fn nsw_assign_level(row_idx: usize) -> u8 {
    const MAX_LEVEL: u32 = 7;
    let mut h = (row_idx as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
    for (shift, mult) in [(30, 0xBF58_476D_1CE4_E5B9), (27, 0x94D0_49BB_1331_11EB)] {
        h ^= h >> shift;
        h = h.wrapping_mul(mult);
    }
    h ^= h >> 31;
    let zero_nibbles = h.trailing_zeros() / 4;
    // Capped at 7, so the narrowing cast cannot truncate.
    zero_nibbles.min(MAX_LEVEL) as u8
}
707
708impl Index {
709 fn new_btree(name: String, column_position: usize) -> Self {
710 Self {
711 name,
712 column_position,
713 kind: IndexKind::BTree(PersistentBTreeMap::new()),
714 included_columns: Vec::new(),
715 partial_predicate: None,
716 expression: None,
717 }
718 }
719
720 fn new_nsw(name: String, column_position: usize, m: usize) -> Self {
721 Self {
722 name,
723 column_position,
724 kind: IndexKind::Nsw(NswGraph::new(m)),
725 included_columns: Vec::new(),
726 partial_predicate: None,
727 expression: None,
728 }
729 }
730
731 fn new_brin(name: String, column_position: usize, column_type: DataType) -> Self {
735 Self {
736 name,
737 column_position,
738 kind: IndexKind::Brin { column_type },
739 included_columns: Vec::new(),
740 partial_predicate: None,
741 expression: None,
742 }
743 }
744
745 pub fn lookup_eq(&self, key: &IndexKey) -> &[RowLocator] {
754 match &self.kind {
755 IndexKind::BTree(m) => m.get(key).map_or(&[][..], Vec::as_slice),
756 IndexKind::Nsw(_) | IndexKind::Brin { .. } => &[][..],
758 }
759 }
760
761 pub const fn nsw(&self) -> Option<&NswGraph> {
764 match &self.kind {
765 IndexKind::Nsw(g) => Some(g),
766 IndexKind::BTree(_) | IndexKind::Brin { .. } => None,
767 }
768 }
769
770 pub const fn is_brin(&self) -> bool {
775 matches!(self.kind, IndexKind::Brin { .. })
776 }
777}
778
/// An in-memory table: schema, hot rows and indexes, plus bookkeeping for rows held
/// outside `rows` ("cold" rows; NOTE(review): presumably frozen segments — confirm).
#[derive(Debug, Clone)]
pub struct Table {
    /// Column definitions that rows are validated against.
    schema: TableSchema,
    /// Hot rows. A row's position here is what `RowLocator::Hot` refers to, which is why
    /// deleting rows (which shifts later rows down) triggers an index rebuild.
    rows: PersistentVec<Row>,
    /// Secondary indexes; names are unique within the table.
    indices: Vec<Index>,
    /// Running sum of `row_body_encoded_len` over `rows`.
    hot_bytes: u64,
    /// Cold-row count as last recorded by `set_cold_row_count`.
    cold_row_count: u64,
    /// Set by `mark_cold_row_count_stale`, cleared by `set_cold_row_count`.
    cold_row_count_stale: bool,
}
820
821impl Table {
822 pub fn new(schema: TableSchema) -> Self {
823 Self {
824 schema,
825 rows: PersistentVec::new(),
826 indices: Vec::new(),
827 hot_bytes: 0,
828 cold_row_count: 0,
829 cold_row_count_stale: false,
830 }
831 }
832
833 #[must_use]
837 pub const fn hot_bytes(&self) -> u64 {
838 self.hot_bytes
839 }
840
841 #[must_use]
844 pub const fn cold_row_count(&self) -> u64 {
845 self.cold_row_count
846 }
847
848 pub fn set_cold_row_count(&mut self, n: u64) {
851 self.cold_row_count = n;
852 self.cold_row_count_stale = false;
853 }
854
855 pub fn mark_cold_row_count_stale(&mut self) {
860 self.cold_row_count_stale = true;
861 }
862
863 #[must_use]
867 pub const fn cold_row_count_stale(&self) -> bool {
868 self.cold_row_count_stale
869 }
870
871 #[must_use]
882 pub fn count_cold_locators(&self) -> u64 {
883 let mut best: u64 = 0;
884 for idx in &self.indices {
885 if let IndexKind::BTree(map) = &idx.kind {
886 let n: u64 = map
887 .iter()
888 .map(|(_, locs)| locs.iter().filter(|l| l.is_cold()).count() as u64)
889 .sum();
890 if n > best {
891 best = n;
892 }
893 }
894 }
895 best
896 }
897
898 pub const fn schema(&self) -> &TableSchema {
899 &self.schema
900 }
901
902 pub const fn schema_mut(&mut self) -> &mut TableSchema {
906 &mut self.schema
907 }
908
909 pub const fn rows(&self) -> &PersistentVec<Row> {
913 &self.rows
914 }
915
916 pub const fn row_count(&self) -> usize {
917 self.rows.len()
918 }
919
920 pub fn indices_mut(&mut self) -> &mut [Index] {
925 &mut self.indices
926 }
927
928 pub fn indices(&self) -> &[Index] {
929 &self.indices
930 }
931
932 pub fn next_auto_value(&self, col_pos: usize) -> Option<i64> {
938 let ty = self.schema.columns.get(col_pos)?.ty;
939 if !matches!(ty, DataType::SmallInt | DataType::Int | DataType::BigInt) {
940 return None;
941 }
942 let mut max: Option<i64> = None;
943 for row in &self.rows {
944 match row.values.get(col_pos) {
945 Some(Value::SmallInt(n)) => {
946 let v = i64::from(*n);
947 max = Some(max.map_or(v, |m| m.max(v)));
948 }
949 Some(Value::Int(n)) => {
950 let v = i64::from(*n);
951 max = Some(max.map_or(v, |m| m.max(v)));
952 }
953 Some(Value::BigInt(n)) => {
954 max = Some(max.map_or(*n, |m| m.max(*n)));
955 }
956 _ => {}
957 }
958 }
959 Some(max.map_or(1, |m| m + 1))
960 }
961
962 pub fn index_on(&self, column_position: usize) -> Option<&Index> {
966 self.indices
973 .iter()
974 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::BTree(_)))
975 .or_else(|| {
976 self.indices
977 .iter()
978 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_)))
979 })
980 }
981
982 pub fn insert(&mut self, row: Row) -> Result<(), StorageError> {
986 if row.len() != self.schema.columns.len() {
987 return Err(StorageError::ArityMismatch {
988 expected: self.schema.columns.len(),
989 actual: row.len(),
990 });
991 }
992 for (i, (val, col)) in row.values.iter().zip(&self.schema.columns).enumerate() {
993 if val.is_null() {
994 if !col.nullable {
995 return Err(StorageError::NullInNotNull {
996 column: col.name.clone(),
997 });
998 }
999 continue;
1000 }
1001 let actual = val.data_type().expect("non-null");
1002 let compatible = actual == col.ty
1016 || matches!(
1017 (actual, col.ty),
1018 (
1019 DataType::Text,
1020 DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
1021 ) | (DataType::Json | DataType::Jsonb, DataType::Text)
1022 | (DataType::Json, DataType::Jsonb) | (DataType::Jsonb, DataType::Json)
1023 | (DataType::Timestamp, DataType::Timestamptz)
1024 | (DataType::Timestamptz, DataType::Timestamp)
1025 )
1026 || matches!(
1027 (actual, col.ty),
1028 (
1029 DataType::Numeric { scale: a, .. },
1030 DataType::Numeric { scale: b, .. },
1031 ) if a == b
1032 );
1033 if !compatible {
1034 return Err(StorageError::TypeMismatch {
1035 column: col.name.clone(),
1036 expected: col.ty,
1037 actual,
1038 position: i,
1039 });
1040 }
1041 }
1042 let new_row_idx = self.rows.len();
1043 for idx in &mut self.indices {
1047 if let IndexKind::BTree(map) = &mut idx.kind
1048 && let Some(key) = IndexKey::from_value(&row.values[idx.column_position])
1049 {
1050 let mut entries = map.get(&key).cloned().unwrap_or_default();
1056 entries.push(RowLocator::Hot(new_row_idx));
1057 map.insert_mut(key, entries);
1058 }
1059 }
1060 self.hot_bytes = self
1063 .hot_bytes
1064 .saturating_add(row_body_encoded_len(&row, &self.schema) as u64);
1065 self.rows.push_mut(row);
1070 let new_row_idx = self.rows.len() - 1;
1073 let nsw_targets: Vec<usize> = self
1074 .indices
1075 .iter()
1076 .enumerate()
1077 .filter_map(|(i, idx)| {
1078 if matches!(idx.kind, IndexKind::Nsw(_)) {
1079 Some(i)
1080 } else {
1081 None
1082 }
1083 })
1084 .collect();
1085 for idx_pos in nsw_targets {
1086 nsw_insert_at(self, idx_pos, new_row_idx);
1087 }
1088 Ok(())
1089 }
1090
1091 pub fn add_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1095 if self.indices.iter().any(|i| i.name == name) {
1096 return Err(StorageError::DuplicateIndex { name });
1097 }
1098 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1099 StorageError::ColumnNotFound {
1100 column: column_name.into(),
1101 }
1102 })?;
1103 let mut idx = Index::new_btree(name, column_position);
1104 if let IndexKind::BTree(map) = &mut idx.kind {
1105 for (i, row) in self.rows.iter().enumerate() {
1106 if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
1107 let mut entries = map.get(&key).cloned().unwrap_or_default();
1108 entries.push(RowLocator::Hot(i));
1109 map.insert_mut(key, entries);
1110 }
1111 }
1112 }
1113 self.indices.push(idx);
1114 Ok(())
1115 }
1116
1117 pub fn add_nsw_index(
1122 &mut self,
1123 name: String,
1124 column_name: &str,
1125 m: usize,
1126 ) -> Result<(), StorageError> {
1127 self.add_nsw_index_inner(name, column_name, m, None)
1128 }
1129
1130 pub fn rebuild_nsw_index(
1142 &mut self,
1143 name: &str,
1144 new_encoding: Option<VecEncoding>,
1145 ) -> Result<(), StorageError> {
1146 let idx_pos = self
1147 .indices
1148 .iter()
1149 .position(|i| i.name == name)
1150 .ok_or_else(|| StorageError::IndexNotFound {
1151 name: String::from(name),
1152 })?;
1153 let col_pos = self.indices[idx_pos].column_position;
1154 let m = match &self.indices[idx_pos].kind {
1155 IndexKind::Nsw(g) => g.m,
1156 IndexKind::BTree(_) | IndexKind::Brin { .. } => {
1157 return Err(StorageError::Unsupported(format!(
1158 "ALTER INDEX REBUILD on non-NSW index {name:?} — only NSW indexes can rebuild"
1159 )));
1160 }
1161 };
1162 let col_name = self.schema.columns[col_pos].name.clone();
1163 if let Some(target) = new_encoding {
1166 let current = match self.schema.columns[col_pos].ty {
1167 DataType::Vector { encoding, .. } => encoding,
1168 ref other => {
1169 return Err(StorageError::Unsupported(format!(
1170 "ALTER INDEX REBUILD WITH (encoding=…) on non-vector column type {other:?}"
1171 )));
1172 }
1173 };
1174 if target != current {
1175 let DataType::Vector { dim, .. } = self.schema.columns[col_pos].ty else {
1176 unreachable!("checked above")
1177 };
1178 let n = self.rows.len();
1179 for i in 0..n {
1180 let row = self
1181 .rows
1182 .get_mut(i)
1183 .expect("row index in bounds (we iterated up to len())");
1184 let cell = core::mem::replace(&mut row.values[col_pos], Value::Null);
1185 let recoded = recode_vector_cell(cell, target)?;
1186 row.values[col_pos] = recoded;
1187 }
1188 self.schema.columns[col_pos].ty = DataType::Vector {
1189 dim,
1190 encoding: target,
1191 };
1192 }
1193 }
1194 self.indices.remove(idx_pos);
1196 self.add_nsw_index_inner(String::from(name), &col_name, m, None)?;
1197 Ok(())
1198 }
1199
1200 pub fn restore_nsw_index(
1205 &mut self,
1206 name: String,
1207 column_name: &str,
1208 graph: NswGraph,
1209 ) -> Result<(), StorageError> {
1210 self.add_nsw_index_inner(name, column_name, graph.m, Some(graph))
1211 }
1212
1213 pub fn restore_btree_index(
1220 &mut self,
1221 name: String,
1222 column_name: &str,
1223 map: PersistentBTreeMap<IndexKey, Vec<RowLocator>>,
1224 ) -> Result<(), StorageError> {
1225 if self.indices.iter().any(|i| i.name == name) {
1226 return Err(StorageError::DuplicateIndex { name });
1227 }
1228 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1229 StorageError::ColumnNotFound {
1230 column: column_name.into(),
1231 }
1232 })?;
1233 self.indices.push(Index {
1234 name,
1235 column_position,
1236 kind: IndexKind::BTree(map),
1237 included_columns: Vec::new(),
1238 partial_predicate: None,
1239 expression: None,
1240 });
1241 Ok(())
1242 }
1243
1244 pub fn restore_brin_index(
1249 &mut self,
1250 name: String,
1251 column_name: &str,
1252 column_type: DataType,
1253 ) -> Result<(), StorageError> {
1254 if self.indices.iter().any(|i| i.name == name) {
1255 return Err(StorageError::DuplicateIndex { name });
1256 }
1257 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1258 StorageError::ColumnNotFound {
1259 column: column_name.into(),
1260 }
1261 })?;
1262 self.indices.push(Index::new_brin(name, column_position, column_type));
1263 Ok(())
1264 }
1265
1266 pub fn add_brin_index(
1270 &mut self,
1271 name: String,
1272 column_name: &str,
1273 ) -> Result<(), StorageError> {
1274 if self.indices.iter().any(|i| i.name == name) {
1275 return Err(StorageError::DuplicateIndex { name });
1276 }
1277 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1278 StorageError::ColumnNotFound {
1279 column: column_name.into(),
1280 }
1281 })?;
1282 let column_type = self.schema.columns[column_position].ty;
1283 self.indices.push(Index::new_brin(name, column_position, column_type));
1284 Ok(())
1285 }
1286
1287 pub fn register_cold_locators<I>(
1304 &mut self,
1305 index_name: &str,
1306 locators: I,
1307 ) -> Result<usize, StorageError>
1308 where
1309 I: IntoIterator<Item = (IndexKey, RowLocator)>,
1310 {
1311 let idx = self
1312 .indices
1313 .iter_mut()
1314 .find(|i| i.name == index_name)
1315 .ok_or_else(|| StorageError::Corrupt(format!("index {index_name:?} not found")))?;
1316 let map = match &mut idx.kind {
1317 IndexKind::BTree(map) => map,
1318 IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
1319 return Err(StorageError::Corrupt(format!(
1320 "index {index_name:?} is not BTree; cold locators apply only to BTree indices"
1321 )));
1322 }
1323 };
1324 let mut count = 0usize;
1325 for (key, locator) in locators {
1326 let mut entries = map.get(&key).cloned().unwrap_or_default();
1327 entries.push(locator);
1328 map.insert_mut(key, entries);
1329 count += 1;
1330 }
1331 Ok(count)
1332 }
1333
1334 pub fn remove_cold_locators_for_key(
1344 &mut self,
1345 index_name: &str,
1346 key: &IndexKey,
1347 ) -> Result<usize, StorageError> {
1348 let idx = self
1349 .indices
1350 .iter_mut()
1351 .find(|i| i.name == index_name)
1352 .ok_or_else(|| {
1353 StorageError::Corrupt(format!(
1354 "remove_cold_locators_for_key: index {index_name:?} not found"
1355 ))
1356 })?;
1357 let map = match &mut idx.kind {
1358 IndexKind::BTree(map) => map,
1359 IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
1360 return Err(StorageError::Corrupt(format!(
1361 "remove_cold_locators_for_key: index {index_name:?} is not BTree; \
1362 cold locators apply only to BTree indices"
1363 )));
1364 }
1365 };
1366 let Some(entries) = map.get(key) else {
1367 return Ok(0);
1368 };
1369 let mut kept: Vec<RowLocator> =
1370 entries.iter().copied().filter(RowLocator::is_hot).collect();
1371 let removed = entries.len() - kept.len();
1372 if removed == 0 {
1373 return Ok(0);
1374 }
1375 kept.shrink_to_fit();
1376 map.insert_mut(key.clone(), kept);
1384 Ok(removed)
1385 }
1386
1387 pub fn delete_rows(&mut self, positions: &[usize]) -> usize {
1393 if positions.is_empty() {
1394 return 0;
1395 }
1396 let mut to_remove = alloc::vec![false; self.rows.len()];
1400 let mut removed = 0;
1401 for &p in positions {
1402 if p < to_remove.len() && !to_remove[p] {
1403 to_remove[p] = true;
1404 removed += 1;
1405 }
1406 }
1407 let mut new_rows: PersistentVec<Row> = PersistentVec::new();
1408 let mut removed_bytes: u64 = 0;
1409 for (i, row) in self.rows.iter().enumerate() {
1410 if to_remove[i] {
1411 removed_bytes =
1412 removed_bytes.saturating_add(row_body_encoded_len(row, &self.schema) as u64);
1413 } else {
1414 new_rows.push_mut(row.clone());
1415 }
1416 }
1417 self.rows = new_rows;
1418 self.hot_bytes = self.hot_bytes.saturating_sub(removed_bytes);
1419 self.rebuild_indices();
1420 removed
1421 }
1422
1423 pub fn update_row(
1429 &mut self,
1430 position: usize,
1431 new_values: Vec<Value>,
1432 ) -> Result<(), StorageError> {
1433 if position >= self.rows.len() {
1434 return Err(StorageError::Corrupt(alloc::format!(
1435 "update_row: position {position} out of bounds (rows={})",
1436 self.rows.len()
1437 )));
1438 }
1439 if new_values.len() != self.schema.columns.len() {
1440 return Err(StorageError::ArityMismatch {
1441 expected: self.schema.columns.len(),
1442 actual: new_values.len(),
1443 });
1444 }
1445 for (i, (val, col)) in new_values.iter().zip(&self.schema.columns).enumerate() {
1449 if val.is_null() {
1450 if !col.nullable {
1451 return Err(StorageError::NullInNotNull {
1452 column: col.name.clone(),
1453 });
1454 }
1455 continue;
1456 }
1457 let actual = val.data_type().expect("non-null");
1458 let compatible = actual == col.ty
1459 || matches!(
1460 (actual, col.ty),
1461 (
1462 DataType::Text,
1463 DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
1464 ) | (DataType::Json | DataType::Jsonb, DataType::Text)
1465 | (DataType::Json, DataType::Jsonb) | (DataType::Jsonb, DataType::Json)
1466 | (DataType::Timestamp, DataType::Timestamptz)
1467 | (DataType::Timestamptz, DataType::Timestamp)
1468 )
1469 || matches!(
1470 (actual, col.ty),
1471 (
1472 DataType::Numeric { scale: a, .. },
1473 DataType::Numeric { scale: b, .. },
1474 ) if a == b
1475 );
1476 if !compatible {
1477 return Err(StorageError::TypeMismatch {
1478 column: col.name.clone(),
1479 expected: col.ty,
1480 actual,
1481 position: i,
1482 });
1483 }
1484 }
1485 let old_row = self
1486 .rows
1487 .get(position)
1488 .expect("position bounds-checked above");
1489 let old_bytes = row_body_encoded_len(old_row, &self.schema) as u64;
1490 let new_row = Row::new(new_values);
1491 let new_bytes = row_body_encoded_len(&new_row, &self.schema) as u64;
1492 self.rows = self
1493 .rows
1494 .set(position, new_row)
1495 .expect("position bounds-checked above");
1496 self.hot_bytes = self
1497 .hot_bytes
1498 .saturating_sub(old_bytes)
1499 .saturating_add(new_bytes);
1500 self.rebuild_indices();
1501 Ok(())
1502 }
1503
1504 fn rebuild_indices(&mut self) {
1511 let preserved_cold: Vec<(String, Vec<(IndexKey, RowLocator)>)> = self
1520 .indices
1521 .iter()
1522 .filter_map(|idx| match &idx.kind {
1523 IndexKind::BTree(map) => {
1524 let cold: Vec<(IndexKey, RowLocator)> = map
1525 .iter()
1526 .flat_map(|(k, locs)| {
1527 locs.iter()
1528 .filter(|l| l.is_cold())
1529 .copied()
1530 .map(move |l| (k.clone(), l))
1531 })
1532 .collect();
1533 if cold.is_empty() {
1534 None
1535 } else {
1536 Some((idx.name.clone(), cold))
1537 }
1538 }
1539 IndexKind::Nsw(_) | IndexKind::Brin { .. } => None,
1541 })
1542 .collect();
1543
1544 #[derive(Clone)]
1549 enum RebuildKind {
1550 BTree,
1551 Nsw(usize),
1552 Brin(DataType),
1553 }
1554 let descriptors: Vec<(String, usize, RebuildKind)> = self
1555 .indices
1556 .iter()
1557 .map(|idx| {
1558 let kind = match &idx.kind {
1559 IndexKind::Nsw(g) => RebuildKind::Nsw(g.m),
1560 IndexKind::Brin { column_type } => RebuildKind::Brin(*column_type),
1561 IndexKind::BTree(_) => RebuildKind::BTree,
1562 };
1563 (idx.name.clone(), idx.column_position, kind)
1564 })
1565 .collect();
1566 self.indices.clear();
1567 for (name, column_position, rebuild_kind) in descriptors {
1568 match rebuild_kind {
1569 RebuildKind::Nsw(m) => {
1570 let idx = Index::new_nsw(name, column_position, m);
1571 self.indices.push(idx);
1572 let idx_pos = self.indices.len() - 1;
1573 let row_indices: Vec<usize> = (0..self.rows.len()).collect();
1574 for row_idx in row_indices {
1575 nsw_insert_at(self, idx_pos, row_idx);
1576 }
1577 }
1578 RebuildKind::Brin(column_type) => {
1579 self.indices.push(Index::new_brin(name, column_position, column_type));
1582 }
1583 RebuildKind::BTree => {
1584 let mut idx = Index::new_btree(name, column_position);
1585 if let IndexKind::BTree(map) = &mut idx.kind {
1586 for (i, row) in self.rows.iter().enumerate() {
1587 if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
1588 let mut entries = map.get(&key).cloned().unwrap_or_default();
1589 entries.push(RowLocator::Hot(i));
1590 map.insert_mut(key, entries);
1591 }
1592 }
1593 }
1594 self.indices.push(idx);
1595 }
1596 }
1597 }
1598
1599 for (idx_name, locators) in preserved_cold {
1604 let _ = self.register_cold_locators(&idx_name, locators);
1608 }
1609 }
1610
1611 fn add_nsw_index_inner(
1612 &mut self,
1613 name: String,
1614 column_name: &str,
1615 m: usize,
1616 restore: Option<NswGraph>,
1617 ) -> Result<(), StorageError> {
1618 if self.indices.iter().any(|i| i.name == name) {
1619 return Err(StorageError::DuplicateIndex { name });
1620 }
1621 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1622 StorageError::ColumnNotFound {
1623 column: column_name.into(),
1624 }
1625 })?;
1626 if !matches!(
1627 self.schema.columns[column_position].ty,
1628 DataType::Vector { .. }
1629 ) {
1630 return Err(StorageError::TypeMismatch {
1631 column: column_name.into(),
1632 expected: DataType::Vector {
1633 dim: 0,
1634 encoding: VecEncoding::F32,
1635 },
1636 actual: self.schema.columns[column_position].ty,
1637 position: column_position,
1638 });
1639 }
1640 if let Some(graph) = restore {
1641 self.indices.push(Index {
1642 name,
1643 column_position,
1644 kind: IndexKind::Nsw(graph),
1645 included_columns: Vec::new(),
1646 partial_predicate: None,
1647 expression: None,
1648 });
1649 return Ok(());
1650 }
1651 let idx = Index::new_nsw(name, column_position, m);
1652 self.indices.push(idx);
1653 let idx_pos = self.indices.len() - 1;
1654 let row_indices: Vec<usize> = (0..self.rows.len()).collect();
1657 for row_idx in row_indices {
1658 nsw_insert_at(self, idx_pos, row_idx);
1659 }
1660 Ok(())
1661 }
1662}
1663
1664fn recode_vector_cell(cell: Value, target: VecEncoding) -> Result<Value, StorageError> {
1671 if matches!(cell, Value::Null) {
1672 return Ok(cell);
1673 }
1674 let as_f32: Vec<f32> = match &cell {
1676 Value::Vector(v) => v.clone(),
1677 Value::Sq8Vector(q) => quantize::dequantize(q),
1678 Value::HalfVector(h) => h.to_f32_vec(),
1679 other => {
1680 return Err(StorageError::Unsupported(format!(
1681 "ALTER INDEX REBUILD: cannot recode non-vector cell {:?}",
1682 other.data_type()
1683 )));
1684 }
1685 };
1686 Ok(match target {
1691 VecEncoding::F32 => Value::Vector(as_f32),
1692 VecEncoding::Sq8 => Value::Sq8Vector(quantize::quantize(&as_f32)),
1693 VecEncoding::F16 => Value::HalfVector(halfvec::HalfVector::from_f32_slice(&as_f32)),
1694 })
1695}
1696
/// Links row `new_row_idx` into the layered (HNSW-style) graph held by the NSW
/// index `table.indices[idx_pos]`.
///
/// * Rows whose indexed cell is not a vector, or is a zero-dimension vector,
///   only receive an empty level-0 node slot and are never linked.
/// * Otherwise the row is assigned a level by `nsw_assign_level`. If the graph
///   has no entry point yet, the row simply becomes the entry point.
/// * Otherwise the row is routed greedily (L2) from the entry point through the
///   layers above its own level. It is then linked on each layer from
///   `min(level, entry_level)` down to 0, using a beam search
///   (`ef = max(2 * m, 8)`) followed by `select_neighbours_heuristic`
///   (at most `2 * m` peers on layer 0, `m` above).
/// * A row whose level exceeds the current entry level becomes the new entry
///   point.
///
/// # Panics
/// Panics if `idx_pos` is not an NSW index, or if `new_row_idx` / the index
/// column is out of bounds for `table.rows`.
fn nsw_insert_at(table: &mut Table, idx_pos: usize, new_row_idx: usize) {
    let col_pos = table.indices[idx_pos].column_position;
    // Dimension of the new cell, or `None` when the cell holds no vector value.
    let cell_dim: Option<usize> = match &table.rows[new_row_idx].values[col_pos] {
        Value::Vector(v) => Some(v.len()),
        Value::Sq8Vector(q) => Some(q.bytes.len()),
        Value::HalfVector(h) => Some(h.dim()),
        _ => None,
    };
    // Non-vector cell: only make sure per-row graph slots exist up to this row.
    let Some(dim) = cell_dim else {
        ensure_node_slot(table, idx_pos, new_row_idx, 0);
        return;
    };
    // Zero-dimension vectors are likewise left unlinked.
    if dim == 0 {
        ensure_node_slot(table, idx_pos, new_row_idx, 0);
        return;
    }
    let level = nsw_assign_level(new_row_idx);
    ensure_node_slot(table, idx_pos, new_row_idx, level);
    let (entry, entry_level, m) = match &table.indices[idx_pos].kind {
        IndexKind::Nsw(g) => (g.entry, g.entry_level, g.m),
        IndexKind::BTree(_) | IndexKind::Brin { .. } => {
            unreachable!("nsw_insert_at on a non-NSW index")
        }
    };
    // Empty graph: the new row becomes the entry point; there is nothing to link to.
    if entry.is_none() {
        if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
            g.entry = Some(new_row_idx);
            g.entry_level = level;
            *g.levels
                .get_mut(new_row_idx)
                .expect("levels slot padded by ensure_node_slot") = level;
        }
        return;
    }
    // Record the new node's level before linking it.
    if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
        *g.levels
            .get_mut(new_row_idx)
            .expect("levels slot padded by ensure_node_slot") = level;
    }
    // Full-precision query: quantised / half-precision cells are expanded to f32.
    // The `_` arm cannot be reached because `cell_dim` was `Some` above.
    let query = match &table.rows[new_row_idx].values[col_pos] {
        Value::Vector(v) => v.clone(),
        Value::Sq8Vector(q) => quantize::dequantize(q),
        Value::HalfVector(h) => h.to_f32_vec(),
        _ => return,
    };
    let mut current = entry.expect("entry was Some above");
    let mut current_d = vec_l2_sq(table, col_pos, current, &query);
    // Phase 1: greedy descent through layers that exist above the new node's level.
    if entry_level > level {
        for layer in (level + 1..=entry_level).rev() {
            (current, current_d) =
                greedy_layer_walk(table, idx_pos, layer, current, current_d, &query);
        }
    }
    // Phase 2: link on every layer the new node shares with the existing graph.
    let top = level.min(entry_level);
    let ef = (m * 2).max(8);
    for layer in (0..=top).rev() {
        let cap = if layer == 0 { m * 2 } else { m };
        let mut candidates = layer_beam_search(
            table,
            idx_pos,
            layer,
            current,
            current_d,
            &query,
            ef,
            NswMetric::L2,
        );
        // Defensive: never link the new node to itself.
        candidates.retain(|&(_, n)| n != new_row_idx);
        // The closest hit on this layer seeds the search on the next layer down.
        if let Some(&(d, n)) = candidates.first() {
            current = n;
            current_d = d;
        }
        let peers = select_neighbours_heuristic(&candidates, cap, table, col_pos);
        connect_at_layer(table, idx_pos, layer, new_row_idx, &peers);
    }
    // A node taller than the current entry point becomes the new entry point.
    if level > entry_level
        && let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
    {
        g.entry = Some(new_row_idx);
        g.entry_level = level;
    }
}
1805
1806fn ensure_node_slot(table: &mut Table, idx_pos: usize, new_row_idx: usize, level: u8) {
1810 let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind else {
1811 unreachable!("ensure_node_slot on a BTree index");
1812 };
1813 while g.layers.len() <= level as usize {
1814 g.layers.push(PersistentVec::new());
1815 }
1816 while g.levels.len() <= new_row_idx {
1817 g.levels.push_mut(0);
1818 }
1819 for layer_vec in &mut g.layers {
1820 while layer_vec.len() <= new_row_idx {
1821 layer_vec.push_mut(Vec::new());
1822 }
1823 }
1824}
1825
1826fn greedy_layer_walk(
1832 table: &Table,
1833 idx_pos: usize,
1834 layer: u8,
1835 mut current: usize,
1836 mut current_d: f32,
1837 query: &[f32],
1838) -> (usize, f32) {
1839 let g = match &table.indices[idx_pos].kind {
1840 IndexKind::Nsw(g) => g,
1841 IndexKind::BTree(_) | IndexKind::Brin { .. } => return (current, current_d),
1842 };
1843 let col_pos = table.indices[idx_pos].column_position;
1844 loop {
1845 let neighbours: &[u32] = g
1846 .layers
1847 .get(layer as usize)
1848 .and_then(|layer_v| layer_v.get(current))
1849 .map_or(&[][..], Vec::as_slice);
1850 let mut best = current;
1851 let mut best_d = current_d;
1852 for &n in neighbours {
1853 let n = n as usize;
1854 let d = vec_l2_sq(table, col_pos, n, query);
1855 if d < best_d {
1856 best = n;
1857 best_d = d;
1858 }
1859 }
1860 if best == current {
1861 return (current, current_d);
1862 }
1863 current = best;
1864 current_d = best_d;
1865 }
1866}
1867
1868#[allow(clippy::too_many_arguments)] fn layer_beam_search(
1881 table: &Table,
1882 idx_pos: usize,
1883 layer: u8,
1884 entry_node: usize,
1885 entry_d: f32,
1886 query: &[f32],
1887 ef: usize,
1888 metric: NswMetric,
1889) -> Vec<(f32, usize)> {
1890 let g = match &table.indices[idx_pos].kind {
1891 IndexKind::Nsw(g) => g,
1892 IndexKind::BTree(_) | IndexKind::Brin { .. } => return Vec::new(),
1893 };
1894 let col_pos = table.indices[idx_pos].column_position;
1895 let d0 = if matches!(metric, NswMetric::L2) {
1896 entry_d
1897 } else {
1898 cell_to_query_metric_distance(table, col_pos, entry_node, query, metric)
1899 };
1900 let row_count = table.rows.len();
1901 let mut visited: Vec<bool> = alloc::vec![false; row_count];
1902 if entry_node < row_count {
1903 visited[entry_node] = true;
1904 }
1905 let mut candidates: alloc::collections::BinaryHeap<NodeClosest> =
1908 alloc::collections::BinaryHeap::with_capacity(ef);
1909 let mut results: alloc::collections::BinaryHeap<NodeFurthest> =
1910 alloc::collections::BinaryHeap::with_capacity(ef);
1911 candidates.push(NodeClosest {
1912 dist: d0,
1913 node: entry_node,
1914 });
1915 results.push(NodeFurthest {
1916 dist: d0,
1917 node: entry_node,
1918 });
1919 while let Some(cur) = candidates.pop() {
1920 let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
1921 if cur.dist > worst && results.len() >= ef {
1922 break;
1923 }
1924 let neighbours: &[u32] = g
1925 .layers
1926 .get(layer as usize)
1927 .and_then(|layer_v| layer_v.get(cur.node))
1928 .map_or(&[][..], Vec::as_slice);
1929 for &n in neighbours {
1930 let n = n as usize;
1931 if n >= row_count || visited[n] {
1932 continue;
1933 }
1934 visited[n] = true;
1935 let dn = cell_to_query_metric_distance(table, col_pos, n, query, metric);
1939 if !dn.is_finite() {
1940 continue;
1941 }
1942 let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
1943 if results.len() < ef || dn < worst {
1944 results.push(NodeFurthest { dist: dn, node: n });
1945 if results.len() > ef {
1946 results.pop();
1947 }
1948 candidates.push(NodeClosest { dist: dn, node: n });
1949 }
1950 }
1951 }
1952 let mut out: Vec<(f32, usize)> = results.into_iter().map(|c| (c.dist, c.node)).collect();
1955 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
1956 out
1957}
1958
/// Total order on beam-search distances.
///
/// NaN ranks as `+∞` (the worst possible distance) and `-0.0 == +0.0`, so the
/// `Ord` impls built on top of it form a genuine total order.
fn nsw_heap_dist_cmp(a: f32, b: f32) -> core::cmp::Ordering {
    let key = |d: f32| if d.is_nan() { f32::INFINITY } else { d };
    // With NaN mapped away, `partial_cmp` always returns `Some`.
    key(a)
        .partial_cmp(&key(b))
        .unwrap_or(core::cmp::Ordering::Equal)
}

/// Candidate-queue entry for `layer_beam_search`.
///
/// `Ord` is reversed so that `BinaryHeap` (a max-heap) pops the *closest* node
/// first. Equal distances pop the smaller row index first, so ties are
/// deterministic. Equality is derived from `Ord`, which keeps `Eq` and `Ord`
/// consistent as the `Ord` contract requires. Two entries with the same
/// distance but different rows are neither `==` nor `Equal`.
#[derive(Debug, Clone, Copy)]
struct NodeClosest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeClosest {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeClosest {}
impl PartialOrd for NodeClosest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeClosest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        // Arguments swapped: a smaller distance (or row) is the "greater" entry.
        nsw_heap_dist_cmp(other.dist, self.dist).then_with(|| other.node.cmp(&self.node))
    }
}

/// Result-set entry for `layer_beam_search`.
///
/// `Ord` follows the distance so that `BinaryHeap::peek`/`pop` yield the
/// *furthest* result, which is the one evicted when the set exceeds `ef`. Equal
/// distances rank the larger row index as further. Equality is derived from
/// `Ord` so `Eq` and `Ord` stay consistent.
#[derive(Debug, Clone, Copy)]
struct NodeFurthest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeFurthest {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeFurthest {}
impl PartialOrd for NodeFurthest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeFurthest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        nsw_heap_dist_cmp(self.dist, other.dist).then_with(|| self.node.cmp(&other.node))
    }
}
2013
2014fn select_neighbours_heuristic(
2023 candidates: &[(f32, usize)],
2024 m: usize,
2025 table: &Table,
2026 col_pos: usize,
2027) -> Vec<usize> {
2028 let mut chosen: Vec<usize> = Vec::with_capacity(m);
2029 for &(d_q, e) in candidates {
2030 if chosen.len() >= m {
2031 break;
2032 }
2033 if !matches!(
2038 table.rows.get(e).and_then(|r| r.values.get(col_pos)),
2039 Some(Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_))
2040 ) {
2041 continue;
2042 }
2043 let mut covered = false;
2044 for &r in &chosen {
2045 if cell_l2_sq(table, col_pos, e, r) < d_q {
2049 covered = true;
2050 break;
2051 }
2052 }
2053 if !covered {
2054 chosen.push(e);
2055 }
2056 }
2057 chosen
2058}
2059
/// Wires `new_row_idx` into `layer` of the NSW graph at `table.indices[idx_pos]`.
///
/// 1. The new node's adjacency list on `layer` is replaced with `peers`.
/// 2. Each vector-valued peer gets a back-edge to the new node, unless it
///    already has one.
/// 3. If that back-edge pushes a peer past `cap_for_layer(layer)`, the peer's
///    list is re-sorted by distance from the peer and pruned with
///    `select_neighbours_heuristic`. This may drop the new node again, leaving
///    a one-directional edge.
///
/// No-op for non-NSW indices.
///
/// # Panics
/// Panics if `layer` has not been allocated in the graph, if a peer is out of
/// bounds for `table.rows` or the layer, or if a row index does not fit in `u32`.
fn connect_at_layer(
    table: &mut Table,
    idx_pos: usize,
    layer: u8,
    new_row_idx: usize,
    peers: &[usize],
) {
    let col_pos = table.indices[idx_pos].column_position;
    let cap = match &table.indices[idx_pos].kind {
        IndexKind::Nsw(g) => g.cap_for_layer(layer),
        IndexKind::BTree(_) | IndexKind::Brin { .. } => return,
    };
    let new_row_u32 = u32::try_from(new_row_idx).expect("row index fits in u32");
    // Forward edges: the new node's list on this layer becomes exactly `peers`.
    if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
        let layer_v = &mut g.layers[layer as usize];
        if let Some(slot) = layer_v.get_mut(new_row_idx) {
            *slot = peers
                .iter()
                .map(|&p| u32::try_from(p).expect("row index fits in u32"))
                .collect();
        }
    }
    for &peer in peers {
        // Only vector-valued peers receive a back edge.
        if !matches!(
            &table.rows[peer].values[col_pos],
            Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_)
        ) {
            continue;
        }
        // Back edge peer -> new node, deduplicated.
        if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
            let layer_v = &mut g.layers[layer as usize];
            if let Some(slot) = layer_v.get_mut(peer)
                && !slot.contains(&new_row_u32)
            {
                slot.push(new_row_u32);
            }
        }
        // The index kind is re-borrowed per step: distance helpers need `&Table`
        // while the edits above and below need `&mut` access to the index.
        let needs_trim = match &table.indices[idx_pos].kind {
            IndexKind::Nsw(g) => g.layers[layer as usize][peer].len() > cap,
            IndexKind::BTree(_) | IndexKind::Brin { .. } => false,
        };
        if needs_trim {
            // Snapshot the peer's neighbours so distances can be computed with
            // only a shared borrow of `table`.
            let current_peers: Vec<usize> = match &table.indices[idx_pos].kind {
                IndexKind::Nsw(g) => g.layers[layer as usize][peer]
                    .iter()
                    .map(|&n| n as usize)
                    .collect(),
                IndexKind::BTree(_) | IndexKind::Brin { .. } => continue,
            };
            // Distance from the peer to each of its neighbours, closest first.
            let mut tagged: Vec<(f32, usize)> = current_peers
                .iter()
                .map(|&p| (cell_l2_sq(table, col_pos, peer, p), p))
                .collect();
            tagged.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
            let kept = select_neighbours_heuristic(&tagged, cap, table, col_pos);
            if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
                && let Some(slot) = g.layers[layer as usize].get_mut(peer)
            {
                *slot = kept
                    .into_iter()
                    .map(|p| u32::try_from(p).expect("row index fits in u32"))
                    .collect();
            }
        }
    }
}
2144
2145fn vec_l2_sq(table: &Table, col_pos: usize, row: usize, query: &[f32]) -> f32 {
2152 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2153 Some(Value::Vector(v)) if v.len() == query.len() => l2_distance_sq(v, query),
2154 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => {
2155 quantize::sq8_l2_distance_sq_asymmetric(q, query)
2156 }
2157 Some(Value::HalfVector(h)) if h.dim() == query.len() => {
2161 halfvec::half_l2_distance_sq_asymmetric(h, query)
2162 }
2163 _ => f32::INFINITY,
2164 }
2165}
2166
2167fn cell_l2_sq(table: &Table, col_pos: usize, row_a: usize, row_b: usize) -> f32 {
2174 let Some(cell_a) = table.rows.get(row_a).and_then(|r| r.values.get(col_pos)) else {
2175 return f32::INFINITY;
2176 };
2177 let Some(cell_b) = table.rows.get(row_b).and_then(|r| r.values.get(col_pos)) else {
2178 return f32::INFINITY;
2179 };
2180 match (cell_a, cell_b) {
2181 (Value::Vector(a), Value::Vector(b)) if a.len() == b.len() => l2_distance_sq(a, b),
2182 (Value::Sq8Vector(a), Value::Sq8Vector(b)) if a.bytes.len() == b.bytes.len() => {
2183 quantize::sq8_l2_distance_sq(a, b)
2184 }
2185 (Value::HalfVector(a), Value::HalfVector(b)) if a.dim() == b.dim() => {
2190 halfvec::half_l2_distance_sq(a, b)
2191 }
2192 _ => f32::INFINITY,
2193 }
2194}
2195
2196fn cell_to_query_metric_distance(
2201 table: &Table,
2202 col_pos: usize,
2203 row: usize,
2204 query: &[f32],
2205 metric: NswMetric,
2206) -> f32 {
2207 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2208 Some(Value::Vector(v)) if v.len() == query.len() => metric_distance(metric, v, query),
2209 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => match metric {
2210 NswMetric::L2 => quantize::sq8_l2_distance_sq_asymmetric(q, query),
2211 NswMetric::InnerProduct => quantize::sq8_inner_product_asymmetric(q, query),
2212 NswMetric::Cosine => quantize::sq8_cosine_distance_asymmetric(q, query),
2213 },
2214 Some(Value::HalfVector(h)) if h.dim() == query.len() => match metric {
2217 NswMetric::L2 => halfvec::half_l2_distance_sq_asymmetric(h, query),
2218 NswMetric::InnerProduct => halfvec::half_inner_product_asymmetric(h, query),
2219 NswMetric::Cosine => halfvec::half_cosine_distance_asymmetric(h, query),
2220 },
2221 _ => f32::INFINITY,
2222 }
2223}
2224
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Distance function used when searching an NSW index.
///
/// Every variant is a "smaller is closer" distance. The definitions below are
/// those of `metric_distance` for `f32` vectors.
pub enum NswMetric {
    /// Squared Euclidean distance.
    L2,
    /// Negated dot product, so larger inner products rank as closer.
    InnerProduct,
    /// `1 - cos(a, b)`; `f32::INFINITY` when either vector has zero norm.
    Cosine,
}
2242
2243fn nsw_search(
2249 table: &Table,
2250 idx_pos: usize,
2251 query: &[f32],
2252 k: usize,
2253 ef: usize,
2254 metric: NswMetric,
2255) -> Vec<(f32, usize)> {
2256 let (entry, entry_level) = match &table.indices[idx_pos].kind {
2257 IndexKind::Nsw(g) => (g.entry, g.entry_level),
2258 IndexKind::BTree(_) | IndexKind::Brin { .. } => return Vec::new(),
2259 };
2260 let Some(entry) = entry else {
2261 return Vec::new();
2262 };
2263 let col_pos = table.indices[idx_pos].column_position;
2264 let sq8 = matches!(
2271 table.schema.columns.get(col_pos).map(|c| c.ty),
2272 Some(DataType::Vector {
2273 encoding: VecEncoding::Sq8,
2274 ..
2275 })
2276 );
2277 let ef = if sq8 {
2278 ef.max(k).max(k * SQ8_RERANK_OVER_FETCH)
2279 } else {
2280 ef.max(k)
2281 };
2282 let entry_d = vec_l2_sq(table, col_pos, entry, query);
2284 let mut current = entry;
2285 let mut current_d = entry_d;
2286 for layer in (1..=entry_level).rev() {
2287 (current, current_d) = greedy_layer_walk(table, idx_pos, layer, current, current_d, query);
2288 }
2289 let mut results = layer_beam_search(table, idx_pos, 0, current, current_d, query, ef, metric);
2291 if sq8 {
2292 results = sq8_rerank(table, col_pos, &results, query, metric);
2293 }
2294 results.truncate(k);
2295 results
2296}
2297
2298fn sq8_rerank(
2305 table: &Table,
2306 col_pos: usize,
2307 candidates: &[(f32, usize)],
2308 query: &[f32],
2309 metric: NswMetric,
2310) -> Vec<(f32, usize)> {
2311 let mut out: Vec<(f32, usize)> = candidates
2312 .iter()
2313 .filter_map(|&(adc_d, row)| {
2314 let cell = table.rows.get(row).and_then(|r| r.values.get(col_pos))?;
2315 let Value::Sq8Vector(q) = cell else {
2316 return Some((adc_d, row));
2320 };
2321 let deq = quantize::dequantize(q);
2322 if deq.len() != query.len() {
2323 return None;
2324 }
2325 Some((metric_distance(metric, &deq, query), row))
2326 })
2327 .collect();
2328 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2329 out
2330}
2331
/// Over-fetch factor for SQ8-encoded columns.
///
/// `nsw_search` widens the layer-0 beam to at least
/// `k * SQ8_RERANK_OVER_FETCH` candidates, which are scored with quantised
/// distances. `sq8_rerank` then re-scores them on dequantised vectors before
/// the result is truncated to `k`.
const SQ8_RERANK_OVER_FETCH: usize = 3;
2336
2337fn metric_distance(metric: NswMetric, a: &[f32], b: &[f32]) -> f32 {
2338 match metric {
2339 NswMetric::L2 => l2_distance_sq(a, b),
2340 NswMetric::InnerProduct => -inner_product_f32(a, b),
2341 NswMetric::Cosine => {
2342 let (dot, na, nb) = cosine_dot_norms_f32(a, b);
2343 if na == 0.0 || nb == 0.0 {
2344 return f32::INFINITY;
2345 }
2346 let denom = sqrt_newton_f32(na) * sqrt_newton_f32(nb);
2349 1.0 - dot / denom
2350 }
2351 }
2352}
2353
2354#[doc(hidden)]
2363#[inline]
2364pub fn inner_product_f32(a: &[f32], b: &[f32]) -> f32 {
2365 #[cfg(target_arch = "aarch64")]
2366 {
2367 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
2368 return unsafe { inner_product_neon(a, b) };
2371 }
2372 }
2373 inner_product_scalar(a, b)
2374}
2375
/// Portable dot product.
///
/// Accumulates `x * y` left to right in `f32` over the common prefix of `a` and
/// `b`; extra elements of the longer slice are ignored. Empty input yields `0.0`.
fn inner_product_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter()
        .zip(b)
        .fold(0.0_f32, |acc, (&x, &y)| acc + x * y)
}
2383
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
/// NEON dot product of `a` and `b`.
///
/// The kernel uses two independent 4-lane fused multiply-add accumulators over
/// 8-element steps, then single 4-lane steps, and sums both accumulators
/// horizontally at the end.
///
/// Elements after the last full group of 4 are NOT processed, so the result
/// covers the whole input only when `a.len()` is a multiple of 4.
/// `inner_product_f32` only calls it in that case. The summation order differs
/// from `inner_product_scalar`, so results may differ in the last bits.
///
/// # Safety
/// * `b.len() >= a.len()`: `b` is read at the same offsets as `a`, up to `a.len()`.
/// * The executing CPU must support NEON.
#[allow(clippy::many_single_char_names)] unsafe fn inner_product_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32,
    };
    // SAFETY: each load reads 4 lanes at `i` or `i + 4`. The loop guards keep
    // `i + 8 <= n` / `i + 4 <= n` with `n = a.len() <= b.len()` (caller contract),
    // so every read is in bounds. NEON availability is also the caller's contract.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        // Two accumulators break the FMA dependency chain in the main loop.
        let mut acc0 = zero;
        let mut acc1 = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 8 <= n {
            let av0 = vld1q_f32(a.as_ptr().add(i));
            let bv0 = vld1q_f32(b.as_ptr().add(i));
            acc0 = vfmaq_f32(acc0, av0, bv0);
            let av1 = vld1q_f32(a.as_ptr().add(i + 4));
            let bv1 = vld1q_f32(b.as_ptr().add(i + 4));
            acc1 = vfmaq_f32(acc1, av1, bv1);
            i += 8;
        }
        // At most one remaining group of 4.
        while i + 4 <= n {
            let av = vld1q_f32(a.as_ptr().add(i));
            let bv = vld1q_f32(b.as_ptr().add(i));
            acc0 = vfmaq_f32(acc0, av, bv);
            i += 4;
        }
        vaddvq_f32(vaddq_f32(acc0, acc1))
    }
}
2417
2418#[doc(hidden)]
2425#[inline]
2426pub fn cosine_dot_norms_f32(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
2427 #[cfg(target_arch = "aarch64")]
2428 {
2429 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
2430 return unsafe { cosine_dot_norms_neon(a, b) };
2432 }
2433 }
2434 cosine_dot_norms_scalar(a, b)
2435}
2436
/// Portable single-pass `(a·b, |a|², |b|²)` over the common prefix of `a` and `b`.
///
/// Each of the three sums is accumulated left to right in `f32`.
fn cosine_dot_norms_scalar(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    a.iter()
        .zip(b)
        .fold((0.0_f32, 0.0_f32, 0.0_f32), |(dot, norm_a, norm_b), (&x, &y)| {
            (dot + x * y, norm_a + x * x, norm_b + y * y)
        })
}
2448
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names, clippy::similar_names)]
/// NEON single-pass `(a·b, |a|², |b|²)`.
///
/// The kernel keeps one 4-lane FMA accumulator per quantity and sums each
/// horizontally at the end. Elements after the last full group of 4 are NOT
/// processed, so the result covers the whole input only when `a.len()` is a
/// multiple of 4. `cosine_dot_norms_f32` only calls it in that case.
///
/// # Safety
/// * `b.len() >= a.len()`: `b` is read at the same offsets as `a`, up to `a.len()`.
/// * The executing CPU must support NEON.
unsafe fn cosine_dot_norms_neon(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    use core::arch::aarch64::{float32x4_t, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32};
    // SAFETY: each load reads 4 lanes at `i`, and the loop guard keeps
    // `i + 4 <= n` with `n = a.len() <= b.len()` (caller contract), so every read
    // is in bounds. NEON availability is also the caller's contract.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut acc_dot = zero;
        let mut acc_na = zero;
        let mut acc_nb = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 4 <= n {
            let av = vld1q_f32(a.as_ptr().add(i));
            let bv = vld1q_f32(b.as_ptr().add(i));
            acc_dot = vfmaq_f32(acc_dot, av, bv);
            acc_na = vfmaq_f32(acc_na, av, av);
            acc_nb = vfmaq_f32(acc_nb, bv, bv);
            i += 4;
        }
        (vaddvq_f32(acc_dot), vaddvq_f32(acc_na), vaddvq_f32(acc_nb))
    }
}
2472
/// Square root for `no_std` builds, where `core` has no stable `f32::sqrt`.
///
/// Returns `0.0` for `x <= 0.0`, and `x` itself for NaN and `+∞`. For finite
/// positive `x` the result is within about one ULP of the true root across the
/// whole `f32` range, including subnormals.
///
/// Why not simply start Newton's method at `x`? Far from 1.0 that start point
/// makes the iteration merely halve the error each step. A fixed 10 steps then
/// leaves `sqrt(1e6)` near 1250 instead of 1000, which badly skews cosine
/// distances for large- or small-norm vectors. A bit-level estimate avoids that.
fn sqrt_newton_f32(x: f32) -> f32 {
    if x <= 0.0 {
        return 0.0;
    }
    if !x.is_finite() {
        // NaN stays NaN; +inf stays +inf.
        return x;
    }
    // Initial estimate from the IEEE-754 bit pattern. Halving the biased exponent
    // (and mantissa) approximates sqrt to within a few percent for normal inputs.
    let mut g = f32::from_bits((x.to_bits() >> 1) + 0x1fbd_1df5);
    // One unconditional step puts the estimate at or above the root (AM-GM)...
    g = 0.5 * (g + x / g);
    // ...after which Newton's iterates decrease monotonically. Stop as soon as
    // they no longer do, i.e. converged to rounding. The cap is only a safety
    // net: even the smallest subnormal converges in well under 32 steps.
    for _ in 0..32 {
        let next = 0.5 * (g + x / g);
        if next >= g {
            break;
        }
        g = next;
    }
    g
}
2483
2484#[inline]
2492fn l2_distance_sq(a: &[f32], b: &[f32]) -> f32 {
2493 #[cfg(target_arch = "aarch64")]
2494 {
2495 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
2496 return unsafe { l2_distance_sq_neon(a, b) };
2500 }
2501 }
2502 l2_distance_sq_scalar(a, b)
2503}
2504
/// Portable squared Euclidean distance.
///
/// Accumulates `(x - y)²` left to right in `f32` over the common prefix of `a`
/// and `b`. Empty input yields `0.0`.
fn l2_distance_sq_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter()
        .zip(b)
        .map(|(&x, &y)| x - y)
        .fold(0.0_f32, |acc, diff| acc + diff * diff)
}
2513
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
/// NEON squared Euclidean distance between `a` and `b`.
///
/// The kernel computes lane-wise differences and accumulates `d * d` with two
/// independent 4-lane FMA accumulators over 8-element steps, then single
/// 4-lane steps, and sums horizontally at the end.
///
/// Elements after the last full group of 4 are NOT processed, so the result
/// covers the whole input only when `a.len()` is a multiple of 4.
/// `l2_distance_sq` only calls it in that case.
///
/// # Safety
/// * `b.len() >= a.len()`: `b` is read at the same offsets as `a`, up to `a.len()`.
/// * The executing CPU must support NEON.
#[allow(clippy::many_single_char_names)] unsafe fn l2_distance_sq_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32, vsubq_f32,
    };
    // SAFETY: each load reads 4 lanes at `i` or `i + 4`. The loop guards keep
    // `i + 8 <= n` / `i + 4 <= n` with `n = a.len() <= b.len()` (caller contract),
    // so every read is in bounds. NEON availability is also the caller's contract.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        // Two accumulators break the FMA dependency chain in the main loop.
        let mut acc0 = zero;
        let mut acc1 = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 8 <= n {
            let d0 = vsubq_f32(vld1q_f32(a.as_ptr().add(i)), vld1q_f32(b.as_ptr().add(i)));
            acc0 = vfmaq_f32(acc0, d0, d0);
            let d1 = vsubq_f32(
                vld1q_f32(a.as_ptr().add(i + 4)),
                vld1q_f32(b.as_ptr().add(i + 4)),
            );
            acc1 = vfmaq_f32(acc1, d1, d1);
            i += 8;
        }
        // At most one remaining group of 4.
        while i + 4 <= n {
            let d = vsubq_f32(vld1q_f32(a.as_ptr().add(i)), vld1q_f32(b.as_ptr().add(i)));
            acc0 = vfmaq_f32(acc0, d, d);
            i += 4;
        }
        vaddvq_f32(vaddq_f32(acc0, acc1))
    }
}
2551
2552pub fn nsw_query(
2555 table: &Table,
2556 idx_name: &str,
2557 query: &[f32],
2558 k: usize,
2559 metric: NswMetric,
2560) -> Vec<usize> {
2561 let Some(idx_pos) = table.indices.iter().position(|i| i.name == idx_name) else {
2562 return Vec::new();
2563 };
2564 let ef = (k * 2).max(NSW_DEFAULT_M);
2565 let mut hits = nsw_search(table, idx_pos, query, k, ef, metric);
2566 hits.truncate(k);
2567 hits.into_iter().map(|(_, idx)| idx).collect()
2568}
2569
2570pub fn nsw_index_on(table: &Table, column_position: usize) -> Option<&Index> {
2574 table
2575 .indices
2576 .iter()
2577 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_)))
2578}
2579
#[derive(Debug, Clone, Default)]
/// Registry of tables plus the cold-tier segments loaded alongside them.
pub struct Catalog {
    /// Tables in creation order; `by_name` maps each name to a position here.
    tables: Vec<Table>,
    /// Table name -> index into `tables`.
    by_name: BTreeMap<String, usize>,
    /// Cold segments indexed by segment id.
    ///
    /// `None` marks either a tombstoned slot (`tombstone_segment`) or a gap
    /// padded by `load_segment_bytes_at`. Slots are never removed, so ids stay
    /// stable. Cloning the catalog only bumps the `Arc` reference counts.
    cold_segments: Vec<Option<Arc<OwnedSegment>>>,
}
2620
2621impl Catalog {
2622 pub const fn new() -> Self {
2623 Self {
2624 tables: Vec::new(),
2625 by_name: BTreeMap::new(),
2626 cold_segments: Vec::new(),
2627 }
2628 }
2629
2630 pub fn create_table(&mut self, schema: TableSchema) -> Result<(), StorageError> {
2631 if self.by_name.contains_key(&schema.name) {
2632 return Err(StorageError::DuplicateTable {
2633 name: schema.name.clone(),
2634 });
2635 }
2636 let idx = self.tables.len();
2637 let name = schema.name.clone();
2638 self.tables.push(Table::new(schema));
2639 self.by_name.insert(name, idx);
2640 Ok(())
2641 }
2642
2643 pub fn get(&self, name: &str) -> Option<&Table> {
2644 let idx = *self.by_name.get(name)?;
2645 self.tables.get(idx)
2646 }
2647
2648 pub fn get_mut(&mut self, name: &str) -> Option<&mut Table> {
2649 let idx = *self.by_name.get(name)?;
2650 self.tables.get_mut(idx)
2651 }
2652
2653 pub fn table_count(&self) -> usize {
2654 self.tables.len()
2655 }
2656
2657 pub fn table_names(&self) -> Vec<String> {
2660 self.tables.iter().map(|t| t.schema.name.clone()).collect()
2661 }
2662
2663 pub fn load_segment_bytes(&mut self, bytes: Vec<u8>) -> Result<u32, StorageError> {
2674 let id = u32::try_from(self.cold_segments.len()).map_err(|_| {
2675 StorageError::Corrupt("cold segment count would exceed u32::MAX".into())
2676 })?;
2677 let seg = OwnedSegment::from_bytes(bytes)
2678 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
2679 self.cold_segments.push(Some(Arc::new(seg)));
2680 Ok(id)
2681 }
2682
2683 pub fn load_segment_bytes_at(
2696 &mut self,
2697 target_id: u32,
2698 bytes: Vec<u8>,
2699 ) -> Result<(), StorageError> {
2700 let seg = OwnedSegment::from_bytes(bytes)
2701 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
2702 let idx = target_id as usize;
2703 while self.cold_segments.len() <= idx {
2704 self.cold_segments.push(None);
2705 }
2706 if self.cold_segments[idx].is_some() {
2707 return Err(StorageError::Corrupt(format!(
2708 "load_segment_bytes_at: segment_id {target_id} already occupied"
2709 )));
2710 }
2711 self.cold_segments[idx] = Some(Arc::new(seg));
2712 Ok(())
2713 }
2714
2715 pub fn tombstone_segment(&mut self, segment_id: u32) -> Result<(), StorageError> {
2725 let idx = segment_id as usize;
2726 if idx >= self.cold_segments.len() {
2727 return Err(StorageError::Corrupt(format!(
2728 "tombstone_segment: segment_id {segment_id} out of bounds (len={})",
2729 self.cold_segments.len()
2730 )));
2731 }
2732 self.cold_segments[idx] = None;
2733 Ok(())
2734 }
2735
2736 #[must_use]
2738 pub fn cold_segment_count(&self) -> usize {
2739 self.cold_segments.iter().filter(|s| s.is_some()).count()
2740 }
2741
2742 #[must_use]
2745 pub fn cold_segment_slot_count(&self) -> usize {
2746 self.cold_segments.len()
2747 }
2748
2749 #[must_use]
2754 pub fn cold_segment_ids_global(&self) -> Vec<u32> {
2755 self.cold_segments
2756 .iter()
2757 .enumerate()
2758 .filter_map(|(i, s)| s.as_ref().map(|_| i as u32))
2759 .collect()
2760 }
2761
2762 #[must_use]
2769 pub fn hot_tier_bytes(&self) -> u64 {
2770 self.tables
2771 .iter()
2772 .map(Table::hot_bytes)
2773 .fold(0u64, u64::saturating_add)
2774 }
2775
2776 pub fn freeze_oldest_to_cold(
2821 &mut self,
2822 table_name: &str,
2823 index_name: &str,
2824 max_rows: usize,
2825 ) -> Result<FreezeReport, StorageError> {
2826 if max_rows == 0 {
2828 return Err(StorageError::Corrupt(
2829 "freeze_oldest_to_cold: max_rows must be > 0".into(),
2830 ));
2831 }
2832 let table = self.get(table_name).ok_or_else(|| {
2833 StorageError::Corrupt(format!(
2834 "freeze_oldest_to_cold: table {table_name:?} not found"
2835 ))
2836 })?;
2837 if max_rows > table.rows.len() {
2838 return Err(StorageError::Corrupt(format!(
2839 "freeze_oldest_to_cold: max_rows {max_rows} > row_count {}",
2840 table.rows.len()
2841 )));
2842 }
2843 let idx = table
2844 .indices
2845 .iter()
2846 .find(|i| i.name == index_name)
2847 .ok_or_else(|| {
2848 StorageError::Corrupt(format!(
2849 "freeze_oldest_to_cold: index {index_name:?} not found on {table_name:?}"
2850 ))
2851 })?;
2852 if !matches!(idx.kind, IndexKind::BTree(_)) {
2853 return Err(StorageError::Corrupt(format!(
2854 "freeze_oldest_to_cold: index {index_name:?} is NSW; only BTree indices may freeze"
2855 )));
2856 }
2857 let column_position = idx.column_position;
2858
2859 let schema = table.schema.clone();
2861 let mut to_freeze: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(max_rows);
2862 for row_idx in 0..max_rows {
2863 let row = table.rows.get(row_idx).expect("bounds-checked above");
2864 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
2865 StorageError::Corrupt(format!(
2866 "freeze_oldest_to_cold: row {row_idx} has NULL / non-key value in index column"
2867 ))
2868 })?;
2869 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
2870 StorageError::Corrupt(format!(
2871 "freeze_oldest_to_cold: index {index_name:?} column type is non-integer; \
2872 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
2873 ))
2874 })?;
2875 to_freeze.push((pk_u64, encode_row_body_dense(row, &schema), key));
2876 }
2877 to_freeze.sort_by_key(|(k, _, _)| *k);
2882 for w in to_freeze.windows(2) {
2886 if w[0].0 == w[1].0 {
2887 return Err(StorageError::Corrupt(format!(
2888 "freeze_oldest_to_cold: duplicate PK {} in freeze batch",
2889 w[0].0
2890 )));
2891 }
2892 }
2893 let post_swap_keys: Vec<IndexKey> = to_freeze.iter().map(|(_, _, k)| k.clone()).collect();
2897 let seg_rows: Vec<(u64, Vec<u8>)> = to_freeze
2901 .into_iter()
2902 .map(|(k, body, _)| (k, body))
2903 .collect();
2904 let frozen_rows = seg_rows.len();
2905 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
2906 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: encode: {e}")))?;
2907
2908 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
2917 let positions: Vec<usize> = (0..max_rows).collect();
2918 let t_mut = self
2919 .get_mut(table_name)
2920 .expect("just validated; still present");
2921 let removed = t_mut.delete_rows(&positions);
2922 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
2923 let bytes_after = t_mut.hot_bytes();
2924 let bytes_freed = bytes_before.saturating_sub(bytes_after);
2925
2926 let segment_id = self
2927 .load_segment_bytes(seg_bytes.clone())
2928 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: load: {e}")))?;
2929 let new_cold = post_swap_keys.into_iter().map(|k| {
2930 (
2931 k,
2932 RowLocator::Cold {
2933 segment_id,
2934 page_offset: 0,
2935 },
2936 )
2937 });
2938 let t_mut = self.get_mut(table_name).expect("still present");
2939 t_mut.register_cold_locators(index_name, new_cold)?;
2940
2941 Ok(FreezeReport {
2942 segment_id,
2943 frozen_rows,
2944 bytes_freed,
2945 segment_bytes: seg_bytes,
2946 })
2947 }
2948
2949 #[must_use]
2955 pub fn cold_segment(&self, segment_id: u32) -> Option<&OwnedSegment> {
2956 self.cold_segments
2957 .get(segment_id as usize)
2958 .and_then(|s| s.as_deref())
2959 }
2960
2961 pub fn resolve_cold_locator(
2970 &self,
2971 table_name: &str,
2972 segment_id: u32,
2973 key: &IndexKey,
2974 ) -> Option<Row> {
2975 let t = self.get(table_name)?;
2976 let u64_key = index_key_as_u64(key)?;
2977 let seg = self.cold_segments.get(segment_id as usize)?.as_ref()?;
2978 let payload = seg.lookup(u64_key)?;
2979 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
2980 Some(row)
2981 }
2982
2983 pub fn lookup_by_pk(&self, table: &str, index_name: &str, key: &IndexKey) -> Option<Row> {
3001 let t = self.get(table)?;
3002 let idx = t.indices.iter().find(|i| i.name == index_name)?;
3003 let locators = idx.lookup_eq(key);
3004 let cold_u64_key = index_key_as_u64(key);
3005 for loc in locators {
3006 match *loc {
3007 RowLocator::Hot(i) => {
3008 if let Some(row) = t.rows.get(i) {
3009 return Some(row.clone());
3010 }
3011 }
3012 RowLocator::Cold {
3013 segment_id,
3014 page_offset: _,
3015 } => {
3016 let Some(u64_key) = cold_u64_key else {
3017 continue;
3020 };
3021 let Some(seg) = self
3022 .cold_segments
3023 .get(segment_id as usize)
3024 .and_then(|s| s.as_deref())
3025 else {
3026 continue;
3037 };
3038 let Some(payload) = seg.lookup(u64_key) else {
3039 continue;
3040 };
3041 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
3042 return Some(row);
3043 }
3044 }
3045 }
3046 None
3047 }
3048
    /// Moves the row stored under `key` from the cold tier back into the hot tier.
    ///
    /// The steps are:
    /// 1. Find the key's cold locator with `find_cold_locator`.
    /// 2. Decode the row from the referenced segment.
    /// 3. Insert it with `Table::insert`.
    /// 4. Remove the key's cold locators from `index_name`.
    ///
    /// Returns `Ok(None)` when the key has no cold locator. Otherwise returns
    /// `Ok(Some(hot_idx))`, where `hot_idx` is taken as the last row position
    /// after the insert. NOTE(review): this assumes `Table::insert` appends; confirm.
    ///
    /// # Errors
    /// Returns `StorageError::Corrupt` if the key is not integer-coercible, the
    /// table or segment is missing, or the segment lookup finds no payload.
    /// Errors from `find_cold_locator`, decoding, insert, and locator removal
    /// are propagated.
    ///
    /// The insert happens before the cold locators are removed. A failure in
    /// the removal step therefore leaves the row in the hot tier while its cold
    /// locator is still registered.
    pub fn promote_cold_row(
        &mut self,
        table_name: &str,
        index_name: &str,
        key: &IndexKey,
    ) -> Result<Option<usize>, StorageError> {
        let cold_loc = self.find_cold_locator(table_name, index_name, key)?;
        // No cold locator: nothing to promote.
        let Some((segment_id, _page_offset)) = cold_loc else {
            return Ok(None);
        };
        let u64_key = index_key_as_u64(key).ok_or_else(|| {
            StorageError::Corrupt(
                "promote_cold_row: key type not coercible to u64 (cold tier requires integer PK)"
                    .into(),
            )
        })?;
        // Cloned so the immutable borrow of `self` ends before `get_mut` below.
        let schema = self
            .get(table_name)
            .ok_or_else(|| {
                StorageError::Corrupt(format!("promote_cold_row: table {table_name:?} not found"))
            })?
            .schema
            .clone();
        let seg = self
            .cold_segments
            .get(segment_id as usize)
            .and_then(|s| s.as_ref())
            .ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "promote_cold_row: segment {segment_id} not registered on catalog"
                ))
            })?;
        let payload = seg.lookup(u64_key).ok_or_else(|| {
            StorageError::Corrupt(format!(
                "promote_cold_row: key {u64_key} resolves to segment {segment_id} \
                 but the segment's bloom/page lookup didn't return a row"
            ))
        })?;
        let (row, _consumed) = decode_row_body_dense(&payload, &schema)?;
        let t = self
            .get_mut(table_name)
            .expect("table existed at lookup time");
        t.insert(row)?;
        // The promoted row is taken to be the last row after the insert.
        let new_hot_idx =
            t.rows.len().checked_sub(1).ok_or_else(|| {
                StorageError::Corrupt("promote_cold_row: empty after insert".into())
            })?;
        // Only now drop the cold locators, so the key always resolves somewhere.
        t.remove_cold_locators_for_key(index_name, key)?;
        Ok(Some(new_hot_idx))
    }
3130
3131 pub fn shadow_cold_row(
3149 &mut self,
3150 table_name: &str,
3151 index_name: &str,
3152 key: &IndexKey,
3153 ) -> Result<usize, StorageError> {
3154 let t = self.get_mut(table_name).ok_or_else(|| {
3155 StorageError::Corrupt(format!("shadow_cold_row: table {table_name:?} not found"))
3156 })?;
3157 t.remove_cold_locators_for_key(index_name, key)
3158 }
3159
3160 pub fn prepare_freeze_slice(
3178 &self,
3179 table_name: &str,
3180 index_name: &str,
3181 row_range: core::ops::Range<usize>,
3182 ) -> Result<FreezeSlice, StorageError> {
3183 let table = self.get(table_name).ok_or_else(|| {
3184 StorageError::Corrupt(format!(
3185 "prepare_freeze_slice: table {table_name:?} not found"
3186 ))
3187 })?;
3188 let idx = table
3189 .indices
3190 .iter()
3191 .find(|i| i.name == index_name)
3192 .ok_or_else(|| {
3193 StorageError::Corrupt(format!(
3194 "prepare_freeze_slice: index {index_name:?} not found on {table_name:?}"
3195 ))
3196 })?;
3197 if !matches!(idx.kind, IndexKind::BTree(_)) {
3198 return Err(StorageError::Corrupt(format!(
3199 "prepare_freeze_slice: index {index_name:?} is NSW; only BTree indices may freeze"
3200 )));
3201 }
3202 if row_range.end > table.rows.len() {
3203 return Err(StorageError::Corrupt(format!(
3204 "prepare_freeze_slice: row_range end {} > row_count {}",
3205 row_range.end,
3206 table.rows.len()
3207 )));
3208 }
3209 let column_position = idx.column_position;
3210 let schema = table.schema.clone();
3211 let mut rows: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(row_range.len());
3212 for row_idx in row_range.clone() {
3213 let row = table.rows.get(row_idx).expect("bounds-checked above");
3214 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
3215 StorageError::Corrupt(format!(
3216 "prepare_freeze_slice: row {row_idx} has NULL / non-key value in index column"
3217 ))
3218 })?;
3219 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
3220 StorageError::Corrupt(format!(
3221 "prepare_freeze_slice: index {index_name:?} column type is non-integer; \
3222 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
3223 ))
3224 })?;
3225 rows.push((pk_u64, encode_row_body_dense(row, &schema), key));
3226 }
3227 rows.sort_by_key(|(k, _, _)| *k);
3228 Ok(FreezeSlice { row_range, rows })
3229 }
3230
3231 pub fn commit_freeze_slices(
3245 &mut self,
3246 table_name: &str,
3247 index_name: &str,
3248 slices: Vec<FreezeSlice>,
3249 ) -> Result<FreezeReport, StorageError> {
3250 let table = self.get(table_name).ok_or_else(|| {
3252 StorageError::Corrupt(format!(
3253 "commit_freeze_slices: table {table_name:?} not found"
3254 ))
3255 })?;
3256 let idx = table
3257 .indices
3258 .iter()
3259 .find(|i| i.name == index_name)
3260 .ok_or_else(|| {
3261 StorageError::Corrupt(format!(
3262 "commit_freeze_slices: index {index_name:?} not found on {table_name:?}"
3263 ))
3264 })?;
3265 if !matches!(idx.kind, IndexKind::BTree(_)) {
3266 return Err(StorageError::Corrupt(format!(
3267 "commit_freeze_slices: index {index_name:?} is NSW; only BTree indices may freeze"
3268 )));
3269 }
3270 let mut ordered = slices;
3274 ordered.sort_by_key(|s| s.row_range.start);
3275 let mut expected_start = 0usize;
3279 for s in &ordered {
3280 if s.row_range.start != expected_start {
3281 return Err(StorageError::Corrupt(format!(
3282 "commit_freeze_slices: gap/overlap at row {}; expected start {}",
3283 s.row_range.start, expected_start
3284 )));
3285 }
3286 expected_start = s.row_range.end;
3287 }
3288 let max_rows = expected_start;
3289 if max_rows > table.rows.len() {
3290 return Err(StorageError::Corrupt(format!(
3291 "commit_freeze_slices: total row range {} exceeds row_count {}",
3292 max_rows,
3293 table.rows.len()
3294 )));
3295 }
3296 if max_rows == 0 {
3297 return Ok(FreezeReport {
3298 segment_id: u32::MAX,
3299 frozen_rows: 0,
3300 bytes_freed: 0,
3301 segment_bytes: Vec::new(),
3302 });
3303 }
3304
3305 let total_rows: usize = ordered.iter().map(|s| s.rows.len()).sum();
3310 if total_rows != max_rows {
3311 return Err(StorageError::Corrupt(format!(
3312 "commit_freeze_slices: total slice rows {total_rows} ≠ row_range coverage {max_rows}"
3313 )));
3314 }
3315 let mut cursors: Vec<usize> = alloc::vec![0; ordered.len()];
3316 let mut merged: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(total_rows);
3317 loop {
3318 let mut pick: Option<usize> = None;
3321 for (i, c) in cursors.iter().enumerate() {
3322 let slice = &ordered[i];
3323 if *c >= slice.rows.len() {
3324 continue;
3325 }
3326 match pick {
3327 None => pick = Some(i),
3328 Some(j) => {
3329 if slice.rows[*c].0 < ordered[j].rows[cursors[j]].0 {
3330 pick = Some(i);
3331 }
3332 }
3333 }
3334 }
3335 let Some(i) = pick else { break };
3336 let row = ordered[i].rows[cursors[i]].clone();
3337 cursors[i] += 1;
3338 merged.push(row);
3339 }
3340 for w in merged.windows(2) {
3343 if w[0].0 == w[1].0 {
3344 return Err(StorageError::Corrupt(format!(
3345 "commit_freeze_slices: duplicate PK {} across slices",
3346 w[0].0
3347 )));
3348 }
3349 }
3350 let post_swap_keys: Vec<IndexKey> = merged.iter().map(|(_, _, k)| k.clone()).collect();
3351 let seg_rows: Vec<(u64, Vec<u8>)> = merged
3352 .into_iter()
3353 .map(|(k, body, _)| (k, body))
3354 .collect();
3355 let frozen_rows = seg_rows.len();
3356 let (seg_bytes, _meta) =
3357 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).map_err(|e| {
3358 StorageError::Corrupt(format!("commit_freeze_slices: encode: {e}"))
3359 })?;
3360
3361 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
3363 let positions: Vec<usize> = (0..max_rows).collect();
3364 let t_mut = self
3365 .get_mut(table_name)
3366 .expect("just validated; still present");
3367 let removed = t_mut.delete_rows(&positions);
3368 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
3369 let bytes_after = t_mut.hot_bytes();
3370 let bytes_freed = bytes_before.saturating_sub(bytes_after);
3371
3372 let segment_id = self
3373 .load_segment_bytes(seg_bytes.clone())
3374 .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: load: {e}")))?;
3375 let new_cold = post_swap_keys.into_iter().map(|k| {
3376 (
3377 k,
3378 RowLocator::Cold {
3379 segment_id,
3380 page_offset: 0,
3381 },
3382 )
3383 });
3384 let t_mut = self.get_mut(table_name).expect("still present");
3385 t_mut.register_cold_locators(index_name, new_cold)?;
3386
3387 Ok(FreezeReport {
3388 segment_id,
3389 frozen_rows,
3390 bytes_freed,
3391 segment_bytes: seg_bytes,
3392 })
3393 }
3394
3395 pub fn compact_cold_segments(
3438 &mut self,
3439 table_name: &str,
3440 index_name: &str,
3441 target_segment_bytes: u64,
3442 ) -> Result<CompactReport, StorageError> {
3443 let t = self.get(table_name).ok_or_else(|| {
3445 StorageError::Corrupt(format!(
3446 "compact_cold_segments: table {table_name:?} not found"
3447 ))
3448 })?;
3449 let idx = t
3450 .indices
3451 .iter()
3452 .find(|i| i.name == index_name)
3453 .ok_or_else(|| {
3454 StorageError::Corrupt(format!(
3455 "compact_cold_segments: index {index_name:?} not found on {table_name:?}"
3456 ))
3457 })?;
3458 let map = match &idx.kind {
3459 IndexKind::BTree(m) => m,
3460 IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
3461 return Err(StorageError::Corrupt(format!(
3462 "compact_cold_segments: index {index_name:?} is not BTree; \
3463 compaction applies only to BTree cold-tier indices"
3464 )));
3465 }
3466 };
3467
3468 let mut referenced_ids: BTreeSet<u32> = BTreeSet::new();
3471 for (_key, locators) in map.iter() {
3472 for loc in locators {
3473 if let RowLocator::Cold { segment_id, .. } = loc {
3474 referenced_ids.insert(*segment_id);
3475 }
3476 }
3477 }
3478 let candidate_set: BTreeSet<u32> = referenced_ids
3480 .into_iter()
3481 .filter(|id| {
3482 self.cold_segments
3483 .get(*id as usize)
3484 .and_then(|s| s.as_deref())
3485 .is_some_and(|s| (s.bytes().len() as u64) < target_segment_bytes)
3486 })
3487 .collect();
3488 if candidate_set.len() < 2 {
3489 return Ok(CompactReport {
3490 sources: Vec::new(),
3491 merged_segment_id: None,
3492 merged_segment_bytes: Vec::new(),
3493 merged_rows: 0,
3494 deleted_rows_pruned: 0,
3495 bytes_reclaimed_estimate: 0,
3496 });
3497 }
3498 let mut source_row_count: usize = 0;
3500 let mut source_byte_total: u64 = 0;
3501 for &id in &candidate_set {
3502 let seg = self.cold_segments[id as usize]
3503 .as_ref()
3504 .expect("candidate selected only when slot is Some");
3505 source_row_count = source_row_count.saturating_add(seg.meta().num_rows as usize);
3506 source_byte_total =
3507 source_byte_total.saturating_add(seg.bytes().len() as u64);
3508 }
3509 let mut collected: BTreeMap<u64, (Vec<u8>, IndexKey)> = BTreeMap::new();
3515 for (key, locators) in map.iter() {
3516 for loc in locators {
3517 let RowLocator::Cold { segment_id, .. } = loc else {
3518 continue;
3519 };
3520 if !candidate_set.contains(segment_id) {
3521 continue;
3522 }
3523 let u64_key = index_key_as_u64(key).ok_or_else(|| {
3524 StorageError::Corrupt(format!(
3525 "compact_cold_segments: index {index_name:?} has non-integer Cold key; \
3526 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
3527 ))
3528 })?;
3529 let seg = self.cold_segments[*segment_id as usize]
3530 .as_ref()
3531 .expect("candidate slot guaranteed Some above");
3532 let payload = seg.lookup(u64_key).ok_or_else(|| {
3533 StorageError::Corrupt(format!(
3534 "compact_cold_segments: BTree {index_name:?} points key={u64_key} \
3535 at segment {segment_id} but the segment lookup missed"
3536 ))
3537 })?;
3538 collected.insert(u64_key, (payload, key.clone()));
3539 break;
3540 }
3541 }
3542 let merged_rows = collected.len();
3543 let deleted_rows_pruned = source_row_count.saturating_sub(merged_rows);
3544
3545 let seg_rows: Vec<(u64, Vec<u8>)> = collected
3549 .iter()
3550 .map(|(k, (body, _))| (*k, body.clone()))
3551 .collect();
3552 let (seg_bytes, _meta) =
3553 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).map_err(|e| {
3554 StorageError::Corrupt(format!("compact_cold_segments: encode: {e}"))
3555 })?;
3556 let merged_bytes_len = seg_bytes.len() as u64;
3557
3558 let merged_segment_id = self
3560 .load_segment_bytes(seg_bytes.clone())
3561 .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: load: {e}")))?;
3562
3563 let entries: Vec<(IndexKey, Vec<RowLocator>)> = {
3569 let t = self
3570 .get(table_name)
3571 .expect("table existed at the start of this fn");
3572 let idx = t
3573 .indices
3574 .iter()
3575 .find(|i| i.name == index_name)
3576 .expect("index existed at the start of this fn");
3577 let IndexKind::BTree(map) = &idx.kind else {
3578 unreachable!("validated above");
3579 };
3580 map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
3581 };
3582 let t_mut = self
3583 .get_mut(table_name)
3584 .expect("table existed at the start of this fn");
3585 let idx_mut = t_mut
3586 .indices
3587 .iter_mut()
3588 .find(|i| i.name == index_name)
3589 .expect("index existed at the start of this fn");
3590 let IndexKind::BTree(map_mut) = &mut idx_mut.kind else {
3591 unreachable!("validated above");
3592 };
3593 for (key, locators) in entries {
3594 let mut new_locs: Vec<RowLocator> = Vec::with_capacity(locators.len());
3595 let mut changed = false;
3596 for loc in &locators {
3597 match *loc {
3598 RowLocator::Cold {
3599 segment_id,
3600 page_offset: _,
3601 } if candidate_set.contains(&segment_id) => {
3602 let replacement = RowLocator::Cold {
3603 segment_id: merged_segment_id,
3604 page_offset: 0,
3605 };
3606 if !new_locs.contains(&replacement) {
3607 new_locs.push(replacement);
3608 }
3609 changed = true;
3610 }
3611 other => new_locs.push(other),
3612 }
3613 }
3614 if changed {
3615 map_mut.insert_mut(key, new_locs);
3616 }
3617 }
3618
3619 for &id in &candidate_set {
3624 self.tombstone_segment(id)?;
3625 }
3626
3627 let bytes_reclaimed_estimate = source_byte_total.saturating_sub(merged_bytes_len);
3628 Ok(CompactReport {
3629 sources: candidate_set.into_iter().collect(),
3630 merged_segment_id: Some(merged_segment_id),
3631 merged_segment_bytes: seg_bytes,
3632 merged_rows,
3633 deleted_rows_pruned,
3634 bytes_reclaimed_estimate,
3635 })
3636 }
3637
3638 fn find_cold_locator(
3644 &self,
3645 table_name: &str,
3646 index_name: &str,
3647 key: &IndexKey,
3648 ) -> Result<Option<(u32, u32)>, StorageError> {
3649 let t = self.get(table_name).ok_or_else(|| {
3650 StorageError::Corrupt(format!("find_cold_locator: table {table_name:?} not found"))
3651 })?;
3652 let idx = t
3653 .indices
3654 .iter()
3655 .find(|i| i.name == index_name)
3656 .ok_or_else(|| {
3657 StorageError::Corrupt(format!(
3658 "find_cold_locator: index {index_name:?} not found on {table_name:?}"
3659 ))
3660 })?;
3661 if !matches!(idx.kind, IndexKind::BTree(_)) {
3662 return Err(StorageError::Corrupt(format!(
3663 "find_cold_locator: index {index_name:?} is NSW; promote-on-write only applies to BTree indices"
3664 )));
3665 }
3666 for loc in idx.lookup_eq(key) {
3667 if let RowLocator::Cold {
3668 segment_id,
3669 page_offset,
3670 } = *loc
3671 {
3672 return Ok(Some((segment_id, page_offset)));
3673 }
3674 }
3675 Ok(None)
3676 }
3677}
3678
3679fn index_key_as_u64(key: &IndexKey) -> Option<u64> {
3685 match key {
3686 IndexKey::Int(n) => Some(n.cast_unsigned()),
3692 IndexKey::Text(_) | IndexKey::Bool(_) => None,
3693 }
3694}
3695
/// Errors returned by catalog, table, and on-disk format operations.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum StorageError {
    /// A table with this name already exists.
    DuplicateTable {
        name: String,
    },
    /// No table with this name exists.
    TableNotFound {
        name: String,
    },
    /// A row's value count differs from the table's column count.
    ArityMismatch {
        expected: usize,
        actual: usize,
    },
    /// A value's type does not match the declared type of its column.
    /// `position` is the column's position within the row.
    TypeMismatch {
        column: String,
        expected: DataType,
        actual: DataType,
        position: usize,
    },
    /// A NULL value was supplied for a NOT NULL column.
    NullInNotNull {
        column: String,
    },
    /// An index with this name already exists.
    DuplicateIndex {
        name: String,
    },
    /// The named column does not exist.
    ColumnNotFound {
        column: String,
    },
    /// Malformed or inconsistent data; the string carries the detail.
    ///
    /// Besides decoding failures, the cold-tier freeze/compaction paths also
    /// use this for missing tables/indices and invalid arguments.
    Corrupt(String),
    /// An index with this name does not exist.
    IndexNotFound {
        name: String,
    },
    /// The requested operation or feature is not supported; the string
    /// carries the detail.
    Unsupported(String),
}
3739
3740impl fmt::Display for StorageError {
3741 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3742 match self {
3743 Self::DuplicateTable { name } => write!(f, "table already exists: {name}"),
3744 Self::TableNotFound { name } => write!(f, "table not found: {name}"),
3745 Self::ArityMismatch { expected, actual } => write!(
3746 f,
3747 "row arity mismatch: expected {expected} columns, got {actual}"
3748 ),
3749 Self::TypeMismatch {
3750 column,
3751 expected,
3752 actual,
3753 position,
3754 } => write!(
3755 f,
3756 "type mismatch in column {column:?} (position {position}): expected {expected}, got {actual}"
3757 ),
3758 Self::NullInNotNull { column } => {
3759 write!(f, "NULL value in NOT NULL column {column:?}")
3760 }
3761 Self::DuplicateIndex { name } => write!(f, "index already exists: {name}"),
3762 Self::ColumnNotFound { column } => write!(f, "column not found: {column}"),
3763 Self::Corrupt(detail) => write!(f, "corrupt on-disk format: {detail}"),
3764 Self::IndexNotFound { name } => write!(f, "index not found: {name}"),
3765 Self::Unsupported(detail) => write!(f, "unsupported: {detail}"),
3766 }
3767 }
3768}
3769
3770impl ColumnSchema {
3771 pub fn new(name: impl Into<String>, ty: DataType, nullable: bool) -> Self {
3772 Self {
3773 name: name.into(),
3774 ty,
3775 nullable,
3776 default: None,
3777 runtime_default: None,
3778 auto_increment: false,
3779 }
3780 }
3781
3782 #[must_use]
3786 pub fn with_default(mut self, default: Value) -> Self {
3787 self.default = Some(default);
3788 self
3789 }
3790
3791 #[must_use]
3796 pub fn with_runtime_default(mut self, expr: impl Into<String>) -> Self {
3797 self.runtime_default = Some(expr.into());
3798 self
3799 }
3800
3801 #[must_use]
3803 pub const fn with_auto_increment(mut self) -> Self {
3804 self.auto_increment = true;
3805 self
3806 }
3807}
3808
3809impl TableSchema {
3810 pub fn new(name: impl Into<String>, columns: Vec<ColumnSchema>) -> Self {
3811 Self {
3812 name: name.into(),
3813 columns,
3814 hot_tier_bytes: None,
3815 foreign_keys: Vec::new(),
3816 uniqueness_constraints: Vec::new(),
3817 }
3818 }
3819}
3820
/// Magic bytes at the start of every serialized catalog.
/// `Catalog::deserialize` rejects buffers that do not begin with them.
const FILE_MAGIC: &[u8; 8] = b"SPGDB001";
/// Format version written by `Catalog::serialize`.
///
/// Readers gate optional sections on this byte:
/// * v9: persisted BTree index maps;
/// * v11: hot-tier byte budget;
/// * v12: INCLUDE columns, partial predicate, and expression per index;
/// * v13: foreign keys;
/// * v15: uniqueness constraints and runtime defaults.
const FILE_VERSION: u8 = 15;
/// Oldest format version `Catalog::deserialize` still accepts.
const MIN_SUPPORTED_FILE_VERSION: u8 = 8;

// Leading tag byte of a serialized `IndexKey`, one per variant. Presumably
// consumed by `write_index_key` / `read_index_key` (not visible here) —
// confirm before changing.
const INDEX_KEY_TAG_INT: u8 = 0;
const INDEX_KEY_TAG_TEXT: u8 = 1;
const INDEX_KEY_TAG_BOOL: u8 = 2;
3906
impl Catalog {
    /// Serializes the whole catalog into the on-disk format at
    /// [`FILE_VERSION`].
    ///
    /// Layout:
    /// 1. Header: 8-byte [`FILE_MAGIC`], 1-byte version, u32 table count.
    /// 2. Per table:
    ///    * the name;
    ///    * a u16 column count and the column definitions;
    ///    * a u32 row count and every row in the dense row-body encoding;
    ///    * a u16 index count and the index records;
    ///    * the hot-tier, foreign-key, uniqueness-constraint, and
    ///      runtime-default appendices.
    ///
    /// Integers other than the hot-tier budget go through the `write_u*`
    /// helpers, whose byte order is not shown here.
    ///
    /// # Panics
    /// * If a count or position exceeds the width of its on-disk field (e.g.
    ///   more than `u32::MAX` rows or `u16::MAX` columns in a table).
    /// * If a column type has no on-disk encoding (see `write_data_type`).
    pub fn serialize(&self) -> Vec<u8> {
        let mut out = Vec::with_capacity(64);
        // Header: magic, format version, table count.
        out.extend_from_slice(FILE_MAGIC);
        out.push(FILE_VERSION);
        write_u32(
            &mut out,
            u32::try_from(self.tables.len()).expect("≤ 4G tables"),
        );
        for t in &self.tables {
            // Table name, then the column definitions.
            write_str(&mut out, &t.schema.name);
            write_u16(
                &mut out,
                u16::try_from(t.schema.columns.len()).expect("≤ 65k columns/table"),
            );
            for c in &t.schema.columns {
                write_str(&mut out, &c.name);
                write_data_type(&mut out, c.ty);
                out.push(u8::from(c.nullable));
                // Static default: 0 = none, 1 = a value follows.
                match &c.default {
                    None => out.push(0),
                    Some(v) => {
                        out.push(1);
                        write_value(&mut out, v);
                    }
                }
                out.push(u8::from(c.auto_increment));
            }
            // Rows: u32 count, then each row in the dense row-body encoding
            // (the same encoding the freeze path stores in cold segments).
            write_u32(
                &mut out,
                u32::try_from(t.rows.len()).expect("≤ 4G rows/table"),
            );
            for row in &t.rows {
                out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
            }
            // Indices: u16 count, then one record per index.
            write_u16(
                &mut out,
                u16::try_from(t.indices.len()).expect("≤ 65k indices/table"),
            );
            for idx in &t.indices {
                write_str(&mut out, &idx.name);
                write_u16(
                    &mut out,
                    u16::try_from(idx.column_position).expect("≤ 65k columns/table"),
                );
                // Kind tag + kind-specific payload: 0 = BTree, 1 = NSW, 2 = BRIN.
                match &idx.kind {
                    IndexKind::BTree(map) => {
                        out.push(0);
                        // The full key → locator-list map is persisted, so
                        // readers (v9+) restore it without rebuilding.
                        write_u32(
                            &mut out,
                            u32::try_from(map.len()).expect("≤ 4G index entries/index"),
                        );
                        for (key, locators) in map {
                            write_index_key(&mut out, key);
                            write_u32(
                                &mut out,
                                u32::try_from(locators.len()).expect("≤ 4G locators/key"),
                            );
                            for loc in locators {
                                loc.write_le(&mut out);
                            }
                        }
                    }
                    IndexKind::Nsw(g) => {
                        out.push(1);
                        write_u16(&mut out, u16::try_from(g.m).expect("≤ 65k NSW neighbours"));
                        write_nsw_graph(&mut out, g);
                    }
                    IndexKind::Brin { column_type } => {
                        out.push(2);
                        // Only the column type is stored for BRIN indices.
                        write_data_type(&mut out, *column_type);
                    }
                }
                // v12+ per-index metadata: INCLUDE column positions, then the
                // optional partial predicate and expression (0 = none, 1 = string follows).
                write_u16(
                    &mut out,
                    u16::try_from(idx.included_columns.len())
                        .expect("≤ 65k INCLUDE columns/index"),
                );
                for col_pos in &idx.included_columns {
                    write_u16(
                        &mut out,
                        u16::try_from(*col_pos).expect("≤ 65k columns/table"),
                    );
                }
                match &idx.partial_predicate {
                    None => out.push(0),
                    Some(pred) => {
                        out.push(1);
                        write_str(&mut out, pred);
                    }
                }
                match &idx.expression {
                    None => out.push(0),
                    Some(expr) => {
                        out.push(1);
                        write_str(&mut out, expr);
                    }
                }
            }
            // v11+ appendix: optional hot-tier byte budget (0 = none, 1 = value
            // follows as little-endian bytes; read back with `read_u64`).
            match t.schema.hot_tier_bytes {
                None => out.push(0),
                Some(n) => {
                    out.push(1);
                    out.extend_from_slice(&n.to_le_bytes());
                }
            }
            // v13+ appendix: foreign keys.
            write_u16(
                &mut out,
                u16::try_from(t.schema.foreign_keys.len()).expect("≤ 65k FKs/table"),
            );
            for fk in &t.schema.foreign_keys {
                match &fk.name {
                    None => out.push(0),
                    Some(n) => {
                        out.push(1);
                        write_str(&mut out, n);
                    }
                }
                write_u16(
                    &mut out,
                    u16::try_from(fk.local_columns.len()).expect("≤ 65k FK columns"),
                );
                for &p in &fk.local_columns {
                    write_u16(
                        &mut out,
                        u16::try_from(p).expect("≤ 65k columns/table"),
                    );
                }
                write_str(&mut out, &fk.parent_table);
                write_u16(
                    &mut out,
                    u16::try_from(fk.parent_columns.len()).expect("≤ 65k FK parent columns"),
                );
                for &p in &fk.parent_columns {
                    write_u16(
                        &mut out,
                        u16::try_from(p).expect("≤ 65k columns/table"),
                    );
                }
                out.push(fk.on_delete.tag());
                out.push(fk.on_update.tag());
            }
            // v15+ appendix, part 1: uniqueness constraints.
            write_u16(
                &mut out,
                u16::try_from(t.schema.uniqueness_constraints.len())
                    .expect("≤ 65k uniqueness constraints/table"),
            );
            for uc in &t.schema.uniqueness_constraints {
                out.push(u8::from(uc.is_primary_key));
                write_u16(
                    &mut out,
                    u16::try_from(uc.columns.len())
                        .expect("≤ 65k cols in uniqueness constraint"),
                );
                for &p in &uc.columns {
                    write_u16(
                        &mut out,
                        u16::try_from(p).expect("≤ 65k columns/table"),
                    );
                }
            }
            // v15+ appendix, part 2: runtime defaults, stored sparsely as
            // (column position, expression) pairs for columns that have one.
            let mut rt_defaults: Vec<(usize, &str)> = Vec::new();
            for (i, c) in t.schema.columns.iter().enumerate() {
                if let Some(e) = &c.runtime_default {
                    rt_defaults.push((i, e.as_str()));
                }
            }
            write_u16(
                &mut out,
                u16::try_from(rt_defaults.len())
                    .expect("≤ 65k runtime defaults/table"),
            );
            for (pos, expr) in rt_defaults {
                write_u16(
                    &mut out,
                    u16::try_from(pos).expect("≤ 65k columns/table"),
                );
                write_str(&mut out, expr);
            }
        }
        out
    }

    /// Parses a buffer produced by [`Catalog::serialize`].
    ///
    /// Any format version from [`MIN_SUPPORTED_FILE_VERSION`] through
    /// [`FILE_VERSION`] is accepted.
    ///
    /// # Errors
    /// Returns [`StorageError::Corrupt`] on a bad magic, an unsupported
    /// version, or unread trailing bytes. Errors from decoding the individual
    /// tables are propagated.
    pub fn deserialize(buf: &[u8]) -> Result<Self, StorageError> {
        let mut cur = Cursor::new(buf);
        let magic = cur.take(8)?;
        if magic != FILE_MAGIC {
            return Err(StorageError::Corrupt(format!(
                "bad magic: expected SPGDB001, got {magic:?}"
            )));
        }
        let version = cur.read_u8()?;
        if !(MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION).contains(&version) {
            return Err(StorageError::Corrupt(format!(
                "unsupported file version: {version} (supported: {MIN_SUPPORTED_FILE_VERSION}..={FILE_VERSION})"
            )));
        }
        let table_count = cur.read_u32()? as usize;
        let mut cat = Self::new();
        for _ in 0..table_count {
            deserialize_table(&mut cur, &mut cat, version)?;
        }
        // The whole buffer must be consumed; leftovers mean corruption or a
        // writer/reader layout mismatch.
        if cur.pos < buf.len() {
            return Err(StorageError::Corrupt(format!(
                "trailing bytes: {} unread",
                buf.len() - cur.pos
            )));
        }
        Ok(cat)
    }
}
4183
4184fn deserialize_table(
4189 cur: &mut Cursor<'_>,
4190 cat: &mut Catalog,
4191 version: u8,
4192) -> Result<(), StorageError> {
4193 let table_name = cur.read_str()?;
4194 let name = table_name.clone();
4195 let col_count = cur.read_u16()? as usize;
4196 let mut cols = Vec::with_capacity(col_count);
4197 for _ in 0..col_count {
4198 let c_name = cur.read_str()?;
4199 let ty = cur.read_data_type()?;
4200 let nullable = cur.read_u8()? != 0;
4201 let default = match cur.read_u8()? {
4202 0 => None,
4203 1 => Some(cur.read_value()?),
4204 other => {
4205 return Err(StorageError::Corrupt(format!(
4206 "unknown default tag: {other}"
4207 )));
4208 }
4209 };
4210 let auto_increment = cur.read_u8()? != 0;
4211 cols.push(ColumnSchema {
4215 name: c_name,
4216 ty,
4217 nullable,
4218 default,
4219 runtime_default: None,
4220 auto_increment,
4221 });
4222 }
4223 let n_cols = cols.len();
4224 cat.create_table(TableSchema::new(name, cols))?;
4225 let t = cat.tables.last_mut().expect("create_table just pushed");
4229 deserialize_rows(cur, t, n_cols)?;
4230 deserialize_indices(cur, t, version)?;
4231 if version >= 11 {
4237 let has = cur.read_u8()?;
4238 let hot_tier_bytes = match has {
4239 0 => None,
4240 1 => Some(cur.read_u64()?),
4241 other => {
4242 return Err(StorageError::Corrupt(format!(
4243 "hot_tier_bytes appendix: unknown has-value byte {other}"
4244 )));
4245 }
4246 };
4247 t.schema_mut().hot_tier_bytes = hot_tier_bytes;
4248 }
4249 if version >= 13 {
4252 let fk_count = cur.read_u16()? as usize;
4253 let mut fks = Vec::with_capacity(fk_count);
4254 for _ in 0..fk_count {
4255 let name = match cur.read_u8()? {
4256 0 => None,
4257 1 => Some(cur.read_str()?),
4258 other => {
4259 return Err(StorageError::Corrupt(format!(
4260 "FK appendix: unknown has-name byte {other}"
4261 )));
4262 }
4263 };
4264 let local_arity = cur.read_u16()? as usize;
4265 let mut local_columns = Vec::with_capacity(local_arity);
4266 for _ in 0..local_arity {
4267 local_columns.push(cur.read_u16()? as usize);
4268 }
4269 let parent_table = cur.read_str()?;
4270 let parent_arity = cur.read_u16()? as usize;
4271 if parent_arity != local_arity {
4272 return Err(StorageError::Corrupt(format!(
4273 "FK arity mismatch in catalog: local {local_arity} vs parent {parent_arity}"
4274 )));
4275 }
4276 let mut parent_columns = Vec::with_capacity(parent_arity);
4277 for _ in 0..parent_arity {
4278 parent_columns.push(cur.read_u16()? as usize);
4279 }
4280 let on_delete = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
4281 StorageError::Corrupt("FK appendix: unknown on_delete tag".into())
4282 })?;
4283 let on_update = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
4284 StorageError::Corrupt("FK appendix: unknown on_update tag".into())
4285 })?;
4286 fks.push(ForeignKeyConstraint {
4287 name,
4288 local_columns,
4289 parent_table,
4290 parent_columns,
4291 on_delete,
4292 on_update,
4293 });
4294 }
4295 t.schema_mut().foreign_keys = fks;
4296 }
4297 if version >= 15 {
4300 let uc_count = cur.read_u16()? as usize;
4301 let mut ucs = Vec::with_capacity(uc_count);
4302 for _ in 0..uc_count {
4303 let is_pk = cur.read_u8()? != 0;
4304 let arity = cur.read_u16()? as usize;
4305 let mut cols = Vec::with_capacity(arity);
4306 for _ in 0..arity {
4307 cols.push(cur.read_u16()? as usize);
4308 }
4309 ucs.push(UniquenessConstraint {
4310 is_primary_key: is_pk,
4311 columns: cols,
4312 });
4313 }
4314 t.schema_mut().uniqueness_constraints = ucs;
4315 let rt_count = cur.read_u16()? as usize;
4317 for _ in 0..rt_count {
4318 let pos = cur.read_u16()? as usize;
4319 let expr = cur.read_str()?;
4320 if let Some(col) = t.schema_mut().columns.get_mut(pos) {
4321 col.runtime_default = Some(expr);
4322 }
4323 }
4324 }
4325 let _ = table_name;
4326 Ok(())
4327}
4328
4329fn deserialize_rows(
4330 cur: &mut Cursor<'_>,
4331 t: &mut Table,
4332 _n_cols: usize,
4333) -> Result<(), StorageError> {
4334 let row_count = cur.read_u32()? as usize;
4335 let mut hot_bytes: u64 = 0;
4340 for _ in 0..row_count {
4341 let tail = &cur.buf[cur.pos..];
4342 let (row, consumed) = decode_row_body_dense(tail, &t.schema)?;
4343 cur.pos += consumed;
4344 hot_bytes = hot_bytes.saturating_add(row_body_encoded_len(&row, &t.schema) as u64);
4350 t.rows.push_mut(row);
4351 }
4352 t.hot_bytes = hot_bytes;
4353 Ok(())
4354}
4355
4356fn deserialize_indices(
4357 cur: &mut Cursor<'_>,
4358 t: &mut Table,
4359 version: u8,
4360) -> Result<(), StorageError> {
4361 let index_count = cur.read_u16()? as usize;
4362 for _ in 0..index_count {
4363 let idx_name = cur.read_str()?;
4364 let col_pos = cur.read_u16()? as usize;
4365 let column_name = t
4366 .schema
4367 .columns
4368 .get(col_pos)
4369 .ok_or_else(|| {
4370 StorageError::Corrupt(format!(
4371 "index {idx_name:?} points at non-existent column position {col_pos}"
4372 ))
4373 })?
4374 .name
4375 .clone();
4376 let kind_tag = cur.read_u8()?;
4377 match kind_tag {
4378 0 => {
4379 if version >= 9 {
4380 let map = read_btree_map(cur)?;
4385 t.restore_btree_index(idx_name, &column_name, map)?;
4386 } else {
4387 t.add_index(idx_name, &column_name)?;
4392 }
4393 }
4394 1 => {
4395 let m = cur.read_u16()? as usize;
4396 let graph = cur.read_nsw_graph(m)?;
4397 t.restore_nsw_index(idx_name, &column_name, graph)?;
4398 }
4399 2 => {
4400 let column_type = cur.read_data_type()?;
4404 t.restore_brin_index(idx_name, &column_name, column_type)?;
4405 }
4406 other => {
4407 return Err(StorageError::Corrupt(format!(
4408 "unknown index kind tag: {other}"
4409 )));
4410 }
4411 }
4412 if version >= 12 {
4415 let num_included = cur.read_u16()? as usize;
4416 if num_included > 0 {
4417 let mut included: Vec<usize> = Vec::with_capacity(num_included);
4418 for _ in 0..num_included {
4419 let cp = cur.read_u16()? as usize;
4420 if cp >= t.schema.columns.len() {
4421 return Err(StorageError::Corrupt(format!(
4422 "INCLUDE column position {cp} out of range \
4423 ({} schema columns)",
4424 t.schema.columns.len()
4425 )));
4426 }
4427 included.push(cp);
4428 }
4429 if let Some(last) = t.indices.last_mut() {
4430 last.included_columns = included;
4431 }
4432 }
4433 match cur.read_u8()? {
4435 0 => {}
4436 1 => {
4437 let pred = cur.read_str()?;
4438 if let Some(last) = t.indices.last_mut() {
4439 last.partial_predicate = Some(pred);
4440 }
4441 }
4442 other => {
4443 return Err(StorageError::Corrupt(format!(
4444 "partial_predicate tag: unknown byte {other}"
4445 )));
4446 }
4447 }
4448 match cur.read_u8()? {
4450 0 => {}
4451 1 => {
4452 let expr = cur.read_str()?;
4453 if let Some(last) = t.indices.last_mut() {
4454 last.expression = Some(expr);
4455 }
4456 }
4457 other => {
4458 return Err(StorageError::Corrupt(format!(
4459 "expression tag: unknown byte {other}"
4460 )));
4461 }
4462 }
4463 }
4464 }
4465 Ok(())
4466}
4467
4468fn read_btree_map(
4472 cur: &mut Cursor<'_>,
4473) -> Result<PersistentBTreeMap<IndexKey, Vec<RowLocator>>, StorageError> {
4474 let entry_count = cur.read_u32()? as usize;
4475 let mut map = PersistentBTreeMap::new();
4476 for _ in 0..entry_count {
4477 let key = cur.read_index_key()?;
4478 let locator_count = cur.read_u32()? as usize;
4479 let mut locators = Vec::with_capacity(locator_count);
4480 for _ in 0..locator_count {
4481 let tail = &cur.buf[cur.pos..];
4482 let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
4483 StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
4484 })?;
4485 cur.pos += consumed;
4486 locators.push(loc);
4487 }
4488 map.insert_mut(key, locators);
4489 }
4490 Ok(map)
4491}
4492
4493fn write_nsw_graph(out: &mut Vec<u8>, g: &NswGraph) {
4509 let entry = g.entry.map_or(u32::MAX, |e| {
4510 u32::try_from(e).expect("NSW entry fits in u32")
4511 });
4512 write_u16(
4513 out,
4514 u16::try_from(g.m_max_0).expect("HNSW m_max_0 fits in u16"),
4515 );
4516 out.extend_from_slice(&entry.to_le_bytes());
4517 out.push(g.entry_level);
4518 let node_count = g.levels.len();
4519 write_u32(
4520 out,
4521 u32::try_from(node_count).expect("HNSW node count fits in u32"),
4522 );
4523 for &lvl in &g.levels {
4524 out.push(lvl);
4525 }
4526 let layer_count = u8::try_from(g.layers.len()).expect("HNSW layer count ≤ 255");
4527 out.push(layer_count);
4528 for layer in &g.layers {
4529 write_u32(
4530 out,
4531 u32::try_from(layer.len()).expect("HNSW per-layer node count fits in u32"),
4532 );
4533 for neighbors in layer {
4534 write_u16(
4535 out,
4536 u16::try_from(neighbors.len()).expect("HNSW neighbour list fits in u16"),
4537 );
4538 for &peer in neighbors {
4542 write_u32(out, peer);
4543 }
4544 }
4545 }
4546}
4547
4548fn write_data_type(out: &mut Vec<u8>, t: DataType) {
4549 match t {
4550 DataType::Int => out.push(1),
4551 DataType::BigInt => out.push(2),
4552 DataType::Float => out.push(3),
4553 DataType::Text => out.push(4),
4554 DataType::Bool => out.push(5),
4555 DataType::Vector { dim, encoding } => match encoding {
4556 VecEncoding::F32 => {
4560 out.push(6);
4561 out.extend_from_slice(&dim.to_le_bytes());
4562 }
4563 VecEncoding::F16 => {
4566 out.push(15);
4567 out.extend_from_slice(&dim.to_le_bytes());
4568 }
4569 VecEncoding::Sq8 => {
4575 out.push(14);
4576 out.extend_from_slice(&dim.to_le_bytes());
4577 }
4578 },
4579 DataType::SmallInt => out.push(7),
4580 DataType::Varchar(max) => {
4581 out.push(8);
4582 out.extend_from_slice(&max.to_le_bytes());
4583 }
4584 DataType::Char(size) => {
4585 out.push(9);
4586 out.extend_from_slice(&size.to_le_bytes());
4587 }
4588 DataType::Numeric { precision, scale } => {
4589 out.push(10);
4590 out.push(precision);
4591 out.push(scale);
4592 }
4593 DataType::Date => out.push(11),
4594 DataType::Timestamp => out.push(12),
4595 DataType::Timestamptz => out.push(17),
4599 DataType::Interval => {
4604 unreachable!("DataType::Interval has no on-disk encoding in v2.11")
4605 }
4606 DataType::Json => out.push(13),
4607 DataType::Jsonb => out.push(16),
4610 }
4611}
4612
4613impl Cursor<'_> {
4614 fn read_data_type(&mut self) -> Result<DataType, StorageError> {
4615 let tag = self.read_u8()?;
4616 match tag {
4617 1 => Ok(DataType::Int),
4618 2 => Ok(DataType::BigInt),
4619 3 => Ok(DataType::Float),
4620 4 => Ok(DataType::Text),
4621 5 => Ok(DataType::Bool),
4622 6 => Ok(DataType::Vector {
4623 dim: self.read_u32()?,
4624 encoding: VecEncoding::F32,
4625 }),
4626 7 => Ok(DataType::SmallInt),
4627 8 => Ok(DataType::Varchar(self.read_u32()?)),
4628 9 => Ok(DataType::Char(self.read_u32()?)),
4629 10 => {
4630 let precision = self.read_u8()?;
4631 let scale = self.read_u8()?;
4632 Ok(DataType::Numeric { precision, scale })
4633 }
4634 11 => Ok(DataType::Date),
4635 12 => Ok(DataType::Timestamp),
4636 13 => Ok(DataType::Json),
4637 14 => Ok(DataType::Vector {
4638 dim: self.read_u32()?,
4639 encoding: VecEncoding::Sq8,
4640 }),
4641 15 => Ok(DataType::Vector {
4645 dim: self.read_u32()?,
4646 encoding: VecEncoding::F16,
4647 }),
4648 16 => Ok(DataType::Jsonb),
4652 17 => Ok(DataType::Timestamptz),
4656 other => Err(StorageError::Corrupt(format!(
4657 "unknown data type tag: {other}"
4658 ))),
4659 }
4660 }
4661}
4662
4663pub fn row_body_encoded_len(row: &Row, schema: &TableSchema) -> usize {
4669 debug_assert_eq!(
4670 row.values.len(),
4671 schema.columns.len(),
4672 "row_body_encoded_len: row arity must match schema"
4673 );
4674 let bitmap_bytes = schema.columns.len().div_ceil(8);
4675 let mut n = bitmap_bytes;
4676 for (col_idx, v) in row.values.iter().enumerate() {
4677 if matches!(v, Value::Null) {
4678 continue;
4679 }
4680 n += value_body_encoded_len(v, schema.columns[col_idx].ty);
4681 }
4682 n
4683}
4684
4685fn value_body_encoded_len(v: &Value, _ty: DataType) -> usize {
4691 match v {
4692 Value::SmallInt(_) => 2,
4693 Value::Int(_) | Value::Date(_) => 4,
4695 Value::BigInt(_) | Value::Float(_) | Value::Timestamp(_) => 8,
4697 Value::Bool(_) => 1,
4698 Value::Text(s) | Value::Json(s) => 2 + s.len(),
4700 Value::Vector(vec) => 4 + 4 * vec.len(),
4702 Value::Sq8Vector(q) => 4 + 4 + 4 + q.bytes.len(),
4709 Value::HalfVector(h) => 4 + h.bytes.len(),
4712 Value::Numeric { .. } => 16 + 1,
4714 Value::Null => 0,
4716 Value::Interval { .. } => {
4718 unreachable!("Value::Interval has no on-disk encoding")
4719 }
4720 }
4721}
4722
4723pub fn encode_row_body_dense(row: &Row, schema: &TableSchema) -> Vec<u8> {
4734 debug_assert_eq!(
4735 row.values.len(),
4736 schema.columns.len(),
4737 "dense encode: row arity must match schema"
4738 );
4739 let bitmap_bytes = schema.columns.len().div_ceil(8);
4740 let mut out = Vec::with_capacity(bitmap_bytes + schema.columns.len() * 8);
4743 let bitmap_offset = out.len();
4744 out.resize(bitmap_offset + bitmap_bytes, 0);
4745 for (i, v) in row.values.iter().enumerate() {
4746 if matches!(v, Value::Null) {
4747 out[bitmap_offset + i / 8] |= 1 << (i % 8);
4748 }
4749 }
4750 for (col_idx, v) in row.values.iter().enumerate() {
4751 if matches!(v, Value::Null) {
4752 continue;
4753 }
4754 write_value_body(&mut out, v, schema.columns[col_idx].ty);
4755 }
4756 out
4757}
4758
4759pub fn decode_row_body_dense(
4765 bytes: &[u8],
4766 schema: &TableSchema,
4767) -> Result<(Row, usize), StorageError> {
4768 let mut cur = Cursor::new(bytes);
4769 let bitmap_bytes = schema.columns.len().div_ceil(8);
4770 let mut bitmap_buf = [0u8; 32];
4771 if bitmap_bytes > bitmap_buf.len() {
4772 return Err(StorageError::Corrupt(format!(
4773 "row NULL bitmap {bitmap_bytes} B exceeds 32 B cap"
4774 )));
4775 }
4776 let slice = cur.take(bitmap_bytes)?;
4777 bitmap_buf[..bitmap_bytes].copy_from_slice(slice);
4778 let mut values = Vec::with_capacity(schema.columns.len());
4779 for (col_idx, col) in schema.columns.iter().enumerate() {
4780 if (bitmap_buf[col_idx / 8] >> (col_idx % 8)) & 1 == 1 {
4781 values.push(Value::Null);
4782 } else {
4783 values.push(cur.read_value_body(col.ty)?);
4784 }
4785 }
4786 Ok((Row { values }, cur.pos))
4787}
4788
4789fn write_value_body(out: &mut Vec<u8>, v: &Value, ty: DataType) {
4798 match (v, ty) {
4799 (Value::SmallInt(n), DataType::SmallInt) => out.extend_from_slice(&n.to_le_bytes()),
4800 (Value::Int(n), DataType::Int) => out.extend_from_slice(&n.to_le_bytes()),
4801 (Value::BigInt(n), DataType::BigInt) => out.extend_from_slice(&n.to_le_bytes()),
4802 (Value::Float(x), DataType::Float) => out.extend_from_slice(&x.to_le_bytes()),
4803 (Value::Bool(b), DataType::Bool) => out.push(u8::from(*b)),
4804 (Value::Text(s), DataType::Text | DataType::Varchar(_) | DataType::Char(_)) => {
4805 write_str(out, s);
4806 }
4807 (
4808 Value::Vector(v),
4809 DataType::Vector {
4810 encoding: VecEncoding::F32,
4811 ..
4812 },
4813 ) => {
4814 let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
4815 out.extend_from_slice(&dim.to_le_bytes());
4816 for x in v {
4817 out.extend_from_slice(&x.to_le_bytes());
4818 }
4819 }
4820 (
4826 Value::Sq8Vector(q),
4827 DataType::Vector {
4828 encoding: VecEncoding::Sq8,
4829 ..
4830 },
4831 ) => {
4832 let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
4833 out.extend_from_slice(&dim.to_le_bytes());
4834 out.extend_from_slice(&q.min.to_le_bytes());
4835 out.extend_from_slice(&q.max.to_le_bytes());
4836 out.extend_from_slice(&q.bytes);
4837 }
4838 (
4842 Value::HalfVector(h),
4843 DataType::Vector {
4844 encoding: VecEncoding::F16,
4845 ..
4846 },
4847 ) => {
4848 let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
4849 out.extend_from_slice(&dim.to_le_bytes());
4850 out.extend_from_slice(&h.bytes);
4851 }
4852 (Value::Numeric { scaled, .. }, DataType::Numeric { scale, .. }) => {
4853 out.extend_from_slice(&scaled.to_le_bytes());
4854 out.push(scale);
4855 }
4856 (Value::Date(d), DataType::Date) => out.extend_from_slice(&d.to_le_bytes()),
4857 (Value::Timestamp(t), DataType::Timestamp | DataType::Timestamptz) => {
4858 out.extend_from_slice(&t.to_le_bytes())
4859 }
4860 (Value::Json(s), DataType::Json | DataType::Jsonb) => write_str(out, s),
4864 (other, ty) => unreachable!(
4868 "schema-driven encode received mismatched value/type pair: \
4869 value tag={:?}, column type={:?}",
4870 other.data_type(),
4871 ty
4872 ),
4873 }
4874}
4875
4876fn write_value(out: &mut Vec<u8>, v: &Value) {
4877 match v {
4878 Value::Null => out.push(0),
4879 Value::SmallInt(n) => {
4880 out.push(7);
4881 out.extend_from_slice(&n.to_le_bytes());
4882 }
4883 Value::Int(n) => {
4884 out.push(1);
4885 out.extend_from_slice(&n.to_le_bytes());
4886 }
4887 Value::BigInt(n) => {
4888 out.push(2);
4889 out.extend_from_slice(&n.to_le_bytes());
4890 }
4891 Value::Float(x) => {
4892 out.push(3);
4893 out.extend_from_slice(&x.to_le_bytes());
4894 }
4895 Value::Text(s) | Value::Json(s) => {
4900 out.push(4);
4901 write_str(out, s);
4902 }
4903 Value::Bool(b) => {
4904 out.push(5);
4905 out.push(u8::from(*b));
4906 }
4907 Value::Vector(v) => {
4908 out.push(6);
4909 let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
4910 out.extend_from_slice(&dim.to_le_bytes());
4911 for x in v {
4912 out.extend_from_slice(&x.to_le_bytes());
4913 }
4914 }
4915 Value::Sq8Vector(q) => {
4920 out.push(11);
4921 let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
4922 out.extend_from_slice(&dim.to_le_bytes());
4923 out.extend_from_slice(&q.min.to_le_bytes());
4924 out.extend_from_slice(&q.max.to_le_bytes());
4925 out.extend_from_slice(&q.bytes);
4926 }
4927 Value::HalfVector(h) => {
4932 out.push(12);
4933 let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
4934 out.extend_from_slice(&dim.to_le_bytes());
4935 out.extend_from_slice(&h.bytes);
4936 }
4937 Value::Numeric { scaled, scale } => {
4938 out.push(8);
4939 out.extend_from_slice(&scaled.to_le_bytes());
4940 out.push(*scale);
4941 }
4942 Value::Date(d) => {
4943 out.push(9);
4944 out.extend_from_slice(&d.to_le_bytes());
4945 }
4946 Value::Timestamp(t) => {
4947 out.push(10);
4948 out.extend_from_slice(&t.to_le_bytes());
4949 }
4950 Value::Interval { .. } => {
4954 unreachable!(
4955 "Value::Interval has no on-disk encoding; engine must reject it before write"
4956 )
4957 }
4958 }
4959}
4960
/// Appends `n` as two little-endian bytes.
fn write_u16(out: &mut Vec<u8>, n: u16) {
    out.extend(n.to_le_bytes());
}
/// Appends `n` as four little-endian bytes.
fn write_u32(out: &mut Vec<u8>, n: u32) {
    out.extend(n.to_le_bytes());
}
/// Appends `s` as a little-endian `u16` byte length followed by its UTF-8 bytes.
///
/// # Panics
/// If `s` is longer than `u16::MAX` bytes.
fn write_str(out: &mut Vec<u8>, s: &str) {
    let raw = s.as_bytes();
    let len = u16::try_from(raw.len()).expect("identifier / text fits in u16");
    write_u16(out, len);
    out.extend_from_slice(raw);
}
4972
4973fn write_index_key(out: &mut Vec<u8>, key: &IndexKey) {
4977 match key {
4978 IndexKey::Int(n) => {
4979 out.push(INDEX_KEY_TAG_INT);
4980 out.extend_from_slice(&n.to_le_bytes());
4981 }
4982 IndexKey::Text(s) => {
4983 out.push(INDEX_KEY_TAG_TEXT);
4984 write_str(out, s);
4985 }
4986 IndexKey::Bool(b) => {
4987 out.push(INDEX_KEY_TAG_BOOL);
4988 out.push(u8::from(*b));
4989 }
4990 }
4991}
4992
4993struct Cursor<'a> {
4994 buf: &'a [u8],
4995 pos: usize,
4996}
4997
4998impl<'a> Cursor<'a> {
4999 const fn new(buf: &'a [u8]) -> Self {
5000 Self { buf, pos: 0 }
5001 }
5002
5003 fn take(&mut self, n: usize) -> Result<&'a [u8], StorageError> {
5004 let end = self
5005 .pos
5006 .checked_add(n)
5007 .ok_or_else(|| StorageError::Corrupt(format!("length overflow taking {n} bytes")))?;
5008 if end > self.buf.len() {
5009 return Err(StorageError::Corrupt(format!(
5010 "unexpected EOF at offset {} (wanted {n} more bytes)",
5011 self.pos
5012 )));
5013 }
5014 let s = &self.buf[self.pos..end];
5015 self.pos = end;
5016 Ok(s)
5017 }
5018
5019 fn read_u8(&mut self) -> Result<u8, StorageError> {
5020 Ok(self.take(1)?[0])
5021 }
5022 fn read_u16(&mut self) -> Result<u16, StorageError> {
5023 let s = self.take(2)?;
5024 Ok(u16::from_le_bytes([s[0], s[1]]))
5025 }
5026 fn read_u32(&mut self) -> Result<u32, StorageError> {
5027 let s = self.take(4)?;
5028 Ok(u32::from_le_bytes([s[0], s[1], s[2], s[3]]))
5029 }
5030 fn read_i32(&mut self) -> Result<i32, StorageError> {
5031 let s = self.take(4)?;
5032 Ok(i32::from_le_bytes([s[0], s[1], s[2], s[3]]))
5033 }
5034 fn read_u64(&mut self) -> Result<u64, StorageError> {
5037 let s = self.take(8)?;
5038 Ok(u64::from_le_bytes([
5039 s[0], s[1], s[2], s[3], s[4], s[5], s[6], s[7],
5040 ]))
5041 }
5042 fn read_i64(&mut self) -> Result<i64, StorageError> {
5043 let s = self.take(8)?;
5044 let arr: [u8; 8] = s.try_into().expect("checked");
5045 Ok(i64::from_le_bytes(arr))
5046 }
5047 fn read_f64(&mut self) -> Result<f64, StorageError> {
5048 let s = self.take(8)?;
5049 let arr: [u8; 8] = s.try_into().expect("checked");
5050 Ok(f64::from_le_bytes(arr))
5051 }
5052 fn read_f32(&mut self) -> Result<f32, StorageError> {
5053 let s = self.take(4)?;
5054 Ok(f32::from_le_bytes([s[0], s[1], s[2], s[3]]))
5055 }
5056 fn read_str(&mut self) -> Result<String, StorageError> {
5057 let len = self.read_u16()? as usize;
5058 let bytes = self.take(len)?;
5059 core::str::from_utf8(bytes)
5060 .map(String::from)
5061 .map_err(|_| StorageError::Corrupt("invalid UTF-8 in identifier or text".into()))
5062 }
5063
5064 fn read_index_key(&mut self) -> Result<IndexKey, StorageError> {
5068 let tag = self.read_u8()?;
5069 match tag {
5070 INDEX_KEY_TAG_INT => Ok(IndexKey::Int(self.read_i64()?)),
5071 INDEX_KEY_TAG_TEXT => Ok(IndexKey::Text(self.read_str()?)),
5072 INDEX_KEY_TAG_BOOL => Ok(IndexKey::Bool(self.read_u8()? != 0)),
5073 other => Err(StorageError::Corrupt(format!(
5074 "unknown index key tag: {other}"
5075 ))),
5076 }
5077 }
5078 fn read_value_body(&mut self, ty: DataType) -> Result<Value, StorageError> {
5084 match ty {
5085 DataType::SmallInt => {
5086 let s = self.take(2)?;
5087 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
5088 }
5089 DataType::Int => Ok(Value::Int(self.read_i32()?)),
5090 DataType::BigInt => Ok(Value::BigInt(self.read_i64()?)),
5091 DataType::Float => Ok(Value::Float(self.read_f64()?)),
5092 DataType::Bool => Ok(Value::Bool(self.read_u8()? != 0)),
5093 DataType::Text | DataType::Varchar(_) | DataType::Char(_) => {
5094 Ok(Value::Text(self.read_str()?))
5095 }
5096 DataType::Vector {
5097 encoding: VecEncoding::F32,
5098 ..
5099 } => {
5100 let dim = self.read_u32()? as usize;
5101 let mut v = Vec::with_capacity(dim);
5102 for _ in 0..dim {
5103 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
5104 v.push(f32::from_le_bytes(bytes));
5105 }
5106 Ok(Value::Vector(v))
5107 }
5108 DataType::Vector {
5109 encoding: VecEncoding::Sq8,
5110 ..
5111 } => {
5112 let dim = self.read_u32()? as usize;
5113 let min = self.read_f32()?;
5114 let max = self.read_f32()?;
5115 let bytes = self.take(dim)?.to_vec();
5116 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
5117 }
5118 DataType::Vector {
5119 encoding: VecEncoding::F16,
5120 ..
5121 } => {
5122 let dim = self.read_u32()? as usize;
5123 let bytes = self.take(dim * 2)?.to_vec();
5124 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
5125 }
5126 DataType::Numeric { .. } => {
5127 let s = self.take(16)?;
5128 let arr: [u8; 16] = s.try_into().expect("checked");
5129 let scaled = i128::from_le_bytes(arr);
5130 let scale = self.read_u8()?;
5131 Ok(Value::Numeric { scaled, scale })
5132 }
5133 DataType::Date => Ok(Value::Date(self.read_i32()?)),
5134 DataType::Timestamp => Ok(Value::Timestamp(self.read_i64()?)),
5135 DataType::Timestamptz => Ok(Value::Timestamp(self.read_i64()?)),
5136 DataType::Jsonb => Ok(Value::Json(self.read_str()?)),
5137 DataType::Interval => {
5138 Err(StorageError::Corrupt(
5143 "INTERVAL column found on disk — runtime-only type, v3.0.2 rejects it".into(),
5144 ))
5145 }
5146 DataType::Json => Ok(Value::Json(self.read_str()?)),
5147 }
5148 }
5149
5150 fn read_value(&mut self) -> Result<Value, StorageError> {
5151 let tag = self.read_u8()?;
5152 match tag {
5153 0 => Ok(Value::Null),
5154 1 => Ok(Value::Int(self.read_i32()?)),
5155 2 => Ok(Value::BigInt(self.read_i64()?)),
5156 3 => Ok(Value::Float(self.read_f64()?)),
5157 4 => Ok(Value::Text(self.read_str()?)),
5158 5 => Ok(Value::Bool(self.read_u8()? != 0)),
5159 6 => {
5160 let dim = self.read_u32()? as usize;
5161 let mut v = Vec::with_capacity(dim);
5162 for _ in 0..dim {
5163 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
5164 v.push(f32::from_le_bytes(bytes));
5165 }
5166 Ok(Value::Vector(v))
5167 }
5168 7 => {
5169 let s = self.take(2)?;
5170 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
5171 }
5172 8 => {
5173 let s = self.take(16)?;
5174 let arr: [u8; 16] = s.try_into().expect("checked");
5175 let scaled = i128::from_le_bytes(arr);
5176 let scale = self.read_u8()?;
5177 Ok(Value::Numeric { scaled, scale })
5178 }
5179 9 => Ok(Value::Date(self.read_i32()?)),
5180 10 => Ok(Value::Timestamp(self.read_i64()?)),
5181 11 => {
5186 let dim = self.read_u32()? as usize;
5187 let min = self.read_f32()?;
5188 let max = self.read_f32()?;
5189 let bytes = self.take(dim)?.to_vec();
5190 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
5191 }
5192 12 => {
5195 let dim = self.read_u32()? as usize;
5196 let bytes = self.take(dim * 2)?.to_vec();
5197 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
5198 }
5199 other => Err(StorageError::Corrupt(format!("unknown value tag: {other}"))),
5200 }
5201 }
5202
5203 fn read_nsw_graph(&mut self, m: usize) -> Result<NswGraph, StorageError> {
5207 let m_max_0 = self.read_u16()? as usize;
5208 let entry_raw = self.read_u32()?;
5209 let entry = if entry_raw == u32::MAX {
5210 None
5211 } else {
5212 Some(entry_raw as usize)
5213 };
5214 let entry_level = self.read_u8()?;
5215 let node_count = self.read_u32()? as usize;
5216 let mut levels: PersistentVec<u8> = PersistentVec::new();
5221 for _ in 0..node_count {
5222 levels.push_mut(self.read_u8()?);
5223 }
5224 let layer_count = self.read_u8()? as usize;
5225 let mut layers: Vec<PersistentVec<Vec<u32>>> = Vec::with_capacity(layer_count);
5226 for _ in 0..layer_count {
5227 let n = self.read_u32()? as usize;
5228 let mut per_layer: PersistentVec<Vec<u32>> = PersistentVec::new();
5229 for _ in 0..n {
5230 let cnt = self.read_u16()? as usize;
5231 let mut row: Vec<u32> = Vec::with_capacity(cnt);
5232 for _ in 0..cnt {
5233 row.push(self.read_u32()?);
5234 }
5235 per_layer.push_mut(row);
5236 }
5237 layers.push(per_layer);
5238 }
5239 Ok(NswGraph {
5240 m,
5241 m_max_0,
5242 entry,
5243 entry_level,
5244 levels,
5245 layers,
5246 })
5247 }
5248}
5249
5250#[cfg(test)]
5251mod tests {
5252 use super::*;
5253 use alloc::string::ToString;
5254 use alloc::vec;
5255
5256 #[cfg(target_arch = "aarch64")]
5257 #[test]
5258 fn neon_l2_matches_scalar() {
5259 let dims = [4usize, 8, 12, 16, 64, 128, 256, 384, 512, 768, 1024, 1536];
5264 for &d in &dims {
5265 let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
5266 let mut a = Vec::with_capacity(d);
5267 let mut b = Vec::with_capacity(d);
5268 for _ in 0..d {
5269 state = state
5270 .wrapping_mul(6_364_136_223_846_793_005)
5271 .wrapping_add(1);
5272 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5273 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5274 state = state
5275 .wrapping_mul(6_364_136_223_846_793_005)
5276 .wrapping_add(1);
5277 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5278 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5279 a.push(x);
5280 b.push(y);
5281 }
5282 let scalar = l2_distance_sq_scalar(&a, &b);
5283 let neon = unsafe { l2_distance_sq_neon(&a, &b) };
5284 let tol = (scalar.abs().max(1e-6)) * 1e-4;
5285 assert!(
5286 (scalar - neon).abs() <= tol,
5287 "dim={d}: scalar={scalar} neon={neon} diff={}",
5288 (scalar - neon).abs()
5289 );
5290 }
5291 }
5292
5293 #[cfg(target_arch = "aarch64")]
5294 #[test]
5295 fn neon_inner_product_matches_scalar() {
5296 let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
5300 for &d in &dims {
5301 let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
5302 let mut a = Vec::with_capacity(d);
5303 let mut b = Vec::with_capacity(d);
5304 for _ in 0..d {
5305 state = state
5306 .wrapping_mul(6_364_136_223_846_793_005)
5307 .wrapping_add(1);
5308 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5309 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5310 state = state
5311 .wrapping_mul(6_364_136_223_846_793_005)
5312 .wrapping_add(1);
5313 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5314 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5315 a.push(x);
5316 b.push(y);
5317 }
5318 let scalar = inner_product_scalar(&a, &b);
5319 let neon = unsafe { inner_product_neon(&a, &b) };
5320 #[allow(clippy::cast_precision_loss)]
5321 let tol = (scalar.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
5322 assert!(
5323 (scalar - neon).abs() <= tol,
5324 "IP dim={d}: scalar={scalar} neon={neon} diff={}",
5325 (scalar - neon).abs()
5326 );
5327 }
5328 }
5329
5330 #[cfg(target_arch = "aarch64")]
5331 #[allow(clippy::similar_names)]
5332 #[test]
5333 fn neon_cosine_dot_norms_matches_scalar() {
5334 let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
5335 for &d in &dims {
5336 let mut state: u64 = (d as u64).wrapping_mul(0xBF58_476D_1CE4_E5B9);
5337 let mut a = Vec::with_capacity(d);
5338 let mut b = Vec::with_capacity(d);
5339 for _ in 0..d {
5340 state = state
5341 .wrapping_mul(6_364_136_223_846_793_005)
5342 .wrapping_add(1);
5343 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5344 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5345 state = state
5346 .wrapping_mul(6_364_136_223_846_793_005)
5347 .wrapping_add(1);
5348 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5349 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5350 a.push(x);
5351 b.push(y);
5352 }
5353 let (dot_s, na_s, nb_s) = cosine_dot_norms_scalar(&a, &b);
5354 let (dot_n, na_n, nb_n) = unsafe { cosine_dot_norms_neon(&a, &b) };
5355 #[allow(clippy::cast_precision_loss)]
5356 let tol_d = (dot_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
5357 #[allow(clippy::cast_precision_loss)]
5358 let tol_n = (na_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
5359 assert!(
5360 (dot_s - dot_n).abs() <= tol_d,
5361 "cosine dot dim={d}: scalar={dot_s} neon={dot_n}"
5362 );
5363 assert!(
5364 (na_s - na_n).abs() <= tol_n,
5365 "cosine na dim={d}: scalar={na_s} neon={na_n}"
5366 );
5367 assert!(
5368 (nb_s - nb_n).abs() <= tol_n,
5369 "cosine nb dim={d}: scalar={nb_s} neon={nb_n}"
5370 );
5371 }
5372 }
5373
5374 fn make_users_schema() -> TableSchema {
5375 TableSchema::new(
5376 "users",
5377 vec![
5378 ColumnSchema::new("id", DataType::Int, false),
5379 ColumnSchema::new("name", DataType::Text, false),
5380 ColumnSchema::new("score", DataType::Float, true),
5381 ],
5382 )
5383 }
5384
5385 #[test]
5386 fn value_type_tag_matches_variant() {
5387 assert_eq!(Value::Int(1).data_type(), Some(DataType::Int));
5388 assert_eq!(Value::BigInt(1).data_type(), Some(DataType::BigInt));
5389 assert_eq!(Value::Float(1.0).data_type(), Some(DataType::Float));
5390 assert_eq!(Value::Text("x".into()).data_type(), Some(DataType::Text));
5391 assert_eq!(Value::Bool(true).data_type(), Some(DataType::Bool));
5392 assert_eq!(Value::Null.data_type(), None);
5393 assert!(Value::Null.is_null());
5394 assert!(!Value::Int(0).is_null());
5395 }
5396
5397 #[test]
5398 fn sq8_value_reports_sq8_data_type() {
5399 let q = crate::quantize::quantize(&[0.0, 0.25, 0.5, 0.75, 1.0]);
5404 let v = Value::Sq8Vector(q);
5405 assert_eq!(
5406 v.data_type(),
5407 Some(DataType::Vector {
5408 dim: 5,
5409 encoding: VecEncoding::Sq8,
5410 }),
5411 );
5412 }
5413
5414 #[test]
5415 fn datatype_display_matches_pg_keyword() {
5416 assert_eq!(DataType::Int.to_string(), "INT");
5417 assert_eq!(DataType::BigInt.to_string(), "BIGINT");
5418 assert_eq!(DataType::Float.to_string(), "FLOAT");
5419 assert_eq!(DataType::Text.to_string(), "TEXT");
5420 assert_eq!(DataType::Bool.to_string(), "BOOL");
5421 }
5422
5423 #[test]
5424 fn row_len_and_emptiness() {
5425 let r = Row::new(vec![Value::Int(1), Value::Null]);
5426 assert_eq!(r.len(), 2);
5427 assert!(!r.is_empty());
5428 assert!(Row::new(Vec::new()).is_empty());
5429 }
5430
5431 #[test]
5432 fn table_schema_column_position() {
5433 let s = make_users_schema();
5434 assert_eq!(s.column_position("id"), Some(0));
5435 assert_eq!(s.column_position("score"), Some(2));
5436 assert_eq!(s.column_position("missing"), None);
5437 }
5438
5439 #[test]
5440 fn catalog_create_table_then_lookup() {
5441 let mut cat = Catalog::new();
5442 cat.create_table(make_users_schema()).unwrap();
5443 assert_eq!(cat.table_count(), 1);
5444 assert!(cat.get("users").is_some());
5445 assert!(cat.get("nope").is_none());
5446 }
5447
5448 #[test]
5449 fn catalog_duplicate_table_is_rejected() {
5450 let mut cat = Catalog::new();
5451 cat.create_table(make_users_schema()).unwrap();
5452 let err = cat.create_table(make_users_schema()).unwrap_err();
5453 assert!(matches!(err, StorageError::DuplicateTable { ref name } if name == "users"));
5454 }
5455
5456 #[test]
5457 fn table_insert_happy_path_appends_row() {
5458 let mut cat = Catalog::new();
5459 cat.create_table(make_users_schema()).unwrap();
5460 let t = cat.get_mut("users").unwrap();
5461 t.insert(Row::new(vec![
5462 Value::Int(1),
5463 Value::Text("alice".into()),
5464 Value::Float(99.5),
5465 ]))
5466 .unwrap();
5467 assert_eq!(t.row_count(), 1);
5468 assert_eq!(t.rows()[0].values[1], Value::Text("alice".into()));
5469 }
5470
5471 #[test]
5472 fn table_insert_arity_mismatch() {
5473 let mut cat = Catalog::new();
5474 cat.create_table(make_users_schema()).unwrap();
5475 let t = cat.get_mut("users").unwrap();
5476 let err = t.insert(Row::new(vec![Value::Int(1)])).unwrap_err();
5477 assert!(matches!(
5478 err,
5479 StorageError::ArityMismatch {
5480 expected: 3,
5481 actual: 1
5482 }
5483 ));
5484 assert_eq!(t.row_count(), 0);
5485 }
5486
5487 #[test]
5488 fn table_insert_type_mismatch_reports_column() {
5489 let mut cat = Catalog::new();
5490 cat.create_table(make_users_schema()).unwrap();
5491 let t = cat.get_mut("users").unwrap();
5492 let err = t
5493 .insert(Row::new(vec![
5494 Value::Int(1),
5495 Value::Int(42), Value::Float(0.0),
5497 ]))
5498 .unwrap_err();
5499 match err {
5500 StorageError::TypeMismatch {
5501 ref column,
5502 expected,
5503 actual,
5504 position,
5505 } => {
5506 assert_eq!(column, "name");
5507 assert_eq!(expected, DataType::Text);
5508 assert_eq!(actual, DataType::Int);
5509 assert_eq!(position, 1);
5510 }
5511 other => panic!("unexpected: {other:?}"),
5512 }
5513 assert_eq!(t.row_count(), 0);
5514 }
5515
5516 #[test]
5517 fn table_insert_null_into_not_null_rejected() {
5518 let mut cat = Catalog::new();
5519 cat.create_table(make_users_schema()).unwrap();
5520 let t = cat.get_mut("users").unwrap();
5521 let err = t
5522 .insert(Row::new(vec![
5523 Value::Int(1),
5524 Value::Null, Value::Float(1.0),
5526 ]))
5527 .unwrap_err();
5528 assert!(matches!(err, StorageError::NullInNotNull { ref column } if column == "name"));
5529 }
5530
5531 #[test]
5532 fn table_insert_null_into_nullable_ok() {
5533 let mut cat = Catalog::new();
5534 cat.create_table(make_users_schema()).unwrap();
5535 let t = cat.get_mut("users").unwrap();
5536 t.insert(Row::new(vec![
5537 Value::Int(1),
5538 Value::Text("bob".into()),
5539 Value::Null,
5540 ]))
5541 .unwrap();
5542 assert_eq!(t.row_count(), 1);
5543 }
5544
5545 #[test]
5546 fn catalog_get_mut_independent_per_table() {
5547 let mut cat = Catalog::new();
5548 cat.create_table(TableSchema::new(
5549 "a",
5550 vec![ColumnSchema::new("v", DataType::Int, false)],
5551 ))
5552 .unwrap();
5553 cat.create_table(TableSchema::new(
5554 "b",
5555 vec![ColumnSchema::new("v", DataType::Int, false)],
5556 ))
5557 .unwrap();
5558 cat.get_mut("a")
5559 .unwrap()
5560 .insert(Row::new(vec![Value::Int(1)]))
5561 .unwrap();
5562 assert_eq!(cat.get("a").unwrap().row_count(), 1);
5563 assert_eq!(cat.get("b").unwrap().row_count(), 0);
5564 }
5565
5566 fn assert_round_trip(cat: &Catalog) {
5569 let bytes = cat.serialize();
5570 let restored = Catalog::deserialize(&bytes).expect("deserialize");
5571 assert_eq!(restored.table_count(), cat.table_count());
5574 for (a, b) in cat.tables.iter().zip(restored.tables.iter()) {
5575 assert_eq!(a.schema, b.schema);
5576 assert_eq!(a.rows, b.rows);
5577 }
5578 }
5579
5580 #[test]
5581 fn serialize_empty_catalog_round_trips() {
5582 assert_round_trip(&Catalog::new());
5583 }
5584
5585 #[test]
5586 fn serialize_single_empty_table_round_trips() {
5587 let mut cat = Catalog::new();
5588 cat.create_table(make_users_schema()).unwrap();
5589 assert_round_trip(&cat);
5590 }
5591
5592 #[test]
5593 fn nsw_clone_is_o1() {
5594 let mut cat = Catalog::new();
5603 cat.create_table(TableSchema::new(
5604 "docs",
5605 alloc::vec![
5606 ColumnSchema::new("id", DataType::Int, false),
5607 ColumnSchema::new(
5608 "v",
5609 DataType::Vector {
5610 dim: 3,
5611 encoding: VecEncoding::F32
5612 },
5613 true
5614 ),
5615 ],
5616 ))
5617 .unwrap();
5618 let t = cat.get_mut("docs").unwrap();
5619 for i in 0..1500_i32 {
5620 #[allow(clippy::cast_precision_loss)] let base = (i as f32) * 0.01;
5622 t.insert(Row::new(alloc::vec![
5623 Value::Int(i),
5624 Value::Vector(alloc::vec![base, base + 0.05, base + 0.1]),
5625 ]))
5626 .unwrap();
5627 }
5628 t.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
5629 .unwrap();
5630 let g = match &cat.get("docs").unwrap().indices()[0].kind {
5631 IndexKind::Nsw(g) => g,
5632 IndexKind::BTree(_) | IndexKind::Brin { .. } => panic!("expected NSW"),
5633 };
5634 assert_eq!(g.levels.len(), 1500, "one level slot per inserted row");
5637 assert!(
5638 g.layers.len() >= 2,
5639 "1500 nodes should populate at least two HNSW layers, got {}",
5640 g.layers.len()
5641 );
5642
5643 let cloned = g.clone();
5644
5645 assert!(
5646 g.levels.shares_storage_with(&cloned.levels),
5647 "levels PV not shared after clone — clone copied elements (O(N))"
5648 );
5649 assert_eq!(g.layers.len(), cloned.layers.len());
5650 for (l, (orig, cl)) in g.layers.iter().zip(cloned.layers.iter()).enumerate() {
5651 assert!(
5652 orig.shares_storage_with(cl),
5653 "layer {l} PV not shared after clone — clone copied elements (O(N))"
5654 );
5655 }
5656 }
5657
5658 #[test]
5659 fn sq8_catalog_serialise_roundtrip_preserves_cells_and_index() {
5660 let mut cat = Catalog::new();
5667 cat.create_table(TableSchema::new(
5668 "vecs",
5669 alloc::vec![
5670 ColumnSchema::new("id", DataType::Int, false),
5671 ColumnSchema::new(
5672 "v",
5673 DataType::Vector {
5674 dim: 8,
5675 encoding: VecEncoding::Sq8,
5676 },
5677 false,
5678 ),
5679 ],
5680 ))
5681 .unwrap();
5682 let t = cat.get_mut("vecs").unwrap();
5683 for i in 0..32_i32 {
5684 #[allow(clippy::cast_precision_loss)]
5685 let base = (i as f32) * 0.03;
5686 let v: Vec<f32> = (0..8_i32)
5687 .map(|j| {
5688 #[allow(clippy::cast_precision_loss)]
5689 let off = (j as f32) * 0.01;
5690 base + off
5691 })
5692 .collect();
5693 t.insert(Row::new(alloc::vec![
5694 Value::Int(i),
5695 Value::Sq8Vector(quantize::quantize(&v)),
5696 ]))
5697 .unwrap();
5698 }
5699 t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
5700 let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];
5703 let (before_cell, before_ty, before_hits) = {
5704 let t_ref = cat.get("vecs").unwrap();
5705 (
5706 t_ref.rows()[5].values[1].clone(),
5707 t_ref.schema().columns[1].ty,
5708 nsw_query(t_ref, "v_idx", &query, 5, NswMetric::L2),
5709 )
5710 };
5711
5712 let bytes = cat.serialize();
5713 let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
5714 let rt = restored.get("vecs").unwrap();
5715 assert_eq!(rt.schema().columns[1].ty, before_ty);
5716 assert_eq!(rt.rows()[5].values[1], before_cell);
5717 let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
5718 assert_eq!(before_hits, after_hits);
5719 }
5720
5721 #[test]
5722 fn half_catalog_serialise_roundtrip_preserves_cells_and_index() {
5723 use crate::halfvec;
5730 let mut cat = Catalog::new();
5731 cat.create_table(TableSchema::new(
5732 "vecs",
5733 alloc::vec![
5734 ColumnSchema::new("id", DataType::Int, false),
5735 ColumnSchema::new(
5736 "v",
5737 DataType::Vector {
5738 dim: 8,
5739 encoding: VecEncoding::F16,
5740 },
5741 false,
5742 ),
5743 ],
5744 ))
5745 .unwrap();
5746 let t = cat.get_mut("vecs").unwrap();
5747 for i in 0..32_i32 {
5748 #[allow(clippy::cast_precision_loss)]
5749 let base = (i as f32) * 0.03;
5750 let v: Vec<f32> = (0..8_i32)
5751 .map(|j| {
5752 #[allow(clippy::cast_precision_loss)]
5753 let off = (j as f32) * 0.01;
5754 base + off
5755 })
5756 .collect();
5757 t.insert(Row::new(alloc::vec![
5758 Value::Int(i),
5759 Value::HalfVector(halfvec::HalfVector::from_f32_slice(&v)),
5760 ]))
5761 .unwrap();
5762 }
5763 t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
5764 let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];
5765 let (before_cell, before_ty, before_hits) = {
5766 let t_ref = cat.get("vecs").unwrap();
5767 (
5768 t_ref.rows()[5].values[1].clone(),
5769 t_ref.schema().columns[1].ty,
5770 nsw_query(t_ref, "v_idx", &query, 5, NswMetric::L2),
5771 )
5772 };
5773 let bytes = cat.serialize();
5774 let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
5775 let rt = restored.get("vecs").unwrap();
5776 assert_eq!(rt.schema().columns[1].ty, before_ty);
5777 assert_eq!(rt.rows()[5].values[1], before_cell);
5778 let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
5779 assert_eq!(before_hits, after_hits);
5780 }
5781
5782 #[test]
5783 #[allow(clippy::similar_names)]
5784 fn hnsw_half_recall_at_10_matches_f32_groundtruth() {
5785 use crate::halfvec;
5792 fn next(state: &mut u64) -> f32 {
5793 *state = state
5794 .wrapping_add(0x9E37_79B9_7F4A_7C15)
5795 .wrapping_mul(0xBF58_476D_1CE4_E5B9);
5796 #[allow(clippy::cast_precision_loss)]
5797 let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
5798 2.0 * u - 1.0
5799 }
5800 let dim: u32 = 32;
5801 let n: usize = 512;
5802 let dim_us = dim as usize;
5803 let mut seed: u64 = 0xF16_F16_F16_F16_u64;
5804 let corpus: Vec<Vec<f32>> = (0..n)
5805 .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
5806 .collect();
5807 let queries: Vec<Vec<f32>> = (0..32)
5808 .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
5809 .collect();
5810 let exact_top10: Vec<Vec<usize>> = queries
5811 .iter()
5812 .map(|q| {
5813 let mut scored: Vec<(f32, usize)> = corpus
5814 .iter()
5815 .enumerate()
5816 .map(|(i, v)| (l2_distance_sq(v, q), i))
5817 .collect();
5818 scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
5819 scored.into_iter().take(10).map(|(_, i)| i).collect()
5820 })
5821 .collect();
5822 let mut cat = Catalog::new();
5823 cat.create_table(TableSchema::new(
5824 "vecs",
5825 alloc::vec![
5826 ColumnSchema::new("id", DataType::Int, false),
5827 ColumnSchema::new(
5828 "v",
5829 DataType::Vector {
5830 dim,
5831 encoding: VecEncoding::F16,
5832 },
5833 false,
5834 ),
5835 ],
5836 ))
5837 .unwrap();
5838 let t = cat.get_mut("vecs").unwrap();
5839 for (i, v) in corpus.iter().enumerate() {
5840 t.insert(Row::new(alloc::vec![
5841 Value::Int(i32::try_from(i).unwrap()),
5842 Value::HalfVector(halfvec::HalfVector::from_f32_slice(v)),
5843 ]))
5844 .unwrap();
5845 }
5846 t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
5847 let table = cat.get("vecs").unwrap();
5848 let mut total_overlap = 0_usize;
5849 for (q, exact) in queries.iter().zip(exact_top10.iter()) {
5850 let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
5851 for h in &hits {
5852 if exact.contains(h) {
5853 total_overlap += 1;
5854 }
5855 }
5856 }
5857 #[allow(clippy::cast_precision_loss)]
5858 let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
5859 assert!(
5860 recall >= 0.95,
5861 "HALF HNSW recall@10 = {recall:.3}, below floor 0.95 — \
5862 check halfvec dispatch in `cell_to_query_metric_distance`"
5863 );
5864 }
5865
5866 #[test]
5867 fn hnsw_sq8_recall_at_10_above_0_95_vs_f32_groundtruth() {
5868 use crate::quantize;
5875 fn next(state: &mut u64) -> f32 {
5879 *state = state
5880 .wrapping_add(0x9E37_79B9_7F4A_7C15)
5881 .wrapping_mul(0xBF58_476D_1CE4_E5B9);
5882 #[allow(clippy::cast_precision_loss)]
5883 let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
5884 2.0 * u - 1.0
5885 }
5886 let dim: u32 = 32;
5887 let n: usize = 512;
5888 let dim_us = dim as usize;
5889 let mut seed: u64 = 0xCAFE_BABE_DEAD_BEEFu64;
5890 let corpus: Vec<Vec<f32>> = (0..n)
5891 .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
5892 .collect();
5893 let queries: Vec<Vec<f32>> = (0..32)
5894 .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
5895 .collect();
5896 let exact_top10: Vec<Vec<usize>> = queries
5898 .iter()
5899 .map(|q| {
5900 let mut scored: Vec<(f32, usize)> = corpus
5901 .iter()
5902 .enumerate()
5903 .map(|(i, v)| (l2_distance_sq(v, q), i))
5904 .collect();
5905 scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
5906 scored.into_iter().take(10).map(|(_, i)| i).collect()
5907 })
5908 .collect();
5909 let mut cat = Catalog::new();
5912 cat.create_table(TableSchema::new(
5913 "vecs",
5914 alloc::vec![
5915 ColumnSchema::new("id", DataType::Int, false),
5916 ColumnSchema::new(
5917 "v",
5918 DataType::Vector {
5919 dim,
5920 encoding: VecEncoding::Sq8,
5921 },
5922 false,
5923 ),
5924 ],
5925 ))
5926 .unwrap();
5927 let t = cat.get_mut("vecs").unwrap();
5928 for (i, v) in corpus.iter().enumerate() {
5929 t.insert(Row::new(alloc::vec![
5930 Value::Int(i32::try_from(i).unwrap()),
5931 Value::Sq8Vector(quantize::quantize(v)),
5932 ]))
5933 .unwrap();
5934 }
5935 t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
5936 let table = cat.get("vecs").unwrap();
5937 let mut total_overlap = 0_usize;
5938 for (q, exact) in queries.iter().zip(exact_top10.iter()) {
5939 let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
5940 for h in &hits {
5941 if exact.contains(h) {
5942 total_overlap += 1;
5943 }
5944 }
5945 }
5946 #[allow(clippy::cast_precision_loss)]
5947 let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
5948 assert!(
5949 recall >= 0.95,
5950 "SQ8 HNSW recall@10 = {recall:.3}, below floor 0.95 — \
5951 check `sq8_rerank` is wired in `nsw_search` for SQ8 columns"
5952 );
5953 }
5954
5955 #[test]
5956 fn nsw_index_topology_persists_through_round_trip() {
5957 let mut cat = Catalog::new();
5963 cat.create_table(TableSchema::new(
5964 "docs",
5965 alloc::vec![
5966 ColumnSchema::new("id", DataType::Int, false),
5967 ColumnSchema::new(
5968 "v",
5969 DataType::Vector {
5970 dim: 3,
5971 encoding: VecEncoding::F32
5972 },
5973 true
5974 ),
5975 ],
5976 ))
5977 .unwrap();
5978 let t = cat.get_mut("docs").unwrap();
5979 for i in 0..6_i32 {
5980 #[allow(clippy::cast_precision_loss)] let base = (i as f32) * 0.1;
5982 let row = Row::new(alloc::vec![
5983 Value::Int(i),
5984 Value::Vector(alloc::vec![base, base + 0.05, base + 0.1]),
5985 ]);
5986 t.insert(row).unwrap();
5987 }
5988 t.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
5989 .unwrap();
5990 let original = match &cat.get("docs").unwrap().indices()[0].kind {
5991 IndexKind::Nsw(g) => g.clone(),
5992 IndexKind::BTree(_) | IndexKind::Brin { .. } => panic!("expected NSW"),
5993 };
5994 let bytes = cat.serialize();
5995 let restored = Catalog::deserialize(&bytes).expect("deserialize");
5996 let restored_graph = match &restored.get("docs").unwrap().indices()[0].kind {
5997 IndexKind::Nsw(g) => g.clone(),
5998 IndexKind::BTree(_) | IndexKind::Brin { .. } => panic!("expected NSW"),
5999 };
6000 assert_eq!(restored_graph.m, original.m);
6001 assert_eq!(restored_graph.m_max_0, original.m_max_0);
6002 assert_eq!(restored_graph.entry, original.entry);
6003 assert_eq!(restored_graph.entry_level, original.entry_level);
6004 assert_eq!(restored_graph.levels, original.levels);
6005 assert_eq!(restored_graph.layers, original.layers);
6006 }
6007
6008 #[test]
6009 fn hnsw_level_assignment_is_deterministic() {
6010 for i in 0..32usize {
6013 assert_eq!(nsw_assign_level(i), nsw_assign_level(i));
6014 }
6015 }
6016
6017 #[test]
6018 fn hnsw_layer_0_dominates_population() {
6019 let on_zero = (0..200usize).filter(|&i| nsw_assign_level(i) == 0).count();
6024 assert!(on_zero > 150, "level-0 nodes too few: {on_zero}");
6025 }
6026
6027 #[test]
6028 fn hnsw_search_matches_brute_force_for_l2_top1() {
6029 let mut cat = Catalog::new();
6033 cat.create_table(TableSchema::new(
6034 "vecs",
6035 alloc::vec![
6036 ColumnSchema::new("id", DataType::Int, false),
6037 ColumnSchema::new(
6038 "v",
6039 DataType::Vector {
6040 dim: 3,
6041 encoding: VecEncoding::F32
6042 },
6043 true
6044 ),
6045 ],
6046 ))
6047 .unwrap();
6048 let t = cat.get_mut("vecs").unwrap();
6049 let dataset: alloc::vec::Vec<(i32, [f32; 3])> = alloc::vec![
6050 (1, [0.0, 0.0, 0.0]),
6051 (2, [1.0, 0.0, 0.0]),
6052 (3, [0.0, 1.0, 0.0]),
6053 (4, [0.0, 0.0, 1.0]),
6054 (5, [1.0, 1.0, 0.0]),
6055 (6, [1.0, 0.0, 1.0]),
6056 (7, [0.0, 1.0, 1.0]),
6057 (8, [1.0, 1.0, 1.0]),
6058 (9, [0.5, 0.5, 0.5]),
6059 (10, [0.2, 0.8, 0.5]),
6060 ];
6061 for &(id, v) in &dataset {
6062 t.insert(Row::new(alloc::vec![
6063 Value::Int(id),
6064 Value::Vector(alloc::vec![v[0], v[1], v[2]]),
6065 ]))
6066 .unwrap();
6067 }
6068 t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
6069 let idx_pos = cat
6070 .get("vecs")
6071 .unwrap()
6072 .indices()
6073 .iter()
6074 .position(|i| i.name == "v_idx")
6075 .unwrap();
6076 for query in [[0.4, 0.4, 0.4], [0.9, 0.1, 0.0], [0.0, 0.9, 0.9]] {
6077 let table = cat.get("vecs").unwrap();
6078 let hnsw_top = nsw_search(table, idx_pos, &query, 1, 16, NswMetric::L2);
6079 let mut brute: alloc::vec::Vec<(f32, usize)> = (0..table.rows.len())
6080 .map(|i| {
6081 let Value::Vector(v) = &table.rows[i].values[1] else {
6082 return (f32::INFINITY, i);
6083 };
6084 (l2_distance_sq(v, &query), i)
6085 })
6086 .collect();
6087 brute.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
6088 assert!(!hnsw_top.is_empty(), "HNSW returned no results");
6089 assert_eq!(
6090 hnsw_top[0].1, brute[0].1,
6091 "HNSW top-1 != brute-force top-1 for {query:?}"
6092 );
6093 }
6094 }
6095
6096 #[test]
6097 fn serialize_table_with_rows_round_trips() {
6098 let mut cat = Catalog::new();
6099 cat.create_table(make_users_schema()).unwrap();
6100 let t = cat.get_mut("users").unwrap();
6101 t.insert(Row::new(vec![
6102 Value::Int(1),
6103 Value::Text("alice".into()),
6104 Value::Float(95.5),
6105 ]))
6106 .unwrap();
6107 t.insert(Row::new(vec![
6108 Value::Int(2),
6109 Value::Text("bob".into()),
6110 Value::Null,
6111 ]))
6112 .unwrap();
6113 assert_round_trip(&cat);
6114 }
6115
6116 #[test]
6117 fn serialize_multiple_tables_round_trips() {
6118 let mut cat = Catalog::new();
6119 cat.create_table(make_users_schema()).unwrap();
6120 cat.create_table(TableSchema::new(
6121 "flags",
6122 vec![
6123 ColumnSchema::new("id", DataType::BigInt, false),
6124 ColumnSchema::new("active", DataType::Bool, false),
6125 ],
6126 ))
6127 .unwrap();
6128 cat.get_mut("flags")
6129 .unwrap()
6130 .insert(Row::new(vec![Value::BigInt(7), Value::Bool(true)]))
6131 .unwrap();
6132 assert_round_trip(&cat);
6133 }
6134
6135 #[test]
6136 fn deserialize_rejects_bad_magic() {
6137 let mut buf = b"BADMAGIC".to_vec();
6138 buf.push(FILE_VERSION);
6139 buf.extend_from_slice(&0u32.to_le_bytes());
6140 let err = Catalog::deserialize(&buf).unwrap_err();
6141 assert!(matches!(err, StorageError::Corrupt(_)));
6142 }
6143
6144 #[test]
6145 fn deserialize_rejects_unsupported_version() {
6146 let mut buf = FILE_MAGIC.to_vec();
6147 buf.push(99); buf.extend_from_slice(&0u32.to_le_bytes());
6149 let err = Catalog::deserialize(&buf).unwrap_err();
6150 assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("version")));
6151 }
6152
6153 #[test]
6154 fn deserialize_rejects_truncated_file() {
6155 let mut cat = Catalog::new();
6156 cat.create_table(make_users_schema()).unwrap();
6157 let bytes = cat.serialize();
6158 let truncated = &bytes[..bytes.len() - 1];
6160 assert!(matches!(
6161 Catalog::deserialize(truncated),
6162 Err(StorageError::Corrupt(_))
6163 ));
6164 }
6165
6166 #[test]
6167 fn deserialize_rejects_trailing_garbage() {
6168 let cat = Catalog::new();
6169 let mut bytes = cat.serialize();
6170 bytes.push(0xFF);
6171 assert!(matches!(
6172 Catalog::deserialize(&bytes),
6173 Err(StorageError::Corrupt(ref s)) if s.contains("trailing")
6174 ));
6175 }
6176
6177 fn populated_users() -> Catalog {
6180 let mut cat = Catalog::new();
6181 cat.create_table(make_users_schema()).unwrap();
6182 let t = cat.get_mut("users").unwrap();
6183 for (id, name, score) in [
6184 (1, "alice", Some(90.0)),
6185 (2, "bob", None),
6186 (3, "alice", Some(70.0)), ] {
6188 t.insert(Row::new(vec![
6189 Value::Int(id),
6190 Value::Text(name.into()),
6191 score.map_or(Value::Null, Value::Float),
6192 ]))
6193 .unwrap();
6194 }
6195 cat
6196 }
6197
6198 #[test]
6199 fn add_index_builds_from_existing_rows() {
6200 let mut cat = populated_users();
6201 cat.get_mut("users")
6202 .unwrap()
6203 .add_index("by_id".into(), "id")
6204 .unwrap();
6205 let t = cat.get("users").unwrap();
6206 let idx = t.index_on(0).expect("index_on(0)");
6207 assert_eq!(idx.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
6208 assert_eq!(idx.lookup_eq(&IndexKey::Int(99)), &[] as &[RowLocator]);
6209 }
6210
6211 #[test]
6212 fn add_index_dup_name_rejected() {
6213 let mut cat = populated_users();
6214 let t = cat.get_mut("users").unwrap();
6215 t.add_index("ix".into(), "id").unwrap();
6216 let err = t.add_index("ix".into(), "name").unwrap_err();
6217 assert!(matches!(err, StorageError::DuplicateIndex { ref name } if name == "ix"));
6218 }
6219
6220 #[test]
6221 fn add_index_unknown_column_rejected() {
6222 let mut cat = populated_users();
6223 let err = cat
6224 .get_mut("users")
6225 .unwrap()
6226 .add_index("ix".into(), "ghost")
6227 .unwrap_err();
6228 assert!(matches!(err, StorageError::ColumnNotFound { ref column } if column == "ghost"));
6229 }
6230
6231 #[test]
6232 fn insert_after_create_index_updates_it() {
6233 let mut cat = populated_users();
6234 let t = cat.get_mut("users").unwrap();
6235 t.add_index("by_name".into(), "name").unwrap();
6236 t.insert(Row::new(vec![
6237 Value::Int(4),
6238 Value::Text("dave".into()),
6239 Value::Null,
6240 ]))
6241 .unwrap();
6242 let idx = t.index_on(1).unwrap();
6243 assert_eq!(
6244 idx.lookup_eq(&IndexKey::Text("dave".into())),
6245 &[RowLocator::Hot(3)]
6246 );
6247 assert_eq!(
6249 idx.lookup_eq(&IndexKey::Text("alice".into())),
6250 &[RowLocator::Hot(0), RowLocator::Hot(2)]
6251 );
6252 }
6253
6254 #[test]
6255 fn null_or_float_values_are_not_indexed() {
6256 let mut cat = populated_users();
6257 let t = cat.get_mut("users").unwrap();
6258 t.add_index("by_score".into(), "score").unwrap();
6259 let idx = t.index_on(2).unwrap();
6260 assert_eq!(idx.lookup_eq(&IndexKey::Int(90)), &[] as &[RowLocator]);
6265 }
6266
6267 #[test]
6270 fn vector_value_data_type_carries_dim() {
6271 let v = Value::Vector(vec![1.0, 2.0, 3.0]);
6272 assert_eq!(
6273 v.data_type(),
6274 Some(DataType::Vector {
6275 dim: 3,
6276 encoding: VecEncoding::F32
6277 })
6278 );
6279 }
6280
6281 #[test]
6282 fn vector_column_insert_matching_dim_ok() {
6283 let mut cat = Catalog::new();
6284 cat.create_table(TableSchema::new(
6285 "emb",
6286 vec![ColumnSchema::new(
6287 "v",
6288 DataType::Vector {
6289 dim: 3,
6290 encoding: VecEncoding::F32,
6291 },
6292 false,
6293 )],
6294 ))
6295 .unwrap();
6296 cat.get_mut("emb")
6297 .unwrap()
6298 .insert(Row::new(vec![Value::Vector(vec![1.0, 2.0, 3.0])]))
6299 .unwrap();
6300 }
6301
6302 #[test]
6303 fn vector_column_insert_dim_mismatch_rejected() {
6304 let mut cat = Catalog::new();
6305 cat.create_table(TableSchema::new(
6306 "emb",
6307 vec![ColumnSchema::new(
6308 "v",
6309 DataType::Vector {
6310 dim: 3,
6311 encoding: VecEncoding::F32,
6312 },
6313 false,
6314 )],
6315 ))
6316 .unwrap();
6317 let err = cat
6318 .get_mut("emb")
6319 .unwrap()
6320 .insert(Row::new(vec![Value::Vector(vec![1.0, 2.0])]))
6321 .unwrap_err();
6322 assert!(matches!(err, StorageError::TypeMismatch { .. }));
6323 }
6324
6325 #[test]
6326 fn vector_value_survives_catalog_round_trip() {
6327 let mut cat = Catalog::new();
6328 cat.create_table(TableSchema::new(
6329 "emb",
6330 vec![
6331 ColumnSchema::new("id", DataType::Int, false),
6332 ColumnSchema::new(
6333 "v",
6334 DataType::Vector {
6335 dim: 4,
6336 encoding: VecEncoding::F32,
6337 },
6338 false,
6339 ),
6340 ],
6341 ))
6342 .unwrap();
6343 cat.get_mut("emb")
6344 .unwrap()
6345 .insert(Row::new(vec![
6346 Value::Int(1),
6347 Value::Vector(vec![0.5, -1.25, 3.0, 7.0]),
6348 ]))
6349 .unwrap();
6350 let restored = Catalog::deserialize(&cat.serialize()).expect("round-trip");
6351 let table = restored.get("emb").unwrap();
6352 assert_eq!(
6353 table.schema().columns[1].ty,
6354 DataType::Vector {
6355 dim: 4,
6356 encoding: VecEncoding::F32
6357 }
6358 );
6359 assert_eq!(
6360 table.rows()[0].values[1],
6361 Value::Vector(vec![0.5, -1.25, 3.0, 7.0])
6362 );
6363 }
6364
6365 #[test]
6366 fn index_survives_serialize_deserialize_round_trip() {
6367 let mut cat = populated_users();
6368 cat.get_mut("users")
6369 .unwrap()
6370 .add_index("by_name".into(), "name")
6371 .unwrap();
6372 let restored = Catalog::deserialize(&cat.serialize()).unwrap();
6373 let idx = restored
6374 .get("users")
6375 .unwrap()
6376 .index_on(1)
6377 .expect("index_on(1) after restore");
6378 assert_eq!(idx.name, "by_name");
6379 assert_eq!(
6381 idx.lookup_eq(&IndexKey::Text("alice".into())),
6382 &[RowLocator::Hot(0), RowLocator::Hot(2)]
6383 );
6384 }
6385
6386 fn bigint_pk_users_schema() -> TableSchema {
6391 TableSchema::new(
6392 "users",
6393 vec![
6394 ColumnSchema::new("id", DataType::BigInt, false),
6395 ColumnSchema::new("name", DataType::Text, false),
6396 ],
6397 )
6398 }
6399
6400 fn make_user_row(id: i64, name: &str) -> Row {
6401 Row::new(vec![Value::BigInt(id), Value::Text(name.into())])
6402 }
6403
6404 #[test]
6405 fn lookup_by_pk_finds_row_via_hot_index() {
6406 let mut cat = Catalog::new();
6407 cat.create_table(bigint_pk_users_schema()).unwrap();
6408 let t = cat.get_mut("users").unwrap();
6409 for (id, name) in [(1i64, "alice"), (2, "bob"), (3, "carol")] {
6410 t.insert(make_user_row(id, name)).unwrap();
6411 }
6412 t.add_index("by_id".into(), "id").unwrap();
6413 let got = cat
6415 .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
6416 .unwrap();
6417 assert_eq!(got, make_user_row(2, "bob"));
6418 assert_eq!(cat.cold_segment_count(), 0);
6419 }
6420
6421 #[test]
6422 fn lookup_by_pk_returns_none_when_key_missing() {
6423 let mut cat = Catalog::new();
6424 cat.create_table(bigint_pk_users_schema()).unwrap();
6425 let t = cat.get_mut("users").unwrap();
6426 t.insert(make_user_row(1, "alice")).unwrap();
6427 t.add_index("by_id".into(), "id").unwrap();
6428 assert!(
6429 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(999))
6430 .is_none()
6431 );
6432 assert!(
6434 cat.lookup_by_pk("other_table", "by_id", &IndexKey::Int(1))
6435 .is_none()
6436 );
6437 assert!(
6438 cat.lookup_by_pk("users", "no_such_index", &IndexKey::Int(1))
6439 .is_none()
6440 );
6441 }
6442
6443 #[test]
6444 fn lookup_by_pk_resolves_cold_locator_via_loaded_segment() {
6445 let mut cat = Catalog::new();
6449 cat.create_table(bigint_pk_users_schema()).unwrap();
6450 let t = cat.get_mut("users").unwrap();
6451 t.add_index("by_id".into(), "id").unwrap();
6452 let schema = t.schema.clone();
6453
6454 let cold_rows: Vec<(i64, &str)> =
6455 vec![(100, "ivy"), (200, "joe"), (300, "kim"), (400, "lin")];
6456 let seg_rows: Vec<(u64, Vec<u8>)> = cold_rows
6457 .iter()
6458 .map(|(id, name)| {
6459 let row = make_user_row(*id, name);
6460 ((*id).cast_unsigned(), encode_row_body_dense(&row, &schema))
6461 })
6462 .collect();
6463 let (seg_bytes, _meta) =
6464 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
6465 let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();
6466 assert_eq!(seg_id, 0);
6467 assert_eq!(cat.cold_segment_count(), 1);
6468
6469 let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
6470 .iter()
6471 .map(|(id, _)| {
6472 (
6473 IndexKey::Int(*id),
6474 RowLocator::Cold {
6475 segment_id: seg_id,
6476 page_offset: 0,
6477 },
6478 )
6479 })
6480 .collect();
6481 let registered = cat
6482 .get_mut("users")
6483 .unwrap()
6484 .register_cold_locators("by_id", pairs)
6485 .unwrap();
6486 assert_eq!(registered, 4);
6487
6488 for (id, name) in &cold_rows {
6489 let got = cat
6490 .lookup_by_pk("users", "by_id", &IndexKey::Int(*id))
6491 .unwrap_or_else(|| panic!("cold key {id} not found"));
6492 assert_eq!(got, make_user_row(*id, name));
6493 }
6494 assert!(
6496 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(999))
6497 .is_none()
6498 );
6499 }
6500
6501 #[test]
6502 fn lookup_by_pk_mixes_hot_and_cold_tiers() {
6503 let mut cat = Catalog::new();
6507 cat.create_table(bigint_pk_users_schema()).unwrap();
6508 let t = cat.get_mut("users").unwrap();
6509 for (id, name) in [(1i64, "alice"), (2, "bob")] {
6510 t.insert(make_user_row(id, name)).unwrap();
6511 }
6512 t.add_index("by_id".into(), "id").unwrap();
6513 let schema = t.schema.clone();
6514
6515 let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe")];
6516 let seg_rows: Vec<(u64, Vec<u8>)> = cold_rows
6517 .iter()
6518 .map(|(id, name)| {
6519 let row = make_user_row(*id, name);
6520 ((*id).cast_unsigned(), encode_row_body_dense(&row, &schema))
6521 })
6522 .collect();
6523 let (seg_bytes, _) =
6524 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
6525 let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();
6526 let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
6527 .iter()
6528 .map(|(id, _)| {
6529 (
6530 IndexKey::Int(*id),
6531 RowLocator::Cold {
6532 segment_id: seg_id,
6533 page_offset: 0,
6534 },
6535 )
6536 })
6537 .collect();
6538 cat.get_mut("users")
6539 .unwrap()
6540 .register_cold_locators("by_id", pairs)
6541 .unwrap();
6542
6543 assert_eq!(
6545 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(1))
6546 .unwrap(),
6547 make_user_row(1, "alice")
6548 );
6549 assert_eq!(
6550 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(2))
6551 .unwrap(),
6552 make_user_row(2, "bob")
6553 );
6554 assert_eq!(
6556 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(100))
6557 .unwrap(),
6558 make_user_row(100, "ivy")
6559 );
6560 assert_eq!(
6561 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(200))
6562 .unwrap(),
6563 make_user_row(200, "joe")
6564 );
6565 assert!(
6567 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(50))
6568 .is_none()
6569 );
6570 }
6571
6572 #[test]
6573 fn register_cold_locators_rejects_nsw_index() {
6574 let mut cat = Catalog::new();
6575 cat.create_table(TableSchema::new(
6576 "vecs",
6577 vec![
6578 ColumnSchema::new("id", DataType::Int, false),
6579 ColumnSchema::new(
6580 "v",
6581 DataType::Vector {
6582 dim: 4,
6583 encoding: VecEncoding::F32,
6584 },
6585 false,
6586 ),
6587 ],
6588 ))
6589 .unwrap();
6590 let t = cat.get_mut("vecs").unwrap();
6591 t.insert(Row::new(vec![
6592 Value::Int(1),
6593 Value::Vector(vec![1.0, 0.0, 0.0, 0.0]),
6594 ]))
6595 .unwrap();
6596 t.add_nsw_index("by_v".into(), "v", NSW_DEFAULT_M).unwrap();
6597 let err = t
6598 .register_cold_locators(
6599 "by_v",
6600 vec![(
6601 IndexKey::Int(1),
6602 RowLocator::Cold {
6603 segment_id: 0,
6604 page_offset: 0,
6605 },
6606 )],
6607 )
6608 .unwrap_err();
6609 assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("not BTree")));
6612 }
6613
6614 #[test]
6615 fn load_segment_bytes_rejects_garbage() {
6616 let mut cat = Catalog::new();
6617 let err = cat.load_segment_bytes(vec![0u8; 10]).unwrap_err();
6618 assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("segment")));
6619 assert_eq!(cat.cold_segment_count(), 0);
6621 }
6622
6623 #[test]
6624 fn load_segment_bytes_returns_sequential_ids() {
6625 let mut cat = Catalog::new();
6626 cat.create_table(bigint_pk_users_schema()).unwrap();
6627 let schema = cat.get("users").unwrap().schema.clone();
6628 for batch in 0u32..3 {
6629 let rows: Vec<(u64, Vec<u8>)> = (0u64..4)
6630 .map(|i| {
6631 let id = u64::from(batch) * 100 + i;
6632 let row = make_user_row(id.cast_signed(), "x");
6633 (id, encode_row_body_dense(&row, &schema))
6634 })
6635 .collect();
6636 let (bytes, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
6637 assert_eq!(cat.load_segment_bytes(bytes).unwrap(), batch);
6638 }
6639 assert_eq!(cat.cold_segment_count(), 3);
6640 }
6641
6642 #[test]
6649 fn v8_catalog_decodes_as_all_hot_under_v9_reader() {
6650 let mut cat = populated_users();
6657 cat.get_mut("users")
6658 .unwrap()
6659 .add_index("by_name".into(), "name")
6660 .unwrap();
6661
6662 let v8_bytes = encode_as_v8(&cat);
6667 assert_eq!(v8_bytes[FILE_MAGIC.len()], 8, "version byte must be 8");
6668
6669 let restored = Catalog::deserialize(&v8_bytes).expect("v9 reader accepts v8 stream");
6670 let idx = restored
6671 .get("users")
6672 .unwrap()
6673 .index_on(1)
6674 .expect("index_on(1) after restore");
6675 assert_eq!(
6678 idx.lookup_eq(&IndexKey::Text("alice".into())),
6679 &[RowLocator::Hot(0), RowLocator::Hot(2)]
6680 );
6681 for entry in idx.lookup_eq(&IndexKey::Text("alice".into())) {
6683 assert!(entry.is_hot(), "v8 → v9 read must yield Hot only");
6684 }
6685 }
6686
    /// Hand-rolled writer for the legacy version-8 catalog layout. It exists
    /// only to produce fixtures for the backward-compatibility test.
    ///
    /// Write order: `FILE_MAGIC`, version byte `8`, `u32` table count, then
    /// for each table:
    /// - the name;
    /// - a `u16` column count;
    /// - for each column: name, data type, nullable flag, default-present
    ///   flag (plus the value when present), and auto-increment flag;
    /// - a `u32` row count followed by the dense row bodies;
    /// - a `u16` index count;
    /// - for each index: name, `u16` column position, and kind tag. Tag `0`
    ///   means BTree; tag `1` means NSW and is followed by `u16` `m` and the
    ///   graph.
    ///
    /// For BTree indices only the kind tag is written, not the entries. The
    /// v8 → v9 test expects the reader to surface them as `Hot` locators.
    ///
    /// # Panics
    ///
    /// Panics in either of these cases:
    /// - a table carries a BRIN index, which this writer cannot encode;
    /// - a count or column position does not fit its fixed-width field.
    fn encode_as_v8(cat: &Catalog) -> Vec<u8> {
        let mut out = Vec::with_capacity(64);
        out.extend_from_slice(FILE_MAGIC);
        // Legacy version byte, hard-coded rather than taken from FILE_VERSION.
        out.push(8u8);
        write_u32(&mut out, u32::try_from(cat.tables.len()).unwrap());
        for t in &cat.tables {
            write_str(&mut out, &t.schema.name);
            write_u16(&mut out, u16::try_from(t.schema.columns.len()).unwrap());
            for c in &t.schema.columns {
                write_str(&mut out, &c.name);
                write_data_type(&mut out, c.ty);
                out.push(u8::from(c.nullable));
                // Presence flag, then the default value only when one exists.
                match &c.default {
                    None => out.push(0),
                    Some(v) => {
                        out.push(1);
                        write_value(&mut out, v);
                    }
                }
                out.push(u8::from(c.auto_increment));
            }
            write_u32(&mut out, u32::try_from(t.rows.len()).unwrap());
            for row in &t.rows {
                out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
            }
            write_u16(&mut out, u16::try_from(t.indices.len()).unwrap());
            for idx in &t.indices {
                write_str(&mut out, &idx.name);
                write_u16(&mut out, u16::try_from(idx.column_position).unwrap());
                match &idx.kind {
                    // Tag only: no BTree entries are written in this layout.
                    IndexKind::BTree(_) => out.push(0),
                    // NSW: tag, the `m` parameter, then the serialized graph.
                    IndexKind::Nsw(g) => {
                        out.push(1);
                        write_u16(&mut out, u16::try_from(g.m).unwrap());
                        write_nsw_graph(&mut out, g);
                    }
                    IndexKind::Brin { .. } => panic!(
                        "v8 catalog writer cannot serialise BRIN — \
                         tests with BRIN indices must use the current writer"
                    ),
                }
            }
        }
        out
    }
6740
6741 #[test]
6747 fn v9_catalog_round_trip_preserves_cold_locators() {
6748 let mut cat = Catalog::new();
6749 cat.create_table(bigint_pk_users_schema()).unwrap();
6750 let t = cat.get_mut("users").unwrap();
6751 for (id, name) in [(1i64, "alice"), (2, "bob")] {
6753 t.insert(make_user_row(id, name)).unwrap();
6754 }
6755 t.add_index("by_id".into(), "id").unwrap();
6756 let schema = t.schema.clone();
6757
6758 let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe"), (300, "kim")];
6760 let seg_rows: Vec<(u64, Vec<u8>)> = cold_rows
6761 .iter()
6762 .map(|(id, name)| {
6763 let row = make_user_row(*id, name);
6764 ((*id).cast_unsigned(), encode_row_body_dense(&row, &schema))
6765 })
6766 .collect();
6767 let (seg_bytes, _) =
6768 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
6769 let seg_id = cat.load_segment_bytes(seg_bytes.clone()).unwrap();
6770 let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
6771 .iter()
6772 .map(|(id, _)| {
6773 (
6774 IndexKey::Int(*id),
6775 RowLocator::Cold {
6776 segment_id: seg_id,
6777 page_offset: 0,
6778 },
6779 )
6780 })
6781 .collect();
6782 cat.get_mut("users")
6783 .unwrap()
6784 .register_cold_locators("by_id", pairs)
6785 .unwrap();
6786
6787 let bytes = cat.serialize();
6789 assert_eq!(bytes[FILE_MAGIC.len()], FILE_VERSION);
6790 let mut restored = Catalog::deserialize(&bytes).expect("v9 round-trip parses");
6791
6792 let restored_seg_id = restored.load_segment_bytes(seg_bytes).unwrap();
6799 assert_eq!(restored_seg_id, seg_id);
6800
6801 let idx = restored.get("users").unwrap().index_on(0).unwrap();
6802 assert_eq!(idx.lookup_eq(&IndexKey::Int(1)), &[RowLocator::Hot(0)]);
6804 assert_eq!(idx.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
6805 for (id, _) in &cold_rows {
6807 assert_eq!(
6808 idx.lookup_eq(&IndexKey::Int(*id)),
6809 &[RowLocator::Cold {
6810 segment_id: seg_id,
6811 page_offset: 0,
6812 }]
6813 );
6814 }
6815 assert_eq!(
6817 restored
6818 .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
6819 .unwrap(),
6820 make_user_row(2, "bob")
6821 );
6822 for (id, name) in &cold_rows {
6823 assert_eq!(
6824 restored
6825 .lookup_by_pk("users", "by_id", &IndexKey::Int(*id))
6826 .unwrap(),
6827 make_user_row(*id, name)
6828 );
6829 }
6830 }
6831
6832 #[test]
6839 fn row_body_encoded_len_matches_actual_encode_for_all_types() {
6840 let schema = TableSchema::new(
6841 "wide",
6842 vec![
6843 ColumnSchema::new("a", DataType::SmallInt, true),
6844 ColumnSchema::new("b", DataType::Int, false),
6845 ColumnSchema::new("c", DataType::BigInt, false),
6846 ColumnSchema::new("d", DataType::Float, false),
6847 ColumnSchema::new("e", DataType::Bool, false),
6848 ColumnSchema::new("f", DataType::Text, false),
6849 ColumnSchema::new(
6850 "g",
6851 DataType::Vector {
6852 dim: 3,
6853 encoding: VecEncoding::F32,
6854 },
6855 false,
6856 ),
6857 ColumnSchema::new(
6858 "h",
6859 DataType::Numeric {
6860 precision: 18,
6861 scale: 2,
6862 },
6863 false,
6864 ),
6865 ColumnSchema::new("i", DataType::Date, false),
6866 ColumnSchema::new("j", DataType::Timestamp, false),
6867 ],
6868 );
6869 let cases: &[Row] = &[
6870 Row::new(vec![
6871 Value::SmallInt(7),
6872 Value::Int(42),
6873 Value::BigInt(1_000_000),
6874 Value::Float(1.5),
6875 Value::Bool(true),
6876 Value::Text("hello".into()),
6877 Value::Vector(vec![1.0, 2.0, 3.0]),
6878 Value::Numeric {
6879 scaled: 12345,
6880 scale: 2,
6881 },
6882 Value::Date(20_000),
6883 Value::Timestamp(1_700_000_000_000_000),
6884 ]),
6885 Row::new(vec![
6887 Value::Null,
6888 Value::Int(0),
6889 Value::BigInt(0),
6890 Value::Float(0.0),
6891 Value::Bool(false),
6892 Value::Text(String::new()),
6893 Value::Vector(vec![]),
6894 Value::Numeric {
6895 scaled: 0,
6896 scale: 2,
6897 },
6898 Value::Date(0),
6899 Value::Timestamp(0),
6900 ]),
6901 Row::new(vec![
6902 Value::SmallInt(-1),
6903 Value::Int(-1),
6904 Value::BigInt(-1),
6905 Value::Float(-0.5),
6906 Value::Bool(true),
6907 Value::Text("a much longer payload here".into()),
6908 Value::Vector(vec![0.1, 0.2, 0.3]),
6909 Value::Numeric {
6910 scaled: -999_999_999,
6911 scale: 2,
6912 },
6913 Value::Date(-1),
6914 Value::Timestamp(-1),
6915 ]),
6916 ];
6917 for row in cases {
6918 let actual = encode_row_body_dense(row, &schema).len();
6919 let fast = row_body_encoded_len(row, &schema);
6920 assert_eq!(actual, fast, "row {row:?}");
6921 }
6922 }
6923
6924 #[test]
6925 fn hot_bytes_grows_on_insert_and_matches_encoded_sum() {
6926 let mut cat = Catalog::new();
6927 cat.create_table(bigint_pk_users_schema()).unwrap();
6928 let t = cat.get_mut("users").unwrap();
6929 assert_eq!(t.hot_bytes(), 0);
6930 let mut expected: u64 = 0;
6931 for (id, name) in [(1i64, "alice"), (2, "bob"), (3, "carol")] {
6932 let row = make_user_row(id, name);
6933 expected += encode_row_body_dense(&row, &t.schema).len() as u64;
6934 t.insert(row).unwrap();
6935 }
6936 assert_eq!(t.hot_bytes(), expected);
6937 assert_eq!(cat.hot_tier_bytes(), expected);
6938 }
6939
6940 #[test]
6941 fn hot_bytes_shrinks_on_delete() {
6942 let mut cat = Catalog::new();
6943 cat.create_table(bigint_pk_users_schema()).unwrap();
6944 let t = cat.get_mut("users").unwrap();
6945 for (id, name) in [(1i64, "alice"), (2, "bob"), (3, "carol")] {
6946 t.insert(make_user_row(id, name)).unwrap();
6947 }
6948 let before = t.hot_bytes();
6949 let bob_row = make_user_row(2, "bob");
6951 let bob_bytes = encode_row_body_dense(&bob_row, &t.schema).len() as u64;
6952 let removed = t.delete_rows(&[1]);
6953 assert_eq!(removed, 1);
6954 assert_eq!(t.hot_bytes(), before - bob_bytes);
6955 }
6956
6957 #[test]
6958 fn hot_bytes_diffs_on_update_for_variable_width_columns() {
6959 let mut cat = Catalog::new();
6960 cat.create_table(bigint_pk_users_schema()).unwrap();
6961 let t = cat.get_mut("users").unwrap();
6962 t.insert(make_user_row(1, "alice")).unwrap();
6963 let after_insert = t.hot_bytes();
6964 let new_row = make_user_row(1, "alice-the-longer-name");
6967 let old_len = encode_row_body_dense(&make_user_row(1, "alice"), &t.schema).len() as u64;
6968 let new_len = encode_row_body_dense(&new_row, &t.schema).len() as u64;
6969 t.update_row(0, new_row.values).unwrap();
6970 assert_eq!(t.hot_bytes(), after_insert - old_len + new_len);
6971 assert!(t.hot_bytes() > after_insert, "longer text grew the counter");
6972 }
6973
6974 #[test]
6975 fn hot_bytes_round_trips_through_serialize_deserialize() {
6976 let mut cat = Catalog::new();
6977 cat.create_table(bigint_pk_users_schema()).unwrap();
6978 let t = cat.get_mut("users").unwrap();
6979 for i in 0..10 {
6980 t.insert(make_user_row(i, &alloc::format!("name-{i}")))
6981 .unwrap();
6982 }
6983 let pre = cat.hot_tier_bytes();
6984 let restored = Catalog::deserialize(&cat.serialize()).unwrap();
6985 assert_eq!(restored.hot_tier_bytes(), pre);
6986 assert_eq!(restored.get("users").unwrap().hot_bytes(), pre);
6987 }
6988
6989 #[test]
6996 fn freeze_oldest_to_cold_moves_rows_and_keeps_lookups_working() {
6997 let mut cat = Catalog::new();
6998 cat.create_table(bigint_pk_users_schema()).unwrap();
6999 let t = cat.get_mut("users").unwrap();
7000 for id in 0..10i64 {
7001 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7002 .unwrap();
7003 }
7004 t.add_index("by_id".into(), "id").unwrap();
7005 let total_bytes_before = t.hot_bytes();
7006
7007 let report = cat
7008 .freeze_oldest_to_cold("users", "by_id", 6)
7009 .expect("freeze succeeds");
7010 assert_eq!(report.frozen_rows, 6);
7011 assert_eq!(report.segment_id, 0);
7012 assert!(report.bytes_freed > 0);
7013 assert!(!report.segment_bytes.is_empty());
7014
7015 let t = cat.get("users").unwrap();
7016 assert_eq!(t.row_count(), 4, "4 hot rows remain (10 - 6 frozen)");
7017 assert_eq!(cat.cold_segment_count(), 1);
7018 assert_eq!(
7020 t.hot_bytes(),
7021 total_bytes_before - report.bytes_freed,
7022 "hot_bytes accounting matches FreezeReport"
7023 );
7024
7025 for id in 0..10i64 {
7028 let got = cat
7029 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
7030 .unwrap_or_else(|| panic!("PK {id} disappeared after freeze"));
7031 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
7032 }
7033 }
7034
7035 #[test]
7040 fn freeze_twice_preserves_prior_cold_locators() {
7041 let mut cat = Catalog::new();
7042 cat.create_table(bigint_pk_users_schema()).unwrap();
7043 let t = cat.get_mut("users").unwrap();
7044 for id in 0..12i64 {
7045 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7046 .unwrap();
7047 }
7048 t.add_index("by_id".into(), "id").unwrap();
7049
7050 cat.freeze_oldest_to_cold("users", "by_id", 4)
7051 .expect("first freeze ok");
7052 cat.freeze_oldest_to_cold("users", "by_id", 4)
7053 .expect("second freeze ok");
7054
7055 assert_eq!(cat.get("users").unwrap().row_count(), 4);
7056 assert_eq!(cat.cold_segment_count(), 2);
7057 for id in 0..12i64 {
7060 let got = cat
7061 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
7062 .unwrap_or_else(|| panic!("PK {id} not resolvable after two freezes"));
7063 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
7064 }
7065 }
7066
7067 #[test]
7070 fn freeze_oldest_to_cold_rejects_invalid_input() {
7071 let mut cat = Catalog::new();
7072 cat.create_table(bigint_pk_users_schema()).unwrap();
7073 let t = cat.get_mut("users").unwrap();
7074 for id in 0..3i64 {
7075 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7076 .unwrap();
7077 }
7078 t.add_index("by_id".into(), "id").unwrap();
7079
7080 assert!(matches!(
7082 cat.freeze_oldest_to_cold("users", "by_id", 0),
7083 Err(StorageError::Corrupt(_))
7084 ));
7085 assert!(matches!(
7087 cat.freeze_oldest_to_cold("missing", "by_id", 1),
7088 Err(StorageError::Corrupt(_))
7089 ));
7090 assert!(matches!(
7092 cat.freeze_oldest_to_cold("users", "no_such_index", 1),
7093 Err(StorageError::Corrupt(_))
7094 ));
7095 assert!(matches!(
7097 cat.freeze_oldest_to_cold("users", "by_id", 999),
7098 Err(StorageError::Corrupt(_))
7099 ));
7100 assert_eq!(cat.get("users").unwrap().row_count(), 3);
7102 assert_eq!(cat.cold_segment_count(), 0);
7103 }
7104
7105 #[test]
7108 fn freeze_oldest_to_cold_rejects_non_integer_pk() {
7109 let mut cat = Catalog::new();
7110 cat.create_table(TableSchema::new(
7111 "by_name",
7112 vec![
7113 ColumnSchema::new("name", DataType::Text, false),
7114 ColumnSchema::new("payload", DataType::BigInt, false),
7115 ],
7116 ))
7117 .unwrap();
7118 let t = cat.get_mut("by_name").unwrap();
7119 t.insert(Row::new(vec![Value::Text("a".into()), Value::BigInt(1)]))
7120 .unwrap();
7121 t.add_index("by_n".into(), "name").unwrap();
7122 let err = cat
7123 .freeze_oldest_to_cold("by_name", "by_n", 1)
7124 .expect_err("non-integer PK rejected");
7125 match err {
7126 StorageError::Corrupt(s) => assert!(
7127 s.contains("non-integer"),
7128 "error message names the constraint: {s}"
7129 ),
7130 other => panic!("expected Corrupt, got {other:?}"),
7131 }
7132 assert_eq!(cat.get("by_name").unwrap().row_count(), 1);
7134 assert_eq!(cat.cold_segment_count(), 0);
7135 }
7136
7137 #[test]
7142 fn freeze_keeps_remaining_hot_rows_addressable_via_secondary_index() {
7143 let mut cat = Catalog::new();
7144 cat.create_table(bigint_pk_users_schema()).unwrap();
7145 let t = cat.get_mut("users").unwrap();
7146 for id in 0..6i64 {
7147 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7148 .unwrap();
7149 }
7150 t.add_index("by_id".into(), "id").unwrap();
7151 t.add_index("by_name".into(), "name").unwrap();
7152
7153 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
7154
7155 let idx = cat.get("users").unwrap().index_on(1).unwrap();
7159 let got = idx.lookup_eq(&IndexKey::Text("u-4".into()));
7160 assert_eq!(got.len(), 1);
7161 assert!(got[0].is_hot(), "kept-hot rows still surface as Hot");
7162 match got[0] {
7163 RowLocator::Hot(i) => {
7164 assert_eq!(i, 1);
7167 }
7168 RowLocator::Cold { .. } => unreachable!(),
7169 }
7170 }
7171
7172 #[test]
7180 fn promote_cold_row_pulls_frozen_row_back_to_hot_tier() {
7181 let mut cat = Catalog::new();
7182 cat.create_table(bigint_pk_users_schema()).unwrap();
7183 let t = cat.get_mut("users").unwrap();
7184 for id in 0..6i64 {
7185 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7186 .unwrap();
7187 }
7188 t.add_index("by_id".into(), "id").unwrap();
7189 cat.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
7192 let hot_bytes_before = cat.get("users").unwrap().hot_bytes();
7193
7194 let new_idx = cat
7196 .promote_cold_row("users", "by_id", &IndexKey::Int(2))
7197 .expect("promote ok")
7198 .expect("PK 2 was cold");
7199 assert_eq!(
7200 new_idx, 2,
7201 "promoted row appended after the 2 surviving hot rows"
7202 );
7203
7204 let t = cat.get("users").unwrap();
7205 assert_eq!(t.row_count(), 3, "hot tier grew from 2 to 3");
7206 let row = make_user_row(2, "u-2");
7208 let row_len = encode_row_body_dense(&row, &t.schema).len() as u64;
7209 assert_eq!(t.hot_bytes(), hot_bytes_before + row_len);
7210
7211 let entries = t.index_on(0).unwrap().lookup_eq(&IndexKey::Int(2));
7214 assert_eq!(entries.len(), 1, "exactly one locator per key");
7215 assert!(entries[0].is_hot(), "promote retired the Cold locator");
7216 assert_eq!(
7218 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(2))
7219 .unwrap(),
7220 row
7221 );
7222 assert_eq!(
7225 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(0))
7226 .unwrap(),
7227 make_user_row(0, "u-0")
7228 );
7229 }
7230
7231 #[test]
7235 fn promote_cold_row_returns_none_when_key_is_not_cold() {
7236 let mut cat = Catalog::new();
7237 cat.create_table(bigint_pk_users_schema()).unwrap();
7238 let t = cat.get_mut("users").unwrap();
7239 t.insert(make_user_row(7, "alice")).unwrap();
7240 t.add_index("by_id".into(), "id").unwrap();
7241
7242 assert!(
7244 cat.promote_cold_row("users", "by_id", &IndexKey::Int(7))
7245 .unwrap()
7246 .is_none()
7247 );
7248 assert!(
7250 cat.promote_cold_row("users", "by_id", &IndexKey::Int(99))
7251 .unwrap()
7252 .is_none()
7253 );
7254 assert_eq!(cat.get("users").unwrap().row_count(), 1);
7256 assert_eq!(cat.cold_segment_count(), 0);
7257 }
7258
7259 #[test]
7264 fn shadow_cold_row_removes_cold_locators_and_drops_lookup() {
7265 let mut cat = Catalog::new();
7266 cat.create_table(bigint_pk_users_schema()).unwrap();
7267 let t = cat.get_mut("users").unwrap();
7268 for id in 0..5i64 {
7269 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7270 .unwrap();
7271 }
7272 t.add_index("by_id".into(), "id").unwrap();
7273 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
7274
7275 assert!(
7277 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(1))
7278 .is_some(),
7279 "frozen PK resolves before shadow"
7280 );
7281 let removed = cat
7282 .shadow_cold_row("users", "by_id", &IndexKey::Int(1))
7283 .unwrap();
7284 assert_eq!(removed, 1, "exactly one cold locator retired");
7285
7286 assert!(
7289 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(1))
7290 .is_none(),
7291 "shadowed key no longer resolves"
7292 );
7293 assert_eq!(
7295 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(0))
7296 .unwrap(),
7297 make_user_row(0, "u-0")
7298 );
7299 assert_eq!(
7300 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(2))
7301 .unwrap(),
7302 make_user_row(2, "u-2")
7303 );
7304 }
7305
7306 #[test]
7311 fn shadow_cold_row_returns_zero_when_key_is_not_cold() {
7312 let mut cat = Catalog::new();
7313 cat.create_table(bigint_pk_users_schema()).unwrap();
7314 let t = cat.get_mut("users").unwrap();
7315 t.insert(make_user_row(1, "alice")).unwrap();
7316 t.add_index("by_id".into(), "id").unwrap();
7317 assert_eq!(
7318 cat.shadow_cold_row("users", "by_id", &IndexKey::Int(1))
7319 .unwrap(),
7320 0,
7321 "hot-only key drops no cold locators"
7322 );
7323 assert_eq!(
7324 cat.shadow_cold_row("users", "by_id", &IndexKey::Int(999))
7325 .unwrap(),
7326 0,
7327 "absent key drops no cold locators"
7328 );
7329 assert_eq!(cat.get("users").unwrap().row_count(), 1);
7330 }
7331
7332 #[test]
7334 fn promote_and_shadow_reject_invalid_inputs() {
7335 let mut cat = Catalog::new();
7336 cat.create_table(bigint_pk_users_schema()).unwrap();
7337 let t = cat.get_mut("users").unwrap();
7338 t.insert(make_user_row(1, "alice")).unwrap();
7339 t.add_index("by_id".into(), "id").unwrap();
7340
7341 assert!(matches!(
7343 cat.promote_cold_row("missing", "by_id", &IndexKey::Int(1)),
7344 Err(StorageError::Corrupt(_))
7345 ));
7346 assert!(matches!(
7347 cat.shadow_cold_row("missing", "by_id", &IndexKey::Int(1)),
7348 Err(StorageError::Corrupt(_))
7349 ));
7350 assert!(matches!(
7352 cat.promote_cold_row("users", "no_such_index", &IndexKey::Int(1)),
7353 Err(StorageError::Corrupt(_))
7354 ));
7355 assert!(matches!(
7356 cat.shadow_cold_row("users", "no_such_index", &IndexKey::Int(1)),
7357 Err(StorageError::Corrupt(_))
7358 ));
7359 }
7360
7361 #[test]
7368 fn commit_freeze_slices_single_slice_matches_freeze_oldest() {
7369 let mut a = Catalog::new();
7370 let mut b = Catalog::new();
7371 for cat in [&mut a, &mut b] {
7372 cat.create_table(bigint_pk_users_schema()).unwrap();
7373 let t = cat.get_mut("users").unwrap();
7374 for id in 0..10i64 {
7375 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7376 .unwrap();
7377 }
7378 t.add_index("by_id".into(), "id").unwrap();
7379 }
7380 let single = a.freeze_oldest_to_cold("users", "by_id", 6).unwrap();
7381 let slice = b
7382 .prepare_freeze_slice("users", "by_id", 0..6)
7383 .expect("prepare");
7384 let parallel = b
7385 .commit_freeze_slices("users", "by_id", alloc::vec![slice])
7386 .expect("commit");
7387 assert_eq!(single.segment_id, parallel.segment_id);
7388 assert_eq!(single.frozen_rows, parallel.frozen_rows);
7389 assert_eq!(single.bytes_freed, parallel.bytes_freed);
7390 assert_eq!(single.segment_bytes, parallel.segment_bytes);
7391 for id in 0..10i64 {
7393 assert_eq!(
7394 a.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
7395 b.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
7396 "PK {id} differs after single vs slice freeze"
7397 );
7398 }
7399 }
7400
7401 #[test]
7406 fn commit_freeze_slices_two_slices_match_single_slice() {
7407 let mut a = Catalog::new();
7408 let mut b = Catalog::new();
7409 for cat in [&mut a, &mut b] {
7410 cat.create_table(bigint_pk_users_schema()).unwrap();
7411 let t = cat.get_mut("users").unwrap();
7412 for id in [3, 7, 1, 9, 5, 0, 8, 4, 2, 6].iter().copied() {
7415 t.insert(make_user_row(id as i64, &alloc::format!("u-{id}")))
7416 .unwrap();
7417 }
7418 t.add_index("by_id".into(), "id").unwrap();
7419 }
7420 let single = a
7421 .prepare_freeze_slice("users", "by_id", 0..8)
7422 .expect("prepare");
7423 let one = a
7424 .commit_freeze_slices("users", "by_id", alloc::vec![single])
7425 .expect("commit one");
7426 let s1 = b
7427 .prepare_freeze_slice("users", "by_id", 0..4)
7428 .expect("prepare s1");
7429 let s2 = b
7430 .prepare_freeze_slice("users", "by_id", 4..8)
7431 .expect("prepare s2");
7432 let two = b
7433 .commit_freeze_slices("users", "by_id", alloc::vec![s1, s2])
7434 .expect("commit two");
7435 assert_eq!(one.segment_bytes, two.segment_bytes);
7436 assert_eq!(one.frozen_rows, two.frozen_rows);
7437 for id in 0..10i64 {
7440 assert_eq!(
7441 a.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
7442 b.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
7443 "PK {id} differs after one-slice vs two-slice freeze"
7444 );
7445 }
7446 }
7447
7448 #[test]
7450 fn commit_freeze_slices_rejects_gap() {
7451 let mut cat = Catalog::new();
7452 cat.create_table(bigint_pk_users_schema()).unwrap();
7453 let t = cat.get_mut("users").unwrap();
7454 for id in 0..6i64 {
7455 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7456 .unwrap();
7457 }
7458 t.add_index("by_id".into(), "id").unwrap();
7459 let s1 = cat.prepare_freeze_slice("users", "by_id", 0..2).unwrap();
7460 let s2 = cat.prepare_freeze_slice("users", "by_id", 3..5).unwrap();
7461 assert!(matches!(
7462 cat.commit_freeze_slices("users", "by_id", alloc::vec![s1, s2]),
7463 Err(StorageError::Corrupt(_))
7464 ));
7465 assert_eq!(cat.cold_segment_count(), 0);
7467 assert_eq!(cat.get("users").unwrap().row_count(), 6);
7468 }
7469
7470 #[test]
7472 fn commit_freeze_slices_empty_is_noop() {
7473 let mut cat = Catalog::new();
7474 cat.create_table(bigint_pk_users_schema()).unwrap();
7475 let t = cat.get_mut("users").unwrap();
7476 for id in 0..3i64 {
7477 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7478 .unwrap();
7479 }
7480 t.add_index("by_id".into(), "id").unwrap();
7481 let report = cat
7482 .commit_freeze_slices("users", "by_id", Vec::new())
7483 .unwrap();
7484 assert_eq!(report.frozen_rows, 0);
7485 assert_eq!(cat.cold_segment_count(), 0);
7486 assert_eq!(cat.get("users").unwrap().row_count(), 3);
7487 }
7488
7489 #[test]
7496 fn compact_merges_small_segments_storage_unit() {
7497 let mut cat = Catalog::new();
7498 cat.create_table(bigint_pk_users_schema()).unwrap();
7499 let t = cat.get_mut("users").unwrap();
7500 for id in 0..8i64 {
7501 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7502 .unwrap();
7503 }
7504 t.add_index("by_id".into(), "id").unwrap();
7505 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
7507 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
7508 assert_eq!(cat.cold_segment_count(), 2);
7509 assert_eq!(cat.cold_segment_slot_count(), 2);
7510
7511 let max_seg_bytes = cat
7514 .cold_segment_ids_global()
7515 .iter()
7516 .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
7517 .max()
7518 .unwrap();
7519 let target = max_seg_bytes + 1;
7520
7521 let report = cat
7522 .compact_cold_segments("users", "by_id", target)
7523 .expect("compact succeeds");
7524 assert_eq!(report.sources.len(), 2);
7525 let merged_id = report.merged_segment_id.expect("merge happened");
7526 assert_eq!(report.merged_rows, 6);
7527 assert_eq!(report.deleted_rows_pruned, 0);
7528 assert!(!report.merged_segment_bytes.is_empty());
7529
7530 assert_eq!(cat.cold_segment_count(), 1);
7533 assert_eq!(cat.cold_segment_slot_count(), 3);
7534 assert_eq!(cat.cold_segment_ids_global(), alloc::vec![merged_id]);
7535
7536 for id in 0..8i64 {
7539 let got = cat
7540 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
7541 .unwrap_or_else(|| panic!("PK {id} lost after compaction"));
7542 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
7543 }
7544 }
7545
7546 #[test]
7550 fn compact_drops_shadowed_cold_rows() {
7551 let mut cat = Catalog::new();
7552 cat.create_table(bigint_pk_users_schema()).unwrap();
7553 let t = cat.get_mut("users").unwrap();
7554 for id in 0..6i64 {
7555 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7556 .unwrap();
7557 }
7558 t.add_index("by_id".into(), "id").unwrap();
7559 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
7560 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
7561 assert_eq!(
7563 cat.shadow_cold_row("users", "by_id", &IndexKey::Int(1))
7564 .unwrap(),
7565 1
7566 );
7567 assert_eq!(
7568 cat.shadow_cold_row("users", "by_id", &IndexKey::Int(4))
7569 .unwrap(),
7570 1
7571 );
7572
7573 let max_seg_bytes = cat
7574 .cold_segment_ids_global()
7575 .iter()
7576 .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
7577 .max()
7578 .unwrap();
7579 let report = cat
7580 .compact_cold_segments("users", "by_id", max_seg_bytes + 1)
7581 .expect("compact succeeds");
7582 assert_eq!(report.sources.len(), 2);
7583 assert_eq!(report.merged_rows, 4, "6 frozen − 2 shadowed = 4 live");
7584 assert_eq!(report.deleted_rows_pruned, 2);
7585
7586 for shadowed in [1i64, 4i64] {
7588 assert!(
7589 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(shadowed))
7590 .is_none(),
7591 "shadowed PK {shadowed} must remain invisible after compact"
7592 );
7593 }
7594 for live in [0i64, 2, 3, 5] {
7596 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(live))
7597 .unwrap_or_else(|| panic!("live PK {live} lost after compact"));
7598 }
7599 }
7600
7601 #[test]
7604 fn compact_is_noop_below_two_candidates() {
7605 let mut cat = Catalog::new();
7606 cat.create_table(bigint_pk_users_schema()).unwrap();
7607 let t = cat.get_mut("users").unwrap();
7608 for id in 0..6i64 {
7609 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7610 .unwrap();
7611 }
7612 t.add_index("by_id".into(), "id").unwrap();
7613 let report = cat
7615 .compact_cold_segments("users", "by_id", 1 << 30)
7616 .expect("noop ok");
7617 assert!(report.merged_segment_id.is_none());
7618 assert!(report.sources.is_empty());
7619
7620 cat.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
7622 let report = cat
7623 .compact_cold_segments("users", "by_id", 1 << 30)
7624 .expect("noop ok");
7625 assert!(report.merged_segment_id.is_none());
7626 assert_eq!(cat.cold_segment_count(), 1);
7627
7628 let report = cat
7631 .compact_cold_segments("users", "by_id", 1)
7632 .expect("noop ok");
7633 assert!(report.merged_segment_id.is_none());
7634 assert_eq!(cat.cold_segment_count(), 1);
7635 }
7636
7637 #[test]
7645 fn compact_swap_survives_catalog_roundtrip_via_load_at() {
7646 let mut cat = Catalog::new();
7647 cat.create_table(bigint_pk_users_schema()).unwrap();
7648 let t = cat.get_mut("users").unwrap();
7649 for id in 0..6i64 {
7650 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7651 .unwrap();
7652 }
7653 t.add_index("by_id".into(), "id").unwrap();
7654 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
7655 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
7656 let max_seg_bytes = cat
7657 .cold_segment_ids_global()
7658 .iter()
7659 .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
7660 .max()
7661 .unwrap();
7662 let report = cat
7663 .compact_cold_segments("users", "by_id", max_seg_bytes + 1)
7664 .expect("compact ok");
7665 let merged_id = report.merged_segment_id.unwrap();
7666
7667 let cat_bytes = cat.serialize();
7672 let merged_bytes = report.merged_segment_bytes.clone();
7673
7674 let mut restored = Catalog::deserialize(&cat_bytes).expect("deserialize ok");
7675 restored
7676 .load_segment_bytes_at(merged_id, merged_bytes)
7677 .expect("reload merged ok");
7678
7679 for id in 0..6i64 {
7681 let got = restored
7682 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
7683 .unwrap_or_else(|| panic!("PK {id} lost across roundtrip"));
7684 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
7685 }
7686 assert_eq!(restored.cold_segment_count(), 1);
7689 }
7690
7691 #[test]
7694 fn load_segment_bytes_at_pads_and_rejects_collision() {
7695 let mut cat = Catalog::new();
7696 cat.create_table(bigint_pk_users_schema()).unwrap();
7697 let t = cat.get_mut("users").unwrap();
7698 for id in 0..4i64 {
7699 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7700 .unwrap();
7701 }
7702 t.add_index("by_id".into(), "id").unwrap();
7703 let report = cat.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
7704 let bytes_seg0 = report.segment_bytes.clone();
7705
7706 cat.load_segment_bytes_at(5, bytes_seg0.clone())
7710 .expect("pad + load ok");
7711 assert_eq!(cat.cold_segment_slot_count(), 6);
7712 assert_eq!(cat.cold_segment_count(), 2);
7713
7714 assert!(matches!(
7716 cat.load_segment_bytes_at(5, bytes_seg0.clone()),
7717 Err(StorageError::Corrupt(_))
7718 ));
7719 assert!(matches!(
7721 cat.load_segment_bytes_at(0, bytes_seg0),
7722 Err(StorageError::Corrupt(_))
7723 ));
7724 }
7725
7726 #[test]
7730 fn promote_then_refreeze_does_not_leave_orphan_locators() {
7731 let mut cat = Catalog::new();
7732 cat.create_table(bigint_pk_users_schema()).unwrap();
7733 let t = cat.get_mut("users").unwrap();
7734 for id in 0..4i64 {
7735 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7736 .unwrap();
7737 }
7738 t.add_index("by_id".into(), "id").unwrap();
7739
7740 cat.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
7742 let promoted = cat
7743 .promote_cold_row("users", "by_id", &IndexKey::Int(0))
7744 .unwrap();
7745 assert!(promoted.is_some());
7746 let entries_after_promote = cat
7747 .get("users")
7748 .unwrap()
7749 .index_on(0)
7750 .unwrap()
7751 .lookup_eq(&IndexKey::Int(0))
7752 .to_vec();
7753 assert_eq!(entries_after_promote.len(), 1);
7754 assert!(entries_after_promote[0].is_hot());
7755
7756 for id in [2i64, 3] {
7763 assert_eq!(
7764 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(id))
7765 .unwrap(),
7766 make_user_row(id, &alloc::format!("u-{id}"))
7767 );
7768 }
7769 }
7770}