1#![no_std]
7#![cfg_attr(target_arch = "aarch64", allow(unsafe_code))]
11
12extern crate alloc;
13
14pub mod bloom;
15pub mod halfvec;
16pub mod persistent;
17pub mod persistent_btree;
18pub mod quantize;
19pub mod row_locator;
20pub mod segment;
21
22pub use self::bloom::{BloomError, BloomFilter};
23pub use self::row_locator::{RowLocator, RowLocatorError};
24pub use self::segment::{
25 BRIN_SIDECAR_MAGIC, BrinSummary, OwnedSegment, SEGMENT_COMPRESS_ALGO_LZSS,
26 SEGMENT_COMPRESS_ALGO_NONE, SEGMENT_MAGIC, SEGMENT_MAGIC_V2, SEGMENT_PAGE_BYTES, SegmentError,
27 SegmentMeta, SegmentReader, derive_brin_summaries, encode_segment, wrap_v2_envelope,
28 wrap_v2_envelope_with_brin,
29};
30
31use alloc::collections::{BTreeMap, BTreeSet};
32use alloc::format;
33use alloc::string::String;
34use alloc::sync::Arc;
35use alloc::vec::Vec;
36use core::fmt;
37
38use self::persistent::PersistentVec;
39use self::persistent_btree::PersistentBTreeMap;
40
/// Element encoding of a vector column.
///
/// `F32` is the `Default` variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum VecEncoding {
    /// Rendered as `F32`; the default encoding.
    #[default]
    F32,
    /// Rendered as `SQ8`.
    Sq8,
    /// Rendered as `HALF`.
    F16,
}

impl fmt::Display for VecEncoding {
    /// Writes the encoding keyword: `F32`, `SQ8`, or `HALF`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match *self {
            Self::F32 => "F32",
            Self::Sq8 => "SQ8",
            Self::F16 => "HALF",
        };
        f.write_str(keyword)
    }
}
69
70#[derive(Debug, Clone, Copy, PartialEq, Eq)]
74pub enum DataType {
75 SmallInt,
78 Int, BigInt, Float, Text,
82 Varchar(u32),
85 Char(u32),
89 Bool,
90 Vector {
96 dim: u32,
97 encoding: VecEncoding,
98 },
99 Numeric {
105 precision: u8,
106 scale: u8,
107 },
108 Date,
111 Timestamp,
114 Timestamptz,
122 Interval,
127 Json,
132 Jsonb,
138}
139
140impl fmt::Display for DataType {
141 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
142 match self {
143 Self::SmallInt => f.write_str("SMALLINT"),
144 Self::Int => f.write_str("INT"),
145 Self::BigInt => f.write_str("BIGINT"),
146 Self::Float => f.write_str("FLOAT"),
147 Self::Text => f.write_str("TEXT"),
148 Self::Varchar(n) => write!(f, "VARCHAR({n})"),
149 Self::Char(n) => write!(f, "CHAR({n})"),
150 Self::Bool => f.write_str("BOOL"),
151 Self::Vector { dim, encoding } => match encoding {
152 VecEncoding::F32 => write!(f, "VECTOR({dim})"),
153 VecEncoding::Sq8 => write!(f, "VECTOR({dim}) USING SQ8"),
154 VecEncoding::F16 => write!(f, "VECTOR({dim}) USING HALF"),
155 },
156 Self::Numeric { precision, scale } => {
157 if *scale == 0 {
158 write!(f, "NUMERIC({precision})")
159 } else {
160 write!(f, "NUMERIC({precision}, {scale})")
161 }
162 }
163 Self::Date => f.write_str("DATE"),
164 Self::Timestamp => f.write_str("TIMESTAMP"),
165 Self::Timestamptz => f.write_str("TIMESTAMPTZ"),
166 Self::Interval => f.write_str("INTERVAL"),
167 Self::Json => f.write_str("JSON"),
168 Self::Jsonb => f.write_str("JSONB"),
169 }
170 }
171}
172
/// A single cell value stored in a [`Row`].
///
/// Every variant except `Null` reports a [`DataType`] through
/// `Value::data_type`.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum Value {
    /// 16-bit signed integer.
    SmallInt(i16),
    /// 32-bit signed integer.
    Int(i32),
    /// 64-bit signed integer.
    BigInt(i64),
    /// 64-bit float.
    Float(f64),
    /// Owned text.
    Text(String),
    /// Boolean.
    Bool(bool),
    /// Vector stored as `f32` elements (`VecEncoding::F32`).
    Vector(Vec<f32>),
    /// Vector stored in the SQ8 form (`VecEncoding::Sq8`).
    Sq8Vector(crate::quantize::Sq8Vector),
    /// Vector stored in the half-precision form (`VecEncoding::F16`).
    HalfVector(crate::halfvec::HalfVector),
    /// Fixed-point number.
    // NOTE(review): presumably the logical value is `scaled / 10^scale` — confirm.
    Numeric {
        scaled: i128,
        scale: u8,
    },
    /// Calendar date as an `i32`.
    // NOTE(review): unit/epoch not visible here (presumably days since an epoch) — confirm.
    Date(i32),
    /// Timestamp as an `i64`.
    // NOTE(review): unit/epoch not visible here (presumably microseconds) — confirm.
    Timestamp(i64),
    /// Interval split into a month count and a microsecond count.
    Interval {
        months: i32,
        micros: i64,
    },
    /// JSON document held as text; reported as `DataType::Json`.
    Json(String),
    /// Absent value; has no data type.
    Null,
}
222
223impl Value {
224 pub fn data_type(&self) -> Option<DataType> {
226 match self {
227 Self::SmallInt(_) => Some(DataType::SmallInt),
228 Self::Int(_) => Some(DataType::Int),
229 Self::BigInt(_) => Some(DataType::BigInt),
230 Self::Float(_) => Some(DataType::Float),
231 Self::Text(_) => Some(DataType::Text),
234 Self::Bool(_) => Some(DataType::Bool),
235 Self::Vector(v) => Some(DataType::Vector {
236 dim: u32::try_from(v.len()).expect("vector dim ≤ u32"),
237 encoding: VecEncoding::F32,
238 }),
239 Self::Sq8Vector(q) => Some(DataType::Vector {
240 dim: u32::try_from(q.bytes.len()).expect("vector dim ≤ u32"),
241 encoding: VecEncoding::Sq8,
242 }),
243 Self::HalfVector(h) => Some(DataType::Vector {
244 dim: u32::try_from(h.dim()).expect("vector dim ≤ u32"),
245 encoding: VecEncoding::F16,
246 }),
247 Self::Numeric { scale, .. } => Some(DataType::Numeric {
252 precision: 0,
253 scale: *scale,
254 }),
255 Self::Date(_) => Some(DataType::Date),
256 Self::Timestamp(_) => Some(DataType::Timestamp),
257 Self::Interval { .. } => Some(DataType::Interval),
258 Self::Json(_) => Some(DataType::Json),
259 Self::Null => None,
260 }
261 }
262
263 pub const fn is_null(&self) -> bool {
264 matches!(self, Self::Null)
265 }
266}
267
/// One table row: an ordered list of cell values.
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    /// Cell values, one per column, in column order.
    pub values: Vec<Value>,
}

impl Row {
    /// Wraps `values` as a row without any validation.
    pub const fn new(values: Vec<Value>) -> Self {
        Self { values }
    }

    /// Number of cells in the row.
    pub fn len(&self) -> usize {
        self.values.len()
    }

    /// `true` when the row holds no cells.
    pub fn is_empty(&self) -> bool {
        self.values.is_empty()
    }
}
288
/// Definition of a single table column.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnSchema {
    /// Column name; `TableSchema::column_position` matches on it exactly.
    pub name: String,
    /// Declared type; `Table::insert` checks incoming values against it.
    pub ty: DataType,
    /// When `false`, `Table::insert` rejects `Value::Null` for this column.
    pub nullable: bool,
    /// Optional constant default value.
    // NOTE(review): not consulted by the code in this file section — applied by callers, presumably.
    pub default: Option<Value>,
    /// Optional default carried as text.
    // NOTE(review): presumably an expression evaluated at insert time — confirm.
    pub runtime_default: Option<String>,
    /// Marks the column as auto-incrementing.
    // NOTE(review): presumably paired with `Table::next_auto_value` — confirm against caller.
    pub auto_increment: bool,
}
312
/// Definition of a table: its name, columns, and declared constraints.
#[derive(Debug, Clone, PartialEq)]
pub struct TableSchema {
    /// Table name.
    pub name: String,
    /// Columns in positional order; rows must match this arity.
    pub columns: Vec<ColumnSchema>,
    /// Optional byte figure for the hot tier.
    // NOTE(review): presumably a budget compared against `Table::hot_bytes` — confirm.
    pub hot_tier_bytes: Option<u64>,
    /// Foreign-key constraints declared on this table.
    pub foreign_keys: Vec<ForeignKeyConstraint>,
    /// Primary-key / unique constraints declared on this table.
    pub uniqueness_constraints: Vec<UniquenessConstraint>,
}
338
/// A uniqueness constraint over one or more columns.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UniquenessConstraint {
    /// `true` when this constraint is the primary key rather than a plain
    /// unique constraint.
    pub is_primary_key: bool,
    /// Constrained columns.
    // NOTE(review): presumably positions into `TableSchema::columns` — confirm.
    pub columns: Vec<usize>,
}
355
/// A foreign-key constraint from this table to a parent table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForeignKeyConstraint {
    /// Optional constraint name.
    pub name: Option<String>,
    /// Referencing columns in the local table.
    // NOTE(review): presumably positions into the local `TableSchema::columns` — confirm.
    pub local_columns: Vec<usize>,
    /// Name of the referenced table.
    pub parent_table: String,
    /// Referenced columns in the parent table.
    // NOTE(review): presumably positions into the parent's column list — confirm.
    pub parent_columns: Vec<usize>,
    /// Action taken when a referenced parent row is deleted.
    pub on_delete: FkAction,
    /// Action taken when a referenced parent key is updated.
    pub on_update: FkAction,
}
382
/// Referential action attached to a foreign key.
///
/// Each variant has a stable one-byte tag (see [`FkAction::tag`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FkAction {
    /// Tag `0`.
    Restrict,
    /// Tag `1`.
    Cascade,
    /// Tag `2`.
    SetNull,
    /// Tag `3`.
    SetDefault,
    /// Tag `4`.
    NoAction,
}

impl FkAction {
    /// Returns the one-byte tag for this action.
    ///
    /// The mapping is spelled out explicitly so that reordering the enum's
    /// variants cannot silently change the tags.
    pub const fn tag(self) -> u8 {
        match self {
            Self::Restrict => 0,
            Self::Cascade => 1,
            Self::SetNull => 2,
            Self::SetDefault => 3,
            Self::NoAction => 4,
        }
    }

    /// Inverse of [`FkAction::tag`]; returns `None` for any byte that is not
    /// a known tag.
    pub const fn from_tag(b: u8) -> Option<Self> {
        match b {
            0 => Some(Self::Restrict),
            1 => Some(Self::Cascade),
            2 => Some(Self::SetNull),
            3 => Some(Self::SetDefault),
            4 => Some(Self::NoAction),
            _ => None,
        }
    }
}
415
416impl TableSchema {
417 pub fn column_position(&self, name: &str) -> Option<usize> {
418 self.columns.iter().position(|c| c.name == name)
419 }
420}
421
422#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
427pub enum IndexKey {
428 Int(i64),
429 Text(String),
430 Bool(bool),
431}
432
433impl IndexKey {
434 pub fn from_value(v: &Value) -> Option<Self> {
435 match v {
436 Value::SmallInt(n) => Some(Self::Int(i64::from(*n))),
437 Value::Int(n) => Some(Self::Int(i64::from(*n))),
438 Value::BigInt(n) => Some(Self::Int(*n)),
439 Value::Text(s) => Some(Self::Text(s.clone())),
440 Value::Bool(b) => Some(Self::Bool(*b)),
441 Value::Date(d) => Some(Self::Int(i64::from(*d))),
444 Value::Timestamp(t) => Some(Self::Int(*t)),
445 Value::Null
450 | Value::Float(_)
451 | Value::Vector(_)
452 | Value::Sq8Vector(_)
453 | Value::HalfVector(_)
454 | Value::Numeric { .. }
455 | Value::Interval { .. }
456 | Value::Json(_) => None,
457 }
458 }
459}
460
/// A secondary index attached to a [`Table`].
#[derive(Debug, Clone)]
pub struct Index {
    /// Index name; unique within a table (the `add_*` methods reject duplicates).
    pub name: String,
    /// Position of the indexed column in the table schema.
    pub column_position: usize,
    /// The index structure itself (B-tree, NSW graph, or BRIN marker).
    pub kind: IndexKind,
    /// Additional column positions carried with the index.
    // NOTE(review): presumably INCLUDE columns; not read in this file section — confirm.
    pub included_columns: Vec<usize>,
    /// Optional predicate text.
    // NOTE(review): presumably the WHERE clause of a partial index — confirm.
    pub partial_predicate: Option<String>,
    /// Optional expression text.
    // NOTE(review): presumably the expression of an expression index — confirm.
    pub expression: Option<String>,
    /// Whether the index is flagged unique; constructors in this file default it to `false`.
    pub is_unique: bool,
    /// Further column positions.
    // NOTE(review): presumably trailing key columns of a multi-column index — confirm.
    pub extra_column_positions: Vec<usize>,
}
509
/// Default `m` for NSW indexes.
// NOTE(review): presumably passed as `m` to `Table::add_nsw_index` by callers — confirm.
pub const NSW_DEFAULT_M: usize = 16;
513
/// Result summary of a freeze operation.
// NOTE(review): the producing code is outside this section; field meanings below are
// read from names and types only — confirm against the freeze implementation.
#[derive(Debug, Clone)]
pub struct FreezeReport {
    /// Identifier of the segment that was produced.
    pub segment_id: u32,
    /// Number of rows that were frozen.
    pub frozen_rows: usize,
    /// Number of bytes freed.
    pub bytes_freed: u64,
    /// Raw bytes of the produced segment.
    pub segment_bytes: Vec<u8>,
}
539
/// A slice of rows selected for freezing.
// NOTE(review): the producing code is outside this section — confirm field semantics.
#[derive(Debug, Clone)]
pub struct FreezeSlice {
    /// Range of row positions covered by this slice.
    pub row_range: core::ops::Range<usize>,
    /// One `(u64, bytes, key)` tuple per row.
    // NOTE(review): presumably (row id, encoded row body, index key) — confirm.
    pub rows: Vec<(u64, Vec<u8>, IndexKey)>,
}
562
/// Result summary of a compaction.
// NOTE(review): the producing code is outside this section; field meanings below are
// read from names and types only — confirm against the compaction implementation.
#[derive(Debug, Clone)]
pub struct CompactReport {
    /// Identifiers of the source segments.
    pub sources: Vec<u32>,
    /// Identifier of the merged segment, when one was produced.
    pub merged_segment_id: Option<u32>,
    /// Raw bytes of the merged segment.
    pub merged_segment_bytes: Vec<u8>,
    /// Number of rows in the merged segment.
    pub merged_rows: usize,
    /// Number of deleted rows dropped during the merge.
    pub deleted_rows_pruned: usize,
    /// Estimated number of bytes reclaimed.
    pub bytes_reclaimed_estimate: u64,
}
598
/// The concrete structure backing an [`Index`].
#[derive(Debug, Clone)]
pub enum IndexKind {
    /// Ordered map from key to every locator of a row holding that key.
    BTree(PersistentBTreeMap<IndexKey, Vec<RowLocator>>),
    /// Layered neighbour graph over a vector column.
    Nsw(NswGraph),
    /// BRIN marker; carries only the indexed column's type and no per-row
    /// data in this structure.
    Brin {
        /// Type of the indexed column, captured when the index was created.
        column_type: DataType,
    },
}
633
634#[derive(Debug, Clone)]
643pub struct NswGraph {
644 pub m: usize,
646 pub m_max_0: usize,
649 pub entry: Option<usize>,
652 pub entry_level: u8,
654 pub levels: PersistentVec<u8>,
661 pub layers: Vec<PersistentVec<Vec<u32>>>,
677}
678
679impl NswGraph {
680 fn new(m: usize) -> Self {
681 Self {
682 m,
683 m_max_0: m.saturating_mul(2),
684 entry: None,
685 entry_level: 0,
686 levels: PersistentVec::new(),
687 layers: alloc::vec![PersistentVec::new()],
688 }
689 }
690
691 pub const fn cap_for_layer(&self, layer: u8) -> usize {
693 if layer == 0 { self.m_max_0 } else { self.m }
694 }
695}
696
/// Deterministically assigns an NSW level in `0..=7` to a row index.
///
/// The row index is scrambled with a multiply / xor-shift mix, and the level
/// is the number of all-zero low nibbles of the mixed word, capped at 7. Row
/// index 0 mixes to 0 and therefore always lands on the top level.
pub fn nsw_assign_level(row_idx: usize) -> u8 {
    const MAX_LEVEL: u8 = 7;
    // (shift, multiplier) pairs applied after the initial multiply.
    const ROUNDS: [(u32, u64); 2] = [(30, 0xBF58_476D_1CE4_E5B9), (27, 0x94D0_49BB_1331_11EB)];

    let mut mixed = (row_idx as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
    for (shift, multiplier) in ROUNDS {
        mixed ^= mixed >> shift;
        mixed = mixed.wrapping_mul(multiplier);
    }
    mixed ^= mixed >> 31;

    // Each run of four trailing zero bits is one zero nibble; a zero word has
    // 64 trailing zeros and is clamped to MAX_LEVEL like any long run.
    let zero_nibbles = mixed.trailing_zeros() / 4;
    u8::try_from(zero_nibbles).map_or(MAX_LEVEL, |lvl| lvl.min(MAX_LEVEL))
}
723
724impl Index {
725 fn new_btree(name: String, column_position: usize) -> Self {
726 Self {
727 name,
728 column_position,
729 kind: IndexKind::BTree(PersistentBTreeMap::new()),
730 included_columns: Vec::new(),
731 partial_predicate: None,
732 expression: None,
733 is_unique: false,
734 extra_column_positions: Vec::new(),
735 }
736 }
737
738 fn new_nsw(name: String, column_position: usize, m: usize) -> Self {
739 Self {
740 name,
741 column_position,
742 kind: IndexKind::Nsw(NswGraph::new(m)),
743 included_columns: Vec::new(),
744 partial_predicate: None,
745 expression: None,
746 is_unique: false,
747 extra_column_positions: Vec::new(),
748 }
749 }
750
751 fn new_brin(name: String, column_position: usize, column_type: DataType) -> Self {
755 Self {
756 name,
757 column_position,
758 kind: IndexKind::Brin { column_type },
759 included_columns: Vec::new(),
760 partial_predicate: None,
761 expression: None,
762 is_unique: false,
763 extra_column_positions: Vec::new(),
764 }
765 }
766
767 pub fn lookup_eq(&self, key: &IndexKey) -> &[RowLocator] {
776 match &self.kind {
777 IndexKind::BTree(m) => m.get(key).map_or(&[][..], Vec::as_slice),
778 IndexKind::Nsw(_) | IndexKind::Brin { .. } => &[][..],
780 }
781 }
782
783 pub const fn nsw(&self) -> Option<&NswGraph> {
786 match &self.kind {
787 IndexKind::Nsw(g) => Some(g),
788 IndexKind::BTree(_) | IndexKind::Brin { .. } => None,
789 }
790 }
791
792 pub const fn is_brin(&self) -> bool {
797 matches!(self.kind, IndexKind::Brin { .. })
798 }
799}
800
/// An in-memory table: schema, hot rows, and the indexes built over them.
#[derive(Debug, Clone)]
pub struct Table {
    /// Column layout and constraints.
    schema: TableSchema,
    /// Hot (in-memory) rows; positions in this vector are what
    /// `RowLocator::Hot` refers to.
    rows: PersistentVec<Row>,
    /// Secondary indexes, rebuilt after deletes and updates.
    indices: Vec<Index>,
    /// Running total of `row_body_encoded_len` over the hot rows.
    hot_bytes: u64,
    /// Cached number of cold rows; only changed through `set_cold_row_count`.
    cold_row_count: u64,
    /// Set when `cold_row_count` may no longer be accurate.
    cold_row_count_stale: bool,
}
842
843impl Table {
    /// Creates an empty table for `schema`: no rows, no indexes, zeroed byte
    /// and cold-row counters.
    pub fn new(schema: TableSchema) -> Self {
        Self {
            schema,
            rows: PersistentVec::new(),
            indices: Vec::new(),
            hot_bytes: 0,
            cold_row_count: 0,
            cold_row_count_stale: false,
        }
    }
854
    /// Running total of encoded row-body bytes held in the hot rows, as
    /// maintained by insert, delete and update.
    #[must_use]
    pub const fn hot_bytes(&self) -> u64 {
        self.hot_bytes
    }
862
    /// Cached cold-row count as last stored by `set_cold_row_count`.
    /// Check `cold_row_count_stale` before trusting it.
    #[must_use]
    pub const fn cold_row_count(&self) -> u64 {
        self.cold_row_count
    }
869
    /// Stores `n` as the cold-row count and clears the stale flag.
    pub fn set_cold_row_count(&mut self, n: u64) {
        self.cold_row_count = n;
        self.cold_row_count_stale = false;
    }
876
    /// Flags the cached cold-row count as possibly out of date. The stored
    /// count itself is left unchanged.
    pub fn mark_cold_row_count_stale(&mut self) {
        self.cold_row_count_stale = true;
    }
884
    /// `true` when the cached cold-row count has been marked stale and not
    /// yet refreshed through `set_cold_row_count`.
    #[must_use]
    pub const fn cold_row_count_stale(&self) -> bool {
        self.cold_row_count_stale
    }
892
893 #[must_use]
904 pub fn count_cold_locators(&self) -> u64 {
905 let mut best: u64 = 0;
906 for idx in &self.indices {
907 if let IndexKind::BTree(map) = &idx.kind {
908 let n: u64 = map
909 .iter()
910 .map(|(_, locs)| locs.iter().filter(|l| l.is_cold()).count() as u64)
911 .sum();
912 if n > best {
913 best = n;
914 }
915 }
916 }
917 best
918 }
919
    /// Shared access to the table's schema.
    pub const fn schema(&self) -> &TableSchema {
        &self.schema
    }
923
    /// Mutable access to the table's schema.
    ///
    /// Existing rows and indexes are not revalidated or rebuilt when the
    /// schema is changed through this reference.
    pub const fn schema_mut(&mut self) -> &mut TableSchema {
        &mut self.schema
    }
930
    /// Shared access to the hot rows, in position order.
    pub const fn rows(&self) -> &PersistentVec<Row> {
        &self.rows
    }
937
    /// Number of hot rows currently held (cold rows are not included).
    pub const fn row_count(&self) -> usize {
        self.rows.len()
    }
941
    /// Mutable access to the indexes as a slice: entries can be edited in
    /// place, but none can be added or removed through it.
    pub fn indices_mut(&mut self) -> &mut [Index] {
        &mut self.indices
    }
949
    /// Shared access to the table's indexes, in creation order.
    pub fn indices(&self) -> &[Index] {
        &self.indices
    }
953
954 pub fn next_auto_value(&self, col_pos: usize) -> Option<i64> {
960 let ty = self.schema.columns.get(col_pos)?.ty;
961 if !matches!(ty, DataType::SmallInt | DataType::Int | DataType::BigInt) {
962 return None;
963 }
964 let mut max: Option<i64> = None;
965 for row in &self.rows {
966 match row.values.get(col_pos) {
967 Some(Value::SmallInt(n)) => {
968 let v = i64::from(*n);
969 max = Some(max.map_or(v, |m| m.max(v)));
970 }
971 Some(Value::Int(n)) => {
972 let v = i64::from(*n);
973 max = Some(max.map_or(v, |m| m.max(v)));
974 }
975 Some(Value::BigInt(n)) => {
976 max = Some(max.map_or(*n, |m| m.max(*n)));
977 }
978 _ => {}
979 }
980 }
981 Some(max.map_or(1, |m| m + 1))
982 }
983
984 pub fn index_on(&self, column_position: usize) -> Option<&Index> {
988 self.indices
995 .iter()
996 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::BTree(_)))
997 .or_else(|| {
998 self.indices
999 .iter()
1000 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_)))
1001 })
1002 }
1003
1004 pub fn insert(&mut self, row: Row) -> Result<(), StorageError> {
1008 if row.len() != self.schema.columns.len() {
1009 return Err(StorageError::ArityMismatch {
1010 expected: self.schema.columns.len(),
1011 actual: row.len(),
1012 });
1013 }
1014 for (i, (val, col)) in row.values.iter().zip(&self.schema.columns).enumerate() {
1015 if val.is_null() {
1016 if !col.nullable {
1017 return Err(StorageError::NullInNotNull {
1018 column: col.name.clone(),
1019 });
1020 }
1021 continue;
1022 }
1023 let actual = val.data_type().expect("non-null");
1024 let compatible = actual == col.ty
1038 || matches!(
1039 (actual, col.ty),
1040 (
1041 DataType::Text,
1042 DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
1043 ) | (DataType::Json | DataType::Jsonb, DataType::Text)
1044 | (DataType::Json, DataType::Jsonb) | (DataType::Jsonb, DataType::Json)
1045 | (DataType::Timestamp, DataType::Timestamptz)
1046 | (DataType::Timestamptz, DataType::Timestamp)
1047 )
1048 || matches!(
1049 (actual, col.ty),
1050 (
1051 DataType::Numeric { scale: a, .. },
1052 DataType::Numeric { scale: b, .. },
1053 ) if a == b
1054 );
1055 if !compatible {
1056 return Err(StorageError::TypeMismatch {
1057 column: col.name.clone(),
1058 expected: col.ty,
1059 actual,
1060 position: i,
1061 });
1062 }
1063 }
1064 let new_row_idx = self.rows.len();
1065 for idx in &mut self.indices {
1069 if let IndexKind::BTree(map) = &mut idx.kind
1070 && let Some(key) = IndexKey::from_value(&row.values[idx.column_position])
1071 {
1072 let mut entries = map.get(&key).cloned().unwrap_or_default();
1078 entries.push(RowLocator::Hot(new_row_idx));
1079 map.insert_mut(key, entries);
1080 }
1081 }
1082 self.hot_bytes = self
1085 .hot_bytes
1086 .saturating_add(row_body_encoded_len(&row, &self.schema) as u64);
1087 self.rows.push_mut(row);
1092 let new_row_idx = self.rows.len() - 1;
1095 let nsw_targets: Vec<usize> = self
1096 .indices
1097 .iter()
1098 .enumerate()
1099 .filter_map(|(i, idx)| {
1100 if matches!(idx.kind, IndexKind::Nsw(_)) {
1101 Some(i)
1102 } else {
1103 None
1104 }
1105 })
1106 .collect();
1107 for idx_pos in nsw_targets {
1108 nsw_insert_at(self, idx_pos, new_row_idx);
1109 }
1110 Ok(())
1111 }
1112
1113 pub fn add_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1117 if self.indices.iter().any(|i| i.name == name) {
1118 return Err(StorageError::DuplicateIndex { name });
1119 }
1120 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1121 StorageError::ColumnNotFound {
1122 column: column_name.into(),
1123 }
1124 })?;
1125 let mut idx = Index::new_btree(name, column_position);
1126 if let IndexKind::BTree(map) = &mut idx.kind {
1127 for (i, row) in self.rows.iter().enumerate() {
1128 if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
1129 let mut entries = map.get(&key).cloned().unwrap_or_default();
1130 entries.push(RowLocator::Hot(i));
1131 map.insert_mut(key, entries);
1132 }
1133 }
1134 }
1135 self.indices.push(idx);
1136 Ok(())
1137 }
1138
    /// Creates an NSW index named `name` on the vector column `column_name`
    /// with neighbour parameter `m`, building the graph from the current hot
    /// rows.
    ///
    /// # Errors
    ///
    /// Forwards the errors of `add_nsw_index_inner`: duplicate index name,
    /// unknown column, or a column that is not a vector.
    pub fn add_nsw_index(
        &mut self,
        name: String,
        column_name: &str,
        m: usize,
    ) -> Result<(), StorageError> {
        // `None` = build a fresh graph rather than adopting a restored one.
        self.add_nsw_index_inner(name, column_name, m, None)
    }
1151
1152 pub fn rebuild_nsw_index(
1164 &mut self,
1165 name: &str,
1166 new_encoding: Option<VecEncoding>,
1167 ) -> Result<(), StorageError> {
1168 let idx_pos = self
1169 .indices
1170 .iter()
1171 .position(|i| i.name == name)
1172 .ok_or_else(|| StorageError::IndexNotFound {
1173 name: String::from(name),
1174 })?;
1175 let col_pos = self.indices[idx_pos].column_position;
1176 let m = match &self.indices[idx_pos].kind {
1177 IndexKind::Nsw(g) => g.m,
1178 IndexKind::BTree(_) | IndexKind::Brin { .. } => {
1179 return Err(StorageError::Unsupported(format!(
1180 "ALTER INDEX REBUILD on non-NSW index {name:?} — only NSW indexes can rebuild"
1181 )));
1182 }
1183 };
1184 let col_name = self.schema.columns[col_pos].name.clone();
1185 if let Some(target) = new_encoding {
1188 let current = match self.schema.columns[col_pos].ty {
1189 DataType::Vector { encoding, .. } => encoding,
1190 ref other => {
1191 return Err(StorageError::Unsupported(format!(
1192 "ALTER INDEX REBUILD WITH (encoding=…) on non-vector column type {other:?}"
1193 )));
1194 }
1195 };
1196 if target != current {
1197 let DataType::Vector { dim, .. } = self.schema.columns[col_pos].ty else {
1198 unreachable!("checked above")
1199 };
1200 let n = self.rows.len();
1201 for i in 0..n {
1202 let row = self
1203 .rows
1204 .get_mut(i)
1205 .expect("row index in bounds (we iterated up to len())");
1206 let cell = core::mem::replace(&mut row.values[col_pos], Value::Null);
1207 let recoded = recode_vector_cell(cell, target)?;
1208 row.values[col_pos] = recoded;
1209 }
1210 self.schema.columns[col_pos].ty = DataType::Vector {
1211 dim,
1212 encoding: target,
1213 };
1214 }
1215 }
1216 self.indices.remove(idx_pos);
1218 self.add_nsw_index_inner(String::from(name), &col_name, m, None)?;
1219 Ok(())
1220 }
1221
    /// Registers an NSW index using an already-built `graph` instead of
    /// constructing one from the rows.
    ///
    /// The graph is adopted as-is; it is not checked against the current
    /// rows.
    ///
    /// # Errors
    ///
    /// Forwards the errors of `add_nsw_index_inner`: duplicate index name,
    /// unknown column, or a column that is not a vector.
    pub fn restore_nsw_index(
        &mut self,
        name: String,
        column_name: &str,
        graph: NswGraph,
    ) -> Result<(), StorageError> {
        self.add_nsw_index_inner(name, column_name, graph.m, Some(graph))
    }
1234
1235 pub fn restore_btree_index(
1242 &mut self,
1243 name: String,
1244 column_name: &str,
1245 map: PersistentBTreeMap<IndexKey, Vec<RowLocator>>,
1246 ) -> Result<(), StorageError> {
1247 if self.indices.iter().any(|i| i.name == name) {
1248 return Err(StorageError::DuplicateIndex { name });
1249 }
1250 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1251 StorageError::ColumnNotFound {
1252 column: column_name.into(),
1253 }
1254 })?;
1255 self.indices.push(Index {
1256 name,
1257 column_position,
1258 kind: IndexKind::BTree(map),
1259 included_columns: Vec::new(),
1260 partial_predicate: None,
1261 expression: None,
1262 is_unique: false,
1263 extra_column_positions: Vec::new(),
1264 });
1265 Ok(())
1266 }
1267
1268 pub fn restore_brin_index(
1273 &mut self,
1274 name: String,
1275 column_name: &str,
1276 column_type: DataType,
1277 ) -> Result<(), StorageError> {
1278 if self.indices.iter().any(|i| i.name == name) {
1279 return Err(StorageError::DuplicateIndex { name });
1280 }
1281 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1282 StorageError::ColumnNotFound {
1283 column: column_name.into(),
1284 }
1285 })?;
1286 self.indices.push(Index::new_brin(name, column_position, column_type));
1287 Ok(())
1288 }
1289
1290 pub fn add_brin_index(
1294 &mut self,
1295 name: String,
1296 column_name: &str,
1297 ) -> Result<(), StorageError> {
1298 if self.indices.iter().any(|i| i.name == name) {
1299 return Err(StorageError::DuplicateIndex { name });
1300 }
1301 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1302 StorageError::ColumnNotFound {
1303 column: column_name.into(),
1304 }
1305 })?;
1306 let column_type = self.schema.columns[column_position].ty;
1307 self.indices.push(Index::new_brin(name, column_position, column_type));
1308 Ok(())
1309 }
1310
1311 pub fn register_cold_locators<I>(
1328 &mut self,
1329 index_name: &str,
1330 locators: I,
1331 ) -> Result<usize, StorageError>
1332 where
1333 I: IntoIterator<Item = (IndexKey, RowLocator)>,
1334 {
1335 let idx = self
1336 .indices
1337 .iter_mut()
1338 .find(|i| i.name == index_name)
1339 .ok_or_else(|| StorageError::Corrupt(format!("index {index_name:?} not found")))?;
1340 let map = match &mut idx.kind {
1341 IndexKind::BTree(map) => map,
1342 IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
1343 return Err(StorageError::Corrupt(format!(
1344 "index {index_name:?} is not BTree; cold locators apply only to BTree indices"
1345 )));
1346 }
1347 };
1348 let mut count = 0usize;
1349 for (key, locator) in locators {
1350 let mut entries = map.get(&key).cloned().unwrap_or_default();
1351 entries.push(locator);
1352 map.insert_mut(key, entries);
1353 count += 1;
1354 }
1355 Ok(count)
1356 }
1357
1358 pub fn remove_cold_locators_for_key(
1368 &mut self,
1369 index_name: &str,
1370 key: &IndexKey,
1371 ) -> Result<usize, StorageError> {
1372 let idx = self
1373 .indices
1374 .iter_mut()
1375 .find(|i| i.name == index_name)
1376 .ok_or_else(|| {
1377 StorageError::Corrupt(format!(
1378 "remove_cold_locators_for_key: index {index_name:?} not found"
1379 ))
1380 })?;
1381 let map = match &mut idx.kind {
1382 IndexKind::BTree(map) => map,
1383 IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
1384 return Err(StorageError::Corrupt(format!(
1385 "remove_cold_locators_for_key: index {index_name:?} is not BTree; \
1386 cold locators apply only to BTree indices"
1387 )));
1388 }
1389 };
1390 let Some(entries) = map.get(key) else {
1391 return Ok(0);
1392 };
1393 let mut kept: Vec<RowLocator> =
1394 entries.iter().copied().filter(RowLocator::is_hot).collect();
1395 let removed = entries.len() - kept.len();
1396 if removed == 0 {
1397 return Ok(0);
1398 }
1399 kept.shrink_to_fit();
1400 map.insert_mut(key.clone(), kept);
1408 Ok(removed)
1409 }
1410
1411 pub fn delete_rows(&mut self, positions: &[usize]) -> usize {
1417 if positions.is_empty() {
1418 return 0;
1419 }
1420 let mut to_remove = alloc::vec![false; self.rows.len()];
1424 let mut removed = 0;
1425 for &p in positions {
1426 if p < to_remove.len() && !to_remove[p] {
1427 to_remove[p] = true;
1428 removed += 1;
1429 }
1430 }
1431 let mut new_rows: PersistentVec<Row> = PersistentVec::new();
1432 let mut removed_bytes: u64 = 0;
1433 for (i, row) in self.rows.iter().enumerate() {
1434 if to_remove[i] {
1435 removed_bytes =
1436 removed_bytes.saturating_add(row_body_encoded_len(row, &self.schema) as u64);
1437 } else {
1438 new_rows.push_mut(row.clone());
1439 }
1440 }
1441 self.rows = new_rows;
1442 self.hot_bytes = self.hot_bytes.saturating_sub(removed_bytes);
1443 self.rebuild_indices();
1444 removed
1445 }
1446
1447 pub fn update_row(
1453 &mut self,
1454 position: usize,
1455 new_values: Vec<Value>,
1456 ) -> Result<(), StorageError> {
1457 if position >= self.rows.len() {
1458 return Err(StorageError::Corrupt(alloc::format!(
1459 "update_row: position {position} out of bounds (rows={})",
1460 self.rows.len()
1461 )));
1462 }
1463 if new_values.len() != self.schema.columns.len() {
1464 return Err(StorageError::ArityMismatch {
1465 expected: self.schema.columns.len(),
1466 actual: new_values.len(),
1467 });
1468 }
1469 for (i, (val, col)) in new_values.iter().zip(&self.schema.columns).enumerate() {
1473 if val.is_null() {
1474 if !col.nullable {
1475 return Err(StorageError::NullInNotNull {
1476 column: col.name.clone(),
1477 });
1478 }
1479 continue;
1480 }
1481 let actual = val.data_type().expect("non-null");
1482 let compatible = actual == col.ty
1483 || matches!(
1484 (actual, col.ty),
1485 (
1486 DataType::Text,
1487 DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
1488 ) | (DataType::Json | DataType::Jsonb, DataType::Text)
1489 | (DataType::Json, DataType::Jsonb) | (DataType::Jsonb, DataType::Json)
1490 | (DataType::Timestamp, DataType::Timestamptz)
1491 | (DataType::Timestamptz, DataType::Timestamp)
1492 )
1493 || matches!(
1494 (actual, col.ty),
1495 (
1496 DataType::Numeric { scale: a, .. },
1497 DataType::Numeric { scale: b, .. },
1498 ) if a == b
1499 );
1500 if !compatible {
1501 return Err(StorageError::TypeMismatch {
1502 column: col.name.clone(),
1503 expected: col.ty,
1504 actual,
1505 position: i,
1506 });
1507 }
1508 }
1509 let old_row = self
1510 .rows
1511 .get(position)
1512 .expect("position bounds-checked above");
1513 let old_bytes = row_body_encoded_len(old_row, &self.schema) as u64;
1514 let new_row = Row::new(new_values);
1515 let new_bytes = row_body_encoded_len(&new_row, &self.schema) as u64;
1516 self.rows = self
1517 .rows
1518 .set(position, new_row)
1519 .expect("position bounds-checked above");
1520 self.hot_bytes = self
1521 .hot_bytes
1522 .saturating_sub(old_bytes)
1523 .saturating_add(new_bytes);
1524 self.rebuild_indices();
1525 Ok(())
1526 }
1527
1528 fn rebuild_indices(&mut self) {
1535 let preserved_cold: Vec<(String, Vec<(IndexKey, RowLocator)>)> = self
1544 .indices
1545 .iter()
1546 .filter_map(|idx| match &idx.kind {
1547 IndexKind::BTree(map) => {
1548 let cold: Vec<(IndexKey, RowLocator)> = map
1549 .iter()
1550 .flat_map(|(k, locs)| {
1551 locs.iter()
1552 .filter(|l| l.is_cold())
1553 .copied()
1554 .map(move |l| (k.clone(), l))
1555 })
1556 .collect();
1557 if cold.is_empty() {
1558 None
1559 } else {
1560 Some((idx.name.clone(), cold))
1561 }
1562 }
1563 IndexKind::Nsw(_) | IndexKind::Brin { .. } => None,
1565 })
1566 .collect();
1567
1568 #[derive(Clone)]
1573 enum RebuildKind {
1574 BTree,
1575 Nsw(usize),
1576 Brin(DataType),
1577 }
1578 let descriptors: Vec<(String, usize, RebuildKind)> = self
1579 .indices
1580 .iter()
1581 .map(|idx| {
1582 let kind = match &idx.kind {
1583 IndexKind::Nsw(g) => RebuildKind::Nsw(g.m),
1584 IndexKind::Brin { column_type } => RebuildKind::Brin(*column_type),
1585 IndexKind::BTree(_) => RebuildKind::BTree,
1586 };
1587 (idx.name.clone(), idx.column_position, kind)
1588 })
1589 .collect();
1590 self.indices.clear();
1591 for (name, column_position, rebuild_kind) in descriptors {
1592 match rebuild_kind {
1593 RebuildKind::Nsw(m) => {
1594 let idx = Index::new_nsw(name, column_position, m);
1595 self.indices.push(idx);
1596 let idx_pos = self.indices.len() - 1;
1597 let row_indices: Vec<usize> = (0..self.rows.len()).collect();
1598 for row_idx in row_indices {
1599 nsw_insert_at(self, idx_pos, row_idx);
1600 }
1601 }
1602 RebuildKind::Brin(column_type) => {
1603 self.indices.push(Index::new_brin(name, column_position, column_type));
1606 }
1607 RebuildKind::BTree => {
1608 let mut idx = Index::new_btree(name, column_position);
1609 if let IndexKind::BTree(map) = &mut idx.kind {
1610 for (i, row) in self.rows.iter().enumerate() {
1611 if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
1612 let mut entries = map.get(&key).cloned().unwrap_or_default();
1613 entries.push(RowLocator::Hot(i));
1614 map.insert_mut(key, entries);
1615 }
1616 }
1617 }
1618 self.indices.push(idx);
1619 }
1620 }
1621 }
1622
1623 for (idx_name, locators) in preserved_cold {
1628 let _ = self.register_cold_locators(&idx_name, locators);
1632 }
1633 }
1634
1635 fn add_nsw_index_inner(
1636 &mut self,
1637 name: String,
1638 column_name: &str,
1639 m: usize,
1640 restore: Option<NswGraph>,
1641 ) -> Result<(), StorageError> {
1642 if self.indices.iter().any(|i| i.name == name) {
1643 return Err(StorageError::DuplicateIndex { name });
1644 }
1645 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1646 StorageError::ColumnNotFound {
1647 column: column_name.into(),
1648 }
1649 })?;
1650 if !matches!(
1651 self.schema.columns[column_position].ty,
1652 DataType::Vector { .. }
1653 ) {
1654 return Err(StorageError::TypeMismatch {
1655 column: column_name.into(),
1656 expected: DataType::Vector {
1657 dim: 0,
1658 encoding: VecEncoding::F32,
1659 },
1660 actual: self.schema.columns[column_position].ty,
1661 position: column_position,
1662 });
1663 }
1664 if let Some(graph) = restore {
1665 self.indices.push(Index {
1666 name,
1667 column_position,
1668 kind: IndexKind::Nsw(graph),
1669 included_columns: Vec::new(),
1670 partial_predicate: None,
1671 expression: None,
1672 is_unique: false,
1673 extra_column_positions: Vec::new(),
1674 });
1675 return Ok(());
1676 }
1677 let idx = Index::new_nsw(name, column_position, m);
1678 self.indices.push(idx);
1679 let idx_pos = self.indices.len() - 1;
1680 let row_indices: Vec<usize> = (0..self.rows.len()).collect();
1683 for row_idx in row_indices {
1684 nsw_insert_at(self, idx_pos, row_idx);
1685 }
1686 Ok(())
1687 }
1688}
1689
1690fn recode_vector_cell(cell: Value, target: VecEncoding) -> Result<Value, StorageError> {
1697 if matches!(cell, Value::Null) {
1698 return Ok(cell);
1699 }
1700 let as_f32: Vec<f32> = match &cell {
1702 Value::Vector(v) => v.clone(),
1703 Value::Sq8Vector(q) => quantize::dequantize(q),
1704 Value::HalfVector(h) => h.to_f32_vec(),
1705 other => {
1706 return Err(StorageError::Unsupported(format!(
1707 "ALTER INDEX REBUILD: cannot recode non-vector cell {:?}",
1708 other.data_type()
1709 )));
1710 }
1711 };
1712 Ok(match target {
1717 VecEncoding::F32 => Value::Vector(as_f32),
1718 VecEncoding::Sq8 => Value::Sq8Vector(quantize::quantize(&as_f32)),
1719 VecEncoding::F16 => Value::HalfVector(halfvec::HalfVector::from_f32_slice(&as_f32)),
1720 })
1721}
1722
/// Inserts the row at `new_row_idx` into the NSW graph of the index at
/// `idx_pos`.
///
/// The row must already be stored in `table.rows`. Rows whose indexed cell is
/// not a vector, or is a zero-length vector, only get empty node slots so
/// that node indices keep lining up with row indices.
///
/// # Panics
///
/// Panics if `idx_pos` / `new_row_idx` are out of bounds or if the index at
/// `idx_pos` is not an NSW index.
fn nsw_insert_at(table: &mut Table, idx_pos: usize, new_row_idx: usize) {
    let col_pos = table.indices[idx_pos].column_position;
    // Element count of the cell, or `None` when it is not a vector at all.
    let cell_dim: Option<usize> = match &table.rows[new_row_idx].values[col_pos] {
        Value::Vector(v) => Some(v.len()),
        Value::Sq8Vector(q) => Some(q.bytes.len()),
        Value::HalfVector(h) => Some(h.dim()),
        _ => None,
    };
    let Some(dim) = cell_dim else {
        // Non-vector cell (e.g. Null): reserve the slot but add no edges.
        ensure_node_slot(table, idx_pos, new_row_idx, 0);
        return;
    };
    if dim == 0 {
        // Empty vector: same treatment as a non-vector cell.
        ensure_node_slot(table, idx_pos, new_row_idx, 0);
        return;
    }
    let level = nsw_assign_level(new_row_idx);
    ensure_node_slot(table, idx_pos, new_row_idx, level);
    // Snapshot the graph parameters so the index borrow ends before the
    // search helpers borrow the whole table.
    let (entry, entry_level, m) = match &table.indices[idx_pos].kind {
        IndexKind::Nsw(g) => (g.entry, g.entry_level, g.m),
        IndexKind::BTree(_) | IndexKind::Brin { .. } => {
            unreachable!("nsw_insert_at on a non-NSW index")
        }
    };
    if entry.is_none() {
        // First real node: it becomes the entry point and has no neighbours.
        if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
            g.entry = Some(new_row_idx);
            g.entry_level = level;
            *g.levels
                .get_mut(new_row_idx)
                .expect("levels slot padded by ensure_node_slot") = level;
        }
        return;
    }
    if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
        // Record the node's level before wiring it in.
        *g.levels
            .get_mut(new_row_idx)
            .expect("levels slot padded by ensure_node_slot") = level;
    }
    // The search works on f32 elements regardless of the stored encoding.
    let query = match &table.rows[new_row_idx].values[col_pos] {
        Value::Vector(v) => v.clone(),
        Value::Sq8Vector(q) => quantize::dequantize(q),
        Value::HalfVector(h) => h.to_f32_vec(),
        _ => return,
    };
    let mut current = entry.expect("entry was Some above");
    let mut current_d = vec_l2_sq(table, col_pos, current, &query);
    if entry_level > level {
        // Walk the layers above the new node's level greedily, carrying the
        // best node found down as the start of the next layer.
        for layer in (level + 1..=entry_level).rev() {
            (current, current_d) =
                greedy_layer_walk(table, idx_pos, layer, current, current_d, &query);
        }
    }
    let top = level.min(entry_level);
    // Beam width: twice `m`, but never below 8.
    let ef = (m * 2).max(8);
    for layer in (0..=top).rev() {
        // NOTE(review): the cap is recomputed as `m * 2` / `m` here rather than
        // read from `NswGraph::cap_for_layer`; the two differ if a restored
        // graph has `m_max_0 != 2 * m` — confirm this is intended.
        let cap = if layer == 0 { m * 2 } else { m };
        let mut candidates = layer_beam_search(
            table,
            idx_pos,
            layer,
            current,
            current_d,
            &query,
            ef,
            NswMetric::L2,
        );
        // The new node already has a slot, so drop it if the search found it.
        candidates.retain(|&(_, n)| n != new_row_idx);
        if let Some(&(d, n)) = candidates.first() {
            // First candidate seeds the search on the next layer down.
            // NOTE(review): presumably candidates are sorted nearest-first — confirm.
            current = n;
            current_d = d;
        }
        let peers = select_neighbours_heuristic(&candidates, cap, table, col_pos);
        connect_at_layer(table, idx_pos, layer, new_row_idx, &peers);
    }
    if level > entry_level
        // A node above the current top becomes the new entry point.
        && let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
    {
        g.entry = Some(new_row_idx);
        g.entry_level = level;
    }
}
1831
1832fn ensure_node_slot(table: &mut Table, idx_pos: usize, new_row_idx: usize, level: u8) {
1836 let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind else {
1837 unreachable!("ensure_node_slot on a BTree index");
1838 };
1839 while g.layers.len() <= level as usize {
1840 g.layers.push(PersistentVec::new());
1841 }
1842 while g.levels.len() <= new_row_idx {
1843 g.levels.push_mut(0);
1844 }
1845 for layer_vec in &mut g.layers {
1846 while layer_vec.len() <= new_row_idx {
1847 layer_vec.push_mut(Vec::new());
1848 }
1849 }
1850}
1851
1852fn greedy_layer_walk(
1858 table: &Table,
1859 idx_pos: usize,
1860 layer: u8,
1861 mut current: usize,
1862 mut current_d: f32,
1863 query: &[f32],
1864) -> (usize, f32) {
1865 let g = match &table.indices[idx_pos].kind {
1866 IndexKind::Nsw(g) => g,
1867 IndexKind::BTree(_) | IndexKind::Brin { .. } => return (current, current_d),
1868 };
1869 let col_pos = table.indices[idx_pos].column_position;
1870 loop {
1871 let neighbours: &[u32] = g
1872 .layers
1873 .get(layer as usize)
1874 .and_then(|layer_v| layer_v.get(current))
1875 .map_or(&[][..], Vec::as_slice);
1876 let mut best = current;
1877 let mut best_d = current_d;
1878 for &n in neighbours {
1879 let n = n as usize;
1880 let d = vec_l2_sq(table, col_pos, n, query);
1881 if d < best_d {
1882 best = n;
1883 best_d = d;
1884 }
1885 }
1886 if best == current {
1887 return (current, current_d);
1888 }
1889 current = best;
1890 current_d = best_d;
1891 }
1892}
1893
1894#[allow(clippy::too_many_arguments)] fn layer_beam_search(
1907 table: &Table,
1908 idx_pos: usize,
1909 layer: u8,
1910 entry_node: usize,
1911 entry_d: f32,
1912 query: &[f32],
1913 ef: usize,
1914 metric: NswMetric,
1915) -> Vec<(f32, usize)> {
1916 let g = match &table.indices[idx_pos].kind {
1917 IndexKind::Nsw(g) => g,
1918 IndexKind::BTree(_) | IndexKind::Brin { .. } => return Vec::new(),
1919 };
1920 let col_pos = table.indices[idx_pos].column_position;
1921 let d0 = if matches!(metric, NswMetric::L2) {
1922 entry_d
1923 } else {
1924 cell_to_query_metric_distance(table, col_pos, entry_node, query, metric)
1925 };
1926 let row_count = table.rows.len();
1927 let mut visited: Vec<bool> = alloc::vec![false; row_count];
1928 if entry_node < row_count {
1929 visited[entry_node] = true;
1930 }
1931 let mut candidates: alloc::collections::BinaryHeap<NodeClosest> =
1934 alloc::collections::BinaryHeap::with_capacity(ef);
1935 let mut results: alloc::collections::BinaryHeap<NodeFurthest> =
1936 alloc::collections::BinaryHeap::with_capacity(ef);
1937 candidates.push(NodeClosest {
1938 dist: d0,
1939 node: entry_node,
1940 });
1941 results.push(NodeFurthest {
1942 dist: d0,
1943 node: entry_node,
1944 });
1945 while let Some(cur) = candidates.pop() {
1946 let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
1947 if cur.dist > worst && results.len() >= ef {
1948 break;
1949 }
1950 let neighbours: &[u32] = g
1951 .layers
1952 .get(layer as usize)
1953 .and_then(|layer_v| layer_v.get(cur.node))
1954 .map_or(&[][..], Vec::as_slice);
1955 for &n in neighbours {
1956 let n = n as usize;
1957 if n >= row_count || visited[n] {
1958 continue;
1959 }
1960 visited[n] = true;
1961 let dn = cell_to_query_metric_distance(table, col_pos, n, query, metric);
1965 if !dn.is_finite() {
1966 continue;
1967 }
1968 let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
1969 if results.len() < ef || dn < worst {
1970 results.push(NodeFurthest { dist: dn, node: n });
1971 if results.len() > ef {
1972 results.pop();
1973 }
1974 candidates.push(NodeClosest { dist: dn, node: n });
1975 }
1976 }
1977 }
1978 let mut out: Vec<(f32, usize)> = results.into_iter().map(|c| (c.dist, c.node)).collect();
1981 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
1982 out
1983}
1984
/// Heap entry ordered so that a max-heap (`BinaryHeap`) pops the node with
/// the *smallest* distance first.
///
/// Ordering and equality are both defined purely by `dist`, compared with
/// [`f32::total_cmp`]. That makes the order total (NaN included: a positive
/// NaN ranks as the furthest distance and is therefore popped last) and keeps
/// `PartialEq`, `PartialOrd` and `Ord` mutually consistent, as the trait
/// contracts require. `node` is payload only.
#[derive(Debug, Clone, Copy)]
struct NodeClosest {
    /// Distance from the query to `node`.
    dist: f32,
    /// Row index of the graph node.
    node: usize,
}

impl PartialEq for NodeClosest {
    fn eq(&self, other: &Self) -> bool {
        // Derived from `cmp` so that `a == b` <=> `a.cmp(b) == Equal`.
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}

impl Eq for NodeClosest {}

impl PartialOrd for NodeClosest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for NodeClosest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        // Reversed operands: a smaller distance means a greater priority.
        other.dist.total_cmp(&self.dist)
    }
}
2013
/// Heap entry ordered so that a max-heap (`BinaryHeap`) pops the node with
/// the *largest* distance first.
///
/// Ordering and equality are both defined purely by `dist`, compared with
/// [`f32::total_cmp`]. That makes the order total (NaN included: a positive
/// NaN ranks as the furthest distance and is therefore evicted first) and
/// keeps `PartialEq`, `PartialOrd` and `Ord` mutually consistent, as the
/// trait contracts require. `node` is payload only.
#[derive(Debug, Clone, Copy)]
struct NodeFurthest {
    /// Distance from the query to `node`.
    dist: f32,
    /// Row index of the graph node.
    node: usize,
}

impl PartialEq for NodeFurthest {
    fn eq(&self, other: &Self) -> bool {
        // Derived from `cmp` so that `a == b` <=> `a.cmp(b) == Equal`.
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}

impl Eq for NodeFurthest {}

impl PartialOrd for NodeFurthest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for NodeFurthest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        self.dist.total_cmp(&other.dist)
    }
}
2039
2040fn select_neighbours_heuristic(
2049 candidates: &[(f32, usize)],
2050 m: usize,
2051 table: &Table,
2052 col_pos: usize,
2053) -> Vec<usize> {
2054 let mut chosen: Vec<usize> = Vec::with_capacity(m);
2055 for &(d_q, e) in candidates {
2056 if chosen.len() >= m {
2057 break;
2058 }
2059 if !matches!(
2064 table.rows.get(e).and_then(|r| r.values.get(col_pos)),
2065 Some(Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_))
2066 ) {
2067 continue;
2068 }
2069 let mut covered = false;
2070 for &r in &chosen {
2071 if cell_l2_sq(table, col_pos, e, r) < d_q {
2075 covered = true;
2076 break;
2077 }
2078 }
2079 if !covered {
2080 chosen.push(e);
2081 }
2082 }
2083 chosen
2084}
2085
2086fn connect_at_layer(
2090 table: &mut Table,
2091 idx_pos: usize,
2092 layer: u8,
2093 new_row_idx: usize,
2094 peers: &[usize],
2095) {
2096 let col_pos = table.indices[idx_pos].column_position;
2097 let cap = match &table.indices[idx_pos].kind {
2098 IndexKind::Nsw(g) => g.cap_for_layer(layer),
2099 IndexKind::BTree(_) | IndexKind::Brin { .. } => return,
2100 };
2101 let new_row_u32 = u32::try_from(new_row_idx).expect("row index fits in u32");
2106 if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
2107 let layer_v = &mut g.layers[layer as usize];
2108 if let Some(slot) = layer_v.get_mut(new_row_idx) {
2109 *slot = peers
2110 .iter()
2111 .map(|&p| u32::try_from(p).expect("row index fits in u32"))
2112 .collect();
2113 }
2114 }
2115 for &peer in peers {
2116 if !matches!(
2120 &table.rows[peer].values[col_pos],
2121 Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_)
2122 ) {
2123 continue;
2124 }
2125 if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
2127 let layer_v = &mut g.layers[layer as usize];
2128 if let Some(slot) = layer_v.get_mut(peer)
2129 && !slot.contains(&new_row_u32)
2130 {
2131 slot.push(new_row_u32);
2132 }
2133 }
2134 let needs_trim = match &table.indices[idx_pos].kind {
2138 IndexKind::Nsw(g) => g.layers[layer as usize][peer].len() > cap,
2139 IndexKind::BTree(_) | IndexKind::Brin { .. } => false,
2140 };
2141 if needs_trim {
2142 let current_peers: Vec<usize> = match &table.indices[idx_pos].kind {
2143 IndexKind::Nsw(g) => g.layers[layer as usize][peer]
2144 .iter()
2145 .map(|&n| n as usize)
2146 .collect(),
2147 IndexKind::BTree(_) | IndexKind::Brin { .. } => continue,
2148 };
2149 let mut tagged: Vec<(f32, usize)> = current_peers
2154 .iter()
2155 .map(|&p| (cell_l2_sq(table, col_pos, peer, p), p))
2156 .collect();
2157 tagged.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2158 let kept = select_neighbours_heuristic(&tagged, cap, table, col_pos);
2159 if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
2160 && let Some(slot) = g.layers[layer as usize].get_mut(peer)
2161 {
2162 *slot = kept
2163 .into_iter()
2164 .map(|p| u32::try_from(p).expect("row index fits in u32"))
2165 .collect();
2166 }
2167 }
2168 }
2169}
2170
2171fn vec_l2_sq(table: &Table, col_pos: usize, row: usize, query: &[f32]) -> f32 {
2178 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2179 Some(Value::Vector(v)) if v.len() == query.len() => l2_distance_sq(v, query),
2180 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => {
2181 quantize::sq8_l2_distance_sq_asymmetric(q, query)
2182 }
2183 Some(Value::HalfVector(h)) if h.dim() == query.len() => {
2187 halfvec::half_l2_distance_sq_asymmetric(h, query)
2188 }
2189 _ => f32::INFINITY,
2190 }
2191}
2192
2193fn cell_l2_sq(table: &Table, col_pos: usize, row_a: usize, row_b: usize) -> f32 {
2200 let Some(cell_a) = table.rows.get(row_a).and_then(|r| r.values.get(col_pos)) else {
2201 return f32::INFINITY;
2202 };
2203 let Some(cell_b) = table.rows.get(row_b).and_then(|r| r.values.get(col_pos)) else {
2204 return f32::INFINITY;
2205 };
2206 match (cell_a, cell_b) {
2207 (Value::Vector(a), Value::Vector(b)) if a.len() == b.len() => l2_distance_sq(a, b),
2208 (Value::Sq8Vector(a), Value::Sq8Vector(b)) if a.bytes.len() == b.bytes.len() => {
2209 quantize::sq8_l2_distance_sq(a, b)
2210 }
2211 (Value::HalfVector(a), Value::HalfVector(b)) if a.dim() == b.dim() => {
2216 halfvec::half_l2_distance_sq(a, b)
2217 }
2218 _ => f32::INFINITY,
2219 }
2220}
2221
2222fn cell_to_query_metric_distance(
2227 table: &Table,
2228 col_pos: usize,
2229 row: usize,
2230 query: &[f32],
2231 metric: NswMetric,
2232) -> f32 {
2233 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2234 Some(Value::Vector(v)) if v.len() == query.len() => metric_distance(metric, v, query),
2235 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => match metric {
2236 NswMetric::L2 => quantize::sq8_l2_distance_sq_asymmetric(q, query),
2237 NswMetric::InnerProduct => quantize::sq8_inner_product_asymmetric(q, query),
2238 NswMetric::Cosine => quantize::sq8_cosine_distance_asymmetric(q, query),
2239 },
2240 Some(Value::HalfVector(h)) if h.dim() == query.len() => match metric {
2243 NswMetric::L2 => halfvec::half_l2_distance_sq_asymmetric(h, query),
2244 NswMetric::InnerProduct => halfvec::half_inner_product_asymmetric(h, query),
2245 NswMetric::Cosine => halfvec::half_cosine_distance_asymmetric(h, query),
2246 },
2247 _ => f32::INFINITY,
2248 }
2249}
2250
/// Distance metric used when searching an NSW index.
///
/// Smaller values always mean "closer"; see `metric_distance` for the exact
/// formulas applied to plain f32 vectors.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NswMetric {
    /// Squared Euclidean distance.
    L2,
    /// Negated inner product, so that a larger dot product ranks closer.
    InnerProduct,
    /// `1 - cos(a, b)`; infinite when either vector has zero norm.
    Cosine,
}
2268
2269fn nsw_search(
2275 table: &Table,
2276 idx_pos: usize,
2277 query: &[f32],
2278 k: usize,
2279 ef: usize,
2280 metric: NswMetric,
2281) -> Vec<(f32, usize)> {
2282 let (entry, entry_level) = match &table.indices[idx_pos].kind {
2283 IndexKind::Nsw(g) => (g.entry, g.entry_level),
2284 IndexKind::BTree(_) | IndexKind::Brin { .. } => return Vec::new(),
2285 };
2286 let Some(entry) = entry else {
2287 return Vec::new();
2288 };
2289 let col_pos = table.indices[idx_pos].column_position;
2290 let sq8 = matches!(
2297 table.schema.columns.get(col_pos).map(|c| c.ty),
2298 Some(DataType::Vector {
2299 encoding: VecEncoding::Sq8,
2300 ..
2301 })
2302 );
2303 let ef = if sq8 {
2304 ef.max(k).max(k * SQ8_RERANK_OVER_FETCH)
2305 } else {
2306 ef.max(k)
2307 };
2308 let entry_d = vec_l2_sq(table, col_pos, entry, query);
2310 let mut current = entry;
2311 let mut current_d = entry_d;
2312 for layer in (1..=entry_level).rev() {
2313 (current, current_d) = greedy_layer_walk(table, idx_pos, layer, current, current_d, query);
2314 }
2315 let mut results = layer_beam_search(table, idx_pos, 0, current, current_d, query, ef, metric);
2317 if sq8 {
2318 results = sq8_rerank(table, col_pos, &results, query, metric);
2319 }
2320 results.truncate(k);
2321 results
2322}
2323
2324fn sq8_rerank(
2331 table: &Table,
2332 col_pos: usize,
2333 candidates: &[(f32, usize)],
2334 query: &[f32],
2335 metric: NswMetric,
2336) -> Vec<(f32, usize)> {
2337 let mut out: Vec<(f32, usize)> = candidates
2338 .iter()
2339 .filter_map(|&(adc_d, row)| {
2340 let cell = table.rows.get(row).and_then(|r| r.values.get(col_pos))?;
2341 let Value::Sq8Vector(q) = cell else {
2342 return Some((adc_d, row));
2346 };
2347 let deq = quantize::dequantize(q);
2348 if deq.len() != query.len() {
2349 return None;
2350 }
2351 Some((metric_distance(metric, &deq, query), row))
2352 })
2353 .collect();
2354 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2355 out
2356}
2357
/// Over-fetch factor for SQ8 columns: `nsw_search` widens its beam to at
/// least `k * SQ8_RERANK_OVER_FETCH` candidates before re-ranking them.
const SQ8_RERANK_OVER_FETCH: usize = 3;
2362
2363fn metric_distance(metric: NswMetric, a: &[f32], b: &[f32]) -> f32 {
2364 match metric {
2365 NswMetric::L2 => l2_distance_sq(a, b),
2366 NswMetric::InnerProduct => -inner_product_f32(a, b),
2367 NswMetric::Cosine => {
2368 let (dot, na, nb) = cosine_dot_norms_f32(a, b);
2369 if na == 0.0 || nb == 0.0 {
2370 return f32::INFINITY;
2371 }
2372 let denom = sqrt_newton_f32(na) * sqrt_newton_f32(nb);
2375 1.0 - dot / denom
2376 }
2377 }
2378}
2379
2380#[doc(hidden)]
2389#[inline]
2390pub fn inner_product_f32(a: &[f32], b: &[f32]) -> f32 {
2391 #[cfg(target_arch = "aarch64")]
2392 {
2393 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
2394 return unsafe { inner_product_neon(a, b) };
2397 }
2398 }
2399 inner_product_scalar(a, b)
2400}
2401
/// Portable dot product: sums `a[i] * b[i]` left to right over the common
/// prefix of the two slices.
fn inner_product_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter()
        .zip(b.iter())
        .fold(0.0_f32, |acc, (lhs, rhs)| acc + lhs * rhs)
}
2409
/// NEON dot product over 4-lane chunks, using two independent accumulators
/// for the 8-wide main loop.
///
/// # Safety
///
/// `b` must be at least as long as `a`, and `a.len()` must be a multiple of
/// four: trailing elements past the last full 4-lane chunk are not read. The
/// CPU must support NEON.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names)]
unsafe fn inner_product_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32,
    };
    let len = a.len();
    let lhs = a.as_ptr();
    let rhs = b.as_ptr();
    // SAFETY: every load reads lanes `offset..offset + 4` with
    // `offset + 4 <= len`; the caller guarantees `b` covers the same range.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut sum_lo = zero;
        let mut sum_hi = zero;
        let mut offset = 0usize;
        // Main loop: two 4-lane fused multiply-adds per iteration.
        while offset + 8 <= len {
            let a_lo = vld1q_f32(lhs.add(offset));
            let b_lo = vld1q_f32(rhs.add(offset));
            sum_lo = vfmaq_f32(sum_lo, a_lo, b_lo);
            let a_hi = vld1q_f32(lhs.add(offset + 4));
            let b_hi = vld1q_f32(rhs.add(offset + 4));
            sum_hi = vfmaq_f32(sum_hi, a_hi, b_hi);
            offset += 8;
        }
        // At most one remaining 4-lane chunk.
        while offset + 4 <= len {
            let a_tail = vld1q_f32(lhs.add(offset));
            let b_tail = vld1q_f32(rhs.add(offset));
            sum_lo = vfmaq_f32(sum_lo, a_tail, b_tail);
            offset += 4;
        }
        vaddvq_f32(vaddq_f32(sum_lo, sum_hi))
    }
}
2443
2444#[doc(hidden)]
2451#[inline]
2452pub fn cosine_dot_norms_f32(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
2453 #[cfg(target_arch = "aarch64")]
2454 {
2455 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
2456 return unsafe { cosine_dot_norms_neon(a, b) };
2458 }
2459 }
2460 cosine_dot_norms_scalar(a, b)
2461}
2462
/// Portable single-pass computation of `(a·b, a·a, b·b)` over the common
/// prefix of the two slices, accumulating left to right.
fn cosine_dot_norms_scalar(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    a.iter().zip(b.iter()).fold(
        (0.0_f32, 0.0_f32, 0.0_f32),
        |(dot, norm_a, norm_b), (lhs, rhs)| {
            (dot + lhs * rhs, norm_a + lhs * lhs, norm_b + rhs * rhs)
        },
    )
}
2474
/// NEON single-pass computation of `(a·b, a·a, b·b)` over 4-lane chunks.
///
/// # Safety
///
/// `b` must be at least as long as `a`, and `a.len()` must be a multiple of
/// four: trailing elements past the last full 4-lane chunk are not read. The
/// CPU must support NEON.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names, clippy::similar_names)]
unsafe fn cosine_dot_norms_neon(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    use core::arch::aarch64::{float32x4_t, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32};
    let len = a.len();
    let lhs = a.as_ptr();
    let rhs = b.as_ptr();
    // SAFETY: every load reads lanes `offset..offset + 4` with
    // `offset + 4 <= len`; the caller guarantees `b` covers the same range.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut dot_lanes = zero;
        let mut norm_a_lanes = zero;
        let mut norm_b_lanes = zero;
        let mut offset = 0usize;
        while offset + 4 <= len {
            let a_chunk = vld1q_f32(lhs.add(offset));
            let b_chunk = vld1q_f32(rhs.add(offset));
            dot_lanes = vfmaq_f32(dot_lanes, a_chunk, b_chunk);
            norm_a_lanes = vfmaq_f32(norm_a_lanes, a_chunk, a_chunk);
            norm_b_lanes = vfmaq_f32(norm_b_lanes, b_chunk, b_chunk);
            offset += 4;
        }
        // Horizontal add of each accumulator.
        (
            vaddvq_f32(dot_lanes),
            vaddvq_f32(norm_a_lanes),
            vaddvq_f32(norm_b_lanes),
        )
    }
}
2498
/// Square root for `no_std` builds, computed with Newton's method.
///
/// * `x <= 0.0` (including `-0.0` and negative inputs) returns `0.0`.
/// * `+inf` returns `+inf`; NaN returns NaN.
/// * Every other input converges to within float rounding of the true root.
///
/// The iteration is seeded by halving the exponent through integer
/// arithmetic on the bit pattern, which lands within a few percent of the
/// root for normal floats. A fixed number of steps seeded with `g = x` only
/// halves the estimate each round and is still far from the root for inputs
/// much larger or smaller than 1, so the loop instead runs until the
/// estimate stops decreasing.
fn sqrt_newton_f32(x: f32) -> f32 {
    /// Upper bound on refinement steps; normal inputs need only a handful,
    /// subnormal inputs (where the seed is poor) need roughly a dozen.
    const MAX_REFINEMENTS: usize = 32;
    /// Bias that re-centres the exponent after the bit pattern is halved.
    const SEED_BIAS: u32 = 0x1fbd_1df5;

    if x <= 0.0 {
        return 0.0;
    }
    if !x.is_finite() {
        // +inf or NaN: Newton's update would only manufacture NaN.
        return x;
    }

    let seed = f32::from_bits((x.to_bits() >> 1) + SEED_BIAS);
    // After one step the estimate is at or above the true root (AM-GM), so
    // from here on the sequence decreases until it converges.
    let mut g = 0.5 * (seed + x / seed);
    for _ in 0..MAX_REFINEMENTS {
        let next = 0.5 * (g + x / g);
        if next >= g {
            break;
        }
        g = next;
    }
    g
}
2509
2510#[inline]
2518fn l2_distance_sq(a: &[f32], b: &[f32]) -> f32 {
2519 #[cfg(target_arch = "aarch64")]
2520 {
2521 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
2522 return unsafe { l2_distance_sq_neon(a, b) };
2526 }
2527 }
2528 l2_distance_sq_scalar(a, b)
2529}
2530
/// Portable squared Euclidean distance: sums `(a[i] - b[i])²` left to right
/// over the common prefix of the two slices.
fn l2_distance_sq_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b.iter()).fold(0.0_f32, |acc, (lhs, rhs)| {
        let diff = *lhs - *rhs;
        acc + diff * diff
    })
}
2539
/// NEON squared Euclidean distance over 4-lane chunks, using two independent
/// accumulators for the 8-wide main loop.
///
/// # Safety
///
/// `b` must be at least as long as `a`, and `a.len()` must be a multiple of
/// four: trailing elements past the last full 4-lane chunk are not read. The
/// CPU must support NEON.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names)]
unsafe fn l2_distance_sq_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32, vsubq_f32,
    };
    let len = a.len();
    let lhs = a.as_ptr();
    let rhs = b.as_ptr();
    // SAFETY: every load reads lanes `offset..offset + 4` with
    // `offset + 4 <= len`; the caller guarantees `b` covers the same range.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut sum_lo = zero;
        let mut sum_hi = zero;
        let mut offset = 0usize;
        // Main loop: two 4-lane difference-squares per iteration.
        while offset + 8 <= len {
            let diff_lo = vsubq_f32(vld1q_f32(lhs.add(offset)), vld1q_f32(rhs.add(offset)));
            sum_lo = vfmaq_f32(sum_lo, diff_lo, diff_lo);
            let diff_hi = vsubq_f32(
                vld1q_f32(lhs.add(offset + 4)),
                vld1q_f32(rhs.add(offset + 4)),
            );
            sum_hi = vfmaq_f32(sum_hi, diff_hi, diff_hi);
            offset += 8;
        }
        // At most one remaining 4-lane chunk.
        while offset + 4 <= len {
            let diff = vsubq_f32(vld1q_f32(lhs.add(offset)), vld1q_f32(rhs.add(offset)));
            sum_lo = vfmaq_f32(sum_lo, diff, diff);
            offset += 4;
        }
        vaddvq_f32(vaddq_f32(sum_lo, sum_hi))
    }
}
2577
2578pub fn nsw_query(
2581 table: &Table,
2582 idx_name: &str,
2583 query: &[f32],
2584 k: usize,
2585 metric: NswMetric,
2586) -> Vec<usize> {
2587 let Some(idx_pos) = table.indices.iter().position(|i| i.name == idx_name) else {
2588 return Vec::new();
2589 };
2590 let ef = (k * 2).max(NSW_DEFAULT_M);
2591 let mut hits = nsw_search(table, idx_pos, query, k, ef, metric);
2592 hits.truncate(k);
2593 hits.into_iter().map(|(_, idx)| idx).collect()
2594}
2595
2596pub fn nsw_index_on(table: &Table, column_position: usize) -> Option<&Index> {
2600 table
2601 .indices
2602 .iter()
2603 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_)))
2604}
2605
/// In-memory catalog: the set of tables plus the cold segments registered
/// for them.
#[derive(Debug, Clone, Default)]
pub struct Catalog {
    /// Tables in creation order; `create_table` appends to this vector.
    tables: Vec<Table>,
    /// Maps a table name to its position in `tables`.
    by_name: BTreeMap<String, usize>,
    /// Cold segments, indexed by segment id. A `None` slot is either a
    /// tombstoned segment or padding below an id loaded out of order.
    cold_segments: Vec<Option<Arc<OwnedSegment>>>,
}
2646
2647impl Catalog {
2648 pub const fn new() -> Self {
2649 Self {
2650 tables: Vec::new(),
2651 by_name: BTreeMap::new(),
2652 cold_segments: Vec::new(),
2653 }
2654 }
2655
2656 pub fn create_table(&mut self, schema: TableSchema) -> Result<(), StorageError> {
2657 if self.by_name.contains_key(&schema.name) {
2658 return Err(StorageError::DuplicateTable {
2659 name: schema.name.clone(),
2660 });
2661 }
2662 let idx = self.tables.len();
2663 let name = schema.name.clone();
2664 self.tables.push(Table::new(schema));
2665 self.by_name.insert(name, idx);
2666 Ok(())
2667 }
2668
2669 pub fn get(&self, name: &str) -> Option<&Table> {
2670 let idx = *self.by_name.get(name)?;
2671 self.tables.get(idx)
2672 }
2673
2674 pub fn get_mut(&mut self, name: &str) -> Option<&mut Table> {
2675 let idx = *self.by_name.get(name)?;
2676 self.tables.get_mut(idx)
2677 }
2678
2679 pub fn table_count(&self) -> usize {
2680 self.tables.len()
2681 }
2682
2683 pub fn table_names(&self) -> Vec<String> {
2686 self.tables.iter().map(|t| t.schema.name.clone()).collect()
2687 }
2688
2689 pub fn load_segment_bytes(&mut self, bytes: Vec<u8>) -> Result<u32, StorageError> {
2700 let id = u32::try_from(self.cold_segments.len()).map_err(|_| {
2701 StorageError::Corrupt("cold segment count would exceed u32::MAX".into())
2702 })?;
2703 let seg = OwnedSegment::from_bytes(bytes)
2704 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
2705 self.cold_segments.push(Some(Arc::new(seg)));
2706 Ok(id)
2707 }
2708
2709 pub fn load_segment_bytes_at(
2722 &mut self,
2723 target_id: u32,
2724 bytes: Vec<u8>,
2725 ) -> Result<(), StorageError> {
2726 let seg = OwnedSegment::from_bytes(bytes)
2727 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
2728 let idx = target_id as usize;
2729 while self.cold_segments.len() <= idx {
2730 self.cold_segments.push(None);
2731 }
2732 if self.cold_segments[idx].is_some() {
2733 return Err(StorageError::Corrupt(format!(
2734 "load_segment_bytes_at: segment_id {target_id} already occupied"
2735 )));
2736 }
2737 self.cold_segments[idx] = Some(Arc::new(seg));
2738 Ok(())
2739 }
2740
2741 pub fn tombstone_segment(&mut self, segment_id: u32) -> Result<(), StorageError> {
2751 let idx = segment_id as usize;
2752 if idx >= self.cold_segments.len() {
2753 return Err(StorageError::Corrupt(format!(
2754 "tombstone_segment: segment_id {segment_id} out of bounds (len={})",
2755 self.cold_segments.len()
2756 )));
2757 }
2758 self.cold_segments[idx] = None;
2759 Ok(())
2760 }
2761
2762 #[must_use]
2764 pub fn cold_segment_count(&self) -> usize {
2765 self.cold_segments.iter().filter(|s| s.is_some()).count()
2766 }
2767
2768 #[must_use]
2771 pub fn cold_segment_slot_count(&self) -> usize {
2772 self.cold_segments.len()
2773 }
2774
2775 #[must_use]
2780 pub fn cold_segment_ids_global(&self) -> Vec<u32> {
2781 self.cold_segments
2782 .iter()
2783 .enumerate()
2784 .filter_map(|(i, s)| s.as_ref().map(|_| i as u32))
2785 .collect()
2786 }
2787
2788 #[must_use]
2795 pub fn hot_tier_bytes(&self) -> u64 {
2796 self.tables
2797 .iter()
2798 .map(Table::hot_bytes)
2799 .fold(0u64, u64::saturating_add)
2800 }
2801
2802 pub fn freeze_oldest_to_cold(
2847 &mut self,
2848 table_name: &str,
2849 index_name: &str,
2850 max_rows: usize,
2851 ) -> Result<FreezeReport, StorageError> {
2852 if max_rows == 0 {
2854 return Err(StorageError::Corrupt(
2855 "freeze_oldest_to_cold: max_rows must be > 0".into(),
2856 ));
2857 }
2858 let table = self.get(table_name).ok_or_else(|| {
2859 StorageError::Corrupt(format!(
2860 "freeze_oldest_to_cold: table {table_name:?} not found"
2861 ))
2862 })?;
2863 if max_rows > table.rows.len() {
2864 return Err(StorageError::Corrupt(format!(
2865 "freeze_oldest_to_cold: max_rows {max_rows} > row_count {}",
2866 table.rows.len()
2867 )));
2868 }
2869 let idx = table
2870 .indices
2871 .iter()
2872 .find(|i| i.name == index_name)
2873 .ok_or_else(|| {
2874 StorageError::Corrupt(format!(
2875 "freeze_oldest_to_cold: index {index_name:?} not found on {table_name:?}"
2876 ))
2877 })?;
2878 if !matches!(idx.kind, IndexKind::BTree(_)) {
2879 return Err(StorageError::Corrupt(format!(
2880 "freeze_oldest_to_cold: index {index_name:?} is NSW; only BTree indices may freeze"
2881 )));
2882 }
2883 let column_position = idx.column_position;
2884
2885 let schema = table.schema.clone();
2887 let mut to_freeze: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(max_rows);
2888 for row_idx in 0..max_rows {
2889 let row = table.rows.get(row_idx).expect("bounds-checked above");
2890 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
2891 StorageError::Corrupt(format!(
2892 "freeze_oldest_to_cold: row {row_idx} has NULL / non-key value in index column"
2893 ))
2894 })?;
2895 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
2896 StorageError::Corrupt(format!(
2897 "freeze_oldest_to_cold: index {index_name:?} column type is non-integer; \
2898 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
2899 ))
2900 })?;
2901 to_freeze.push((pk_u64, encode_row_body_dense(row, &schema), key));
2902 }
2903 to_freeze.sort_by_key(|(k, _, _)| *k);
2908 for w in to_freeze.windows(2) {
2912 if w[0].0 == w[1].0 {
2913 return Err(StorageError::Corrupt(format!(
2914 "freeze_oldest_to_cold: duplicate PK {} in freeze batch",
2915 w[0].0
2916 )));
2917 }
2918 }
2919 let post_swap_keys: Vec<IndexKey> = to_freeze.iter().map(|(_, _, k)| k.clone()).collect();
2923 let seg_rows: Vec<(u64, Vec<u8>)> = to_freeze
2927 .into_iter()
2928 .map(|(k, body, _)| (k, body))
2929 .collect();
2930 let frozen_rows = seg_rows.len();
2931 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
2932 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: encode: {e}")))?;
2933
2934 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
2943 let positions: Vec<usize> = (0..max_rows).collect();
2944 let t_mut = self
2945 .get_mut(table_name)
2946 .expect("just validated; still present");
2947 let removed = t_mut.delete_rows(&positions);
2948 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
2949 let bytes_after = t_mut.hot_bytes();
2950 let bytes_freed = bytes_before.saturating_sub(bytes_after);
2951
2952 let segment_id = self
2953 .load_segment_bytes(seg_bytes.clone())
2954 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: load: {e}")))?;
2955 let new_cold = post_swap_keys.into_iter().map(|k| {
2956 (
2957 k,
2958 RowLocator::Cold {
2959 segment_id,
2960 page_offset: 0,
2961 },
2962 )
2963 });
2964 let t_mut = self.get_mut(table_name).expect("still present");
2965 t_mut.register_cold_locators(index_name, new_cold)?;
2966
2967 Ok(FreezeReport {
2968 segment_id,
2969 frozen_rows,
2970 bytes_freed,
2971 segment_bytes: seg_bytes,
2972 })
2973 }
2974
2975 #[must_use]
2981 pub fn cold_segment(&self, segment_id: u32) -> Option<&OwnedSegment> {
2982 self.cold_segments
2983 .get(segment_id as usize)
2984 .and_then(|s| s.as_deref())
2985 }
2986
2987 pub fn resolve_cold_locator(
2996 &self,
2997 table_name: &str,
2998 segment_id: u32,
2999 key: &IndexKey,
3000 ) -> Option<Row> {
3001 let t = self.get(table_name)?;
3002 let u64_key = index_key_as_u64(key)?;
3003 let seg = self.cold_segments.get(segment_id as usize)?.as_ref()?;
3004 let payload = seg.lookup(u64_key)?;
3005 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
3006 Some(row)
3007 }
3008
3009 pub fn lookup_by_pk(&self, table: &str, index_name: &str, key: &IndexKey) -> Option<Row> {
3027 let t = self.get(table)?;
3028 let idx = t.indices.iter().find(|i| i.name == index_name)?;
3029 let locators = idx.lookup_eq(key);
3030 let cold_u64_key = index_key_as_u64(key);
3031 for loc in locators {
3032 match *loc {
3033 RowLocator::Hot(i) => {
3034 if let Some(row) = t.rows.get(i) {
3035 return Some(row.clone());
3036 }
3037 }
3038 RowLocator::Cold {
3039 segment_id,
3040 page_offset: _,
3041 } => {
3042 let Some(u64_key) = cold_u64_key else {
3043 continue;
3046 };
3047 let Some(seg) = self
3048 .cold_segments
3049 .get(segment_id as usize)
3050 .and_then(|s| s.as_deref())
3051 else {
3052 continue;
3063 };
3064 let Some(payload) = seg.lookup(u64_key) else {
3065 continue;
3066 };
3067 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
3068 return Some(row);
3069 }
3070 }
3071 }
3072 None
3073 }
3074
3075 pub fn promote_cold_row(
3097 &mut self,
3098 table_name: &str,
3099 index_name: &str,
3100 key: &IndexKey,
3101 ) -> Result<Option<usize>, StorageError> {
3102 let cold_loc = self.find_cold_locator(table_name, index_name, key)?;
3103 let Some((segment_id, _page_offset)) = cold_loc else {
3104 return Ok(None);
3105 };
3106 let u64_key = index_key_as_u64(key).ok_or_else(|| {
3107 StorageError::Corrupt(
3108 "promote_cold_row: key type not coercible to u64 (cold tier requires integer PK)"
3109 .into(),
3110 )
3111 })?;
3112 let schema = self
3116 .get(table_name)
3117 .ok_or_else(|| {
3118 StorageError::Corrupt(format!("promote_cold_row: table {table_name:?} not found"))
3119 })?
3120 .schema
3121 .clone();
3122 let seg = self
3123 .cold_segments
3124 .get(segment_id as usize)
3125 .and_then(|s| s.as_ref())
3126 .ok_or_else(|| {
3127 StorageError::Corrupt(format!(
3128 "promote_cold_row: segment {segment_id} not registered on catalog"
3129 ))
3130 })?;
3131 let payload = seg.lookup(u64_key).ok_or_else(|| {
3132 StorageError::Corrupt(format!(
3133 "promote_cold_row: key {u64_key} resolves to segment {segment_id} \
3134 but the segment's bloom/page lookup didn't return a row"
3135 ))
3136 })?;
3137 let (row, _consumed) = decode_row_body_dense(&payload, &schema)?;
3138 let t = self
3143 .get_mut(table_name)
3144 .expect("table existed at lookup time");
3145 t.insert(row)?;
3146 let new_hot_idx =
3147 t.rows.len().checked_sub(1).ok_or_else(|| {
3148 StorageError::Corrupt("promote_cold_row: empty after insert".into())
3149 })?;
3150 t.remove_cold_locators_for_key(index_name, key)?;
3154 Ok(Some(new_hot_idx))
3155 }
3156
3157 pub fn shadow_cold_row(
3175 &mut self,
3176 table_name: &str,
3177 index_name: &str,
3178 key: &IndexKey,
3179 ) -> Result<usize, StorageError> {
3180 let t = self.get_mut(table_name).ok_or_else(|| {
3181 StorageError::Corrupt(format!("shadow_cold_row: table {table_name:?} not found"))
3182 })?;
3183 t.remove_cold_locators_for_key(index_name, key)
3184 }
3185
3186 pub fn prepare_freeze_slice(
3204 &self,
3205 table_name: &str,
3206 index_name: &str,
3207 row_range: core::ops::Range<usize>,
3208 ) -> Result<FreezeSlice, StorageError> {
3209 let table = self.get(table_name).ok_or_else(|| {
3210 StorageError::Corrupt(format!(
3211 "prepare_freeze_slice: table {table_name:?} not found"
3212 ))
3213 })?;
3214 let idx = table
3215 .indices
3216 .iter()
3217 .find(|i| i.name == index_name)
3218 .ok_or_else(|| {
3219 StorageError::Corrupt(format!(
3220 "prepare_freeze_slice: index {index_name:?} not found on {table_name:?}"
3221 ))
3222 })?;
3223 if !matches!(idx.kind, IndexKind::BTree(_)) {
3224 return Err(StorageError::Corrupt(format!(
3225 "prepare_freeze_slice: index {index_name:?} is NSW; only BTree indices may freeze"
3226 )));
3227 }
3228 if row_range.end > table.rows.len() {
3229 return Err(StorageError::Corrupt(format!(
3230 "prepare_freeze_slice: row_range end {} > row_count {}",
3231 row_range.end,
3232 table.rows.len()
3233 )));
3234 }
3235 let column_position = idx.column_position;
3236 let schema = table.schema.clone();
3237 let mut rows: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(row_range.len());
3238 for row_idx in row_range.clone() {
3239 let row = table.rows.get(row_idx).expect("bounds-checked above");
3240 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
3241 StorageError::Corrupt(format!(
3242 "prepare_freeze_slice: row {row_idx} has NULL / non-key value in index column"
3243 ))
3244 })?;
3245 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
3246 StorageError::Corrupt(format!(
3247 "prepare_freeze_slice: index {index_name:?} column type is non-integer; \
3248 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
3249 ))
3250 })?;
3251 rows.push((pk_u64, encode_row_body_dense(row, &schema), key));
3252 }
3253 rows.sort_by_key(|(k, _, _)| *k);
3254 Ok(FreezeSlice { row_range, rows })
3255 }
3256
3257 pub fn commit_freeze_slices(
3271 &mut self,
3272 table_name: &str,
3273 index_name: &str,
3274 slices: Vec<FreezeSlice>,
3275 ) -> Result<FreezeReport, StorageError> {
3276 let table = self.get(table_name).ok_or_else(|| {
3278 StorageError::Corrupt(format!(
3279 "commit_freeze_slices: table {table_name:?} not found"
3280 ))
3281 })?;
3282 let idx = table
3283 .indices
3284 .iter()
3285 .find(|i| i.name == index_name)
3286 .ok_or_else(|| {
3287 StorageError::Corrupt(format!(
3288 "commit_freeze_slices: index {index_name:?} not found on {table_name:?}"
3289 ))
3290 })?;
3291 if !matches!(idx.kind, IndexKind::BTree(_)) {
3292 return Err(StorageError::Corrupt(format!(
3293 "commit_freeze_slices: index {index_name:?} is NSW; only BTree indices may freeze"
3294 )));
3295 }
3296 let mut ordered = slices;
3300 ordered.sort_by_key(|s| s.row_range.start);
3301 let mut expected_start = 0usize;
3305 for s in &ordered {
3306 if s.row_range.start != expected_start {
3307 return Err(StorageError::Corrupt(format!(
3308 "commit_freeze_slices: gap/overlap at row {}; expected start {}",
3309 s.row_range.start, expected_start
3310 )));
3311 }
3312 expected_start = s.row_range.end;
3313 }
3314 let max_rows = expected_start;
3315 if max_rows > table.rows.len() {
3316 return Err(StorageError::Corrupt(format!(
3317 "commit_freeze_slices: total row range {} exceeds row_count {}",
3318 max_rows,
3319 table.rows.len()
3320 )));
3321 }
3322 if max_rows == 0 {
3323 return Ok(FreezeReport {
3324 segment_id: u32::MAX,
3325 frozen_rows: 0,
3326 bytes_freed: 0,
3327 segment_bytes: Vec::new(),
3328 });
3329 }
3330
3331 let total_rows: usize = ordered.iter().map(|s| s.rows.len()).sum();
3336 if total_rows != max_rows {
3337 return Err(StorageError::Corrupt(format!(
3338 "commit_freeze_slices: total slice rows {total_rows} ≠ row_range coverage {max_rows}"
3339 )));
3340 }
3341 let mut cursors: Vec<usize> = alloc::vec![0; ordered.len()];
3342 let mut merged: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(total_rows);
3343 loop {
3344 let mut pick: Option<usize> = None;
3347 for (i, c) in cursors.iter().enumerate() {
3348 let slice = &ordered[i];
3349 if *c >= slice.rows.len() {
3350 continue;
3351 }
3352 match pick {
3353 None => pick = Some(i),
3354 Some(j) => {
3355 if slice.rows[*c].0 < ordered[j].rows[cursors[j]].0 {
3356 pick = Some(i);
3357 }
3358 }
3359 }
3360 }
3361 let Some(i) = pick else { break };
3362 let row = ordered[i].rows[cursors[i]].clone();
3363 cursors[i] += 1;
3364 merged.push(row);
3365 }
3366 for w in merged.windows(2) {
3369 if w[0].0 == w[1].0 {
3370 return Err(StorageError::Corrupt(format!(
3371 "commit_freeze_slices: duplicate PK {} across slices",
3372 w[0].0
3373 )));
3374 }
3375 }
3376 let post_swap_keys: Vec<IndexKey> = merged.iter().map(|(_, _, k)| k.clone()).collect();
3377 let seg_rows: Vec<(u64, Vec<u8>)> = merged
3378 .into_iter()
3379 .map(|(k, body, _)| (k, body))
3380 .collect();
3381 let frozen_rows = seg_rows.len();
3382 let (seg_bytes, _meta) =
3383 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).map_err(|e| {
3384 StorageError::Corrupt(format!("commit_freeze_slices: encode: {e}"))
3385 })?;
3386
3387 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
3389 let positions: Vec<usize> = (0..max_rows).collect();
3390 let t_mut = self
3391 .get_mut(table_name)
3392 .expect("just validated; still present");
3393 let removed = t_mut.delete_rows(&positions);
3394 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
3395 let bytes_after = t_mut.hot_bytes();
3396 let bytes_freed = bytes_before.saturating_sub(bytes_after);
3397
3398 let segment_id = self
3399 .load_segment_bytes(seg_bytes.clone())
3400 .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: load: {e}")))?;
3401 let new_cold = post_swap_keys.into_iter().map(|k| {
3402 (
3403 k,
3404 RowLocator::Cold {
3405 segment_id,
3406 page_offset: 0,
3407 },
3408 )
3409 });
3410 let t_mut = self.get_mut(table_name).expect("still present");
3411 t_mut.register_cold_locators(index_name, new_cold)?;
3412
3413 Ok(FreezeReport {
3414 segment_id,
3415 frozen_rows,
3416 bytes_freed,
3417 segment_bytes: seg_bytes,
3418 })
3419 }
3420
3421 pub fn compact_cold_segments(
3464 &mut self,
3465 table_name: &str,
3466 index_name: &str,
3467 target_segment_bytes: u64,
3468 ) -> Result<CompactReport, StorageError> {
3469 let t = self.get(table_name).ok_or_else(|| {
3471 StorageError::Corrupt(format!(
3472 "compact_cold_segments: table {table_name:?} not found"
3473 ))
3474 })?;
3475 let idx = t
3476 .indices
3477 .iter()
3478 .find(|i| i.name == index_name)
3479 .ok_or_else(|| {
3480 StorageError::Corrupt(format!(
3481 "compact_cold_segments: index {index_name:?} not found on {table_name:?}"
3482 ))
3483 })?;
3484 let map = match &idx.kind {
3485 IndexKind::BTree(m) => m,
3486 IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
3487 return Err(StorageError::Corrupt(format!(
3488 "compact_cold_segments: index {index_name:?} is not BTree; \
3489 compaction applies only to BTree cold-tier indices"
3490 )));
3491 }
3492 };
3493
3494 let mut referenced_ids: BTreeSet<u32> = BTreeSet::new();
3497 for (_key, locators) in map.iter() {
3498 for loc in locators {
3499 if let RowLocator::Cold { segment_id, .. } = loc {
3500 referenced_ids.insert(*segment_id);
3501 }
3502 }
3503 }
3504 let candidate_set: BTreeSet<u32> = referenced_ids
3506 .into_iter()
3507 .filter(|id| {
3508 self.cold_segments
3509 .get(*id as usize)
3510 .and_then(|s| s.as_deref())
3511 .is_some_and(|s| (s.bytes().len() as u64) < target_segment_bytes)
3512 })
3513 .collect();
3514 if candidate_set.len() < 2 {
3515 return Ok(CompactReport {
3516 sources: Vec::new(),
3517 merged_segment_id: None,
3518 merged_segment_bytes: Vec::new(),
3519 merged_rows: 0,
3520 deleted_rows_pruned: 0,
3521 bytes_reclaimed_estimate: 0,
3522 });
3523 }
3524 let mut source_row_count: usize = 0;
3526 let mut source_byte_total: u64 = 0;
3527 for &id in &candidate_set {
3528 let seg = self.cold_segments[id as usize]
3529 .as_ref()
3530 .expect("candidate selected only when slot is Some");
3531 source_row_count = source_row_count.saturating_add(seg.meta().num_rows as usize);
3532 source_byte_total =
3533 source_byte_total.saturating_add(seg.bytes().len() as u64);
3534 }
3535 let mut collected: BTreeMap<u64, (Vec<u8>, IndexKey)> = BTreeMap::new();
3541 for (key, locators) in map.iter() {
3542 for loc in locators {
3543 let RowLocator::Cold { segment_id, .. } = loc else {
3544 continue;
3545 };
3546 if !candidate_set.contains(segment_id) {
3547 continue;
3548 }
3549 let u64_key = index_key_as_u64(key).ok_or_else(|| {
3550 StorageError::Corrupt(format!(
3551 "compact_cold_segments: index {index_name:?} has non-integer Cold key; \
3552 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
3553 ))
3554 })?;
3555 let seg = self.cold_segments[*segment_id as usize]
3556 .as_ref()
3557 .expect("candidate slot guaranteed Some above");
3558 let payload = seg.lookup(u64_key).ok_or_else(|| {
3559 StorageError::Corrupt(format!(
3560 "compact_cold_segments: BTree {index_name:?} points key={u64_key} \
3561 at segment {segment_id} but the segment lookup missed"
3562 ))
3563 })?;
3564 collected.insert(u64_key, (payload, key.clone()));
3565 break;
3566 }
3567 }
3568 let merged_rows = collected.len();
3569 let deleted_rows_pruned = source_row_count.saturating_sub(merged_rows);
3570
3571 let seg_rows: Vec<(u64, Vec<u8>)> = collected
3575 .iter()
3576 .map(|(k, (body, _))| (*k, body.clone()))
3577 .collect();
3578 let (seg_bytes, _meta) =
3579 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).map_err(|e| {
3580 StorageError::Corrupt(format!("compact_cold_segments: encode: {e}"))
3581 })?;
3582 let merged_bytes_len = seg_bytes.len() as u64;
3583
3584 let merged_segment_id = self
3586 .load_segment_bytes(seg_bytes.clone())
3587 .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: load: {e}")))?;
3588
3589 let entries: Vec<(IndexKey, Vec<RowLocator>)> = {
3595 let t = self
3596 .get(table_name)
3597 .expect("table existed at the start of this fn");
3598 let idx = t
3599 .indices
3600 .iter()
3601 .find(|i| i.name == index_name)
3602 .expect("index existed at the start of this fn");
3603 let IndexKind::BTree(map) = &idx.kind else {
3604 unreachable!("validated above");
3605 };
3606 map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
3607 };
3608 let t_mut = self
3609 .get_mut(table_name)
3610 .expect("table existed at the start of this fn");
3611 let idx_mut = t_mut
3612 .indices
3613 .iter_mut()
3614 .find(|i| i.name == index_name)
3615 .expect("index existed at the start of this fn");
3616 let IndexKind::BTree(map_mut) = &mut idx_mut.kind else {
3617 unreachable!("validated above");
3618 };
3619 for (key, locators) in entries {
3620 let mut new_locs: Vec<RowLocator> = Vec::with_capacity(locators.len());
3621 let mut changed = false;
3622 for loc in &locators {
3623 match *loc {
3624 RowLocator::Cold {
3625 segment_id,
3626 page_offset: _,
3627 } if candidate_set.contains(&segment_id) => {
3628 let replacement = RowLocator::Cold {
3629 segment_id: merged_segment_id,
3630 page_offset: 0,
3631 };
3632 if !new_locs.contains(&replacement) {
3633 new_locs.push(replacement);
3634 }
3635 changed = true;
3636 }
3637 other => new_locs.push(other),
3638 }
3639 }
3640 if changed {
3641 map_mut.insert_mut(key, new_locs);
3642 }
3643 }
3644
3645 for &id in &candidate_set {
3650 self.tombstone_segment(id)?;
3651 }
3652
3653 let bytes_reclaimed_estimate = source_byte_total.saturating_sub(merged_bytes_len);
3654 Ok(CompactReport {
3655 sources: candidate_set.into_iter().collect(),
3656 merged_segment_id: Some(merged_segment_id),
3657 merged_segment_bytes: seg_bytes,
3658 merged_rows,
3659 deleted_rows_pruned,
3660 bytes_reclaimed_estimate,
3661 })
3662 }
3663
3664 fn find_cold_locator(
3670 &self,
3671 table_name: &str,
3672 index_name: &str,
3673 key: &IndexKey,
3674 ) -> Result<Option<(u32, u32)>, StorageError> {
3675 let t = self.get(table_name).ok_or_else(|| {
3676 StorageError::Corrupt(format!("find_cold_locator: table {table_name:?} not found"))
3677 })?;
3678 let idx = t
3679 .indices
3680 .iter()
3681 .find(|i| i.name == index_name)
3682 .ok_or_else(|| {
3683 StorageError::Corrupt(format!(
3684 "find_cold_locator: index {index_name:?} not found on {table_name:?}"
3685 ))
3686 })?;
3687 if !matches!(idx.kind, IndexKind::BTree(_)) {
3688 return Err(StorageError::Corrupt(format!(
3689 "find_cold_locator: index {index_name:?} is NSW; promote-on-write only applies to BTree indices"
3690 )));
3691 }
3692 for loc in idx.lookup_eq(key) {
3693 if let RowLocator::Cold {
3694 segment_id,
3695 page_offset,
3696 } = *loc
3697 {
3698 return Ok(Some((segment_id, page_offset)));
3699 }
3700 }
3701 Ok(None)
3702 }
3703}
3704
3705fn index_key_as_u64(key: &IndexKey) -> Option<u64> {
3711 match key {
3712 IndexKey::Int(n) => Some(n.cast_unsigned()),
3718 IndexKey::Text(_) | IndexKey::Bool(_) => None,
3719 }
3720}
3721
/// Errors reported by the storage layer.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must keep a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum StorageError {
    /// A table with this name already exists.
    DuplicateTable {
        /// Name of the table that already exists.
        name: String,
    },
    /// No table with this name exists.
    TableNotFound {
        /// Name that was looked up.
        name: String,
    },
    /// A row has a different number of values than the schema has columns.
    ArityMismatch {
        /// Number of columns expected.
        expected: usize,
        /// Number of values actually supplied.
        actual: usize,
    },
    /// A value's type does not match its column's declared type.
    TypeMismatch {
        /// Name of the offending column.
        column: String,
        /// Type the column expects.
        expected: DataType,
        /// Type that was supplied.
        actual: DataType,
        /// Position of the column.
        position: usize,
    },
    /// A NULL value was supplied for a NOT NULL column.
    NullInNotNull {
        /// Name of the NOT NULL column.
        column: String,
    },
    /// An index with this name already exists.
    DuplicateIndex {
        /// Name of the index that already exists.
        name: String,
    },
    /// No column with this name exists.
    ColumnNotFound {
        /// Column name that was looked up.
        column: String,
    },
    /// Malformed on-disk data. The cold-tier freeze/compaction paths in this
    /// file also use it for failed precondition checks (missing table or
    /// index, wrong index kind, bad row ranges). The payload is a
    /// human-readable detail string.
    Corrupt(String),
    /// No index with this name exists.
    IndexNotFound {
        /// Index name that was looked up.
        name: String,
    },
    /// The requested operation is not supported; the payload describes what.
    Unsupported(String),
}
3765
3766impl fmt::Display for StorageError {
3767 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3768 match self {
3769 Self::DuplicateTable { name } => write!(f, "table already exists: {name}"),
3770 Self::TableNotFound { name } => write!(f, "table not found: {name}"),
3771 Self::ArityMismatch { expected, actual } => write!(
3772 f,
3773 "row arity mismatch: expected {expected} columns, got {actual}"
3774 ),
3775 Self::TypeMismatch {
3776 column,
3777 expected,
3778 actual,
3779 position,
3780 } => write!(
3781 f,
3782 "type mismatch in column {column:?} (position {position}): expected {expected}, got {actual}"
3783 ),
3784 Self::NullInNotNull { column } => {
3785 write!(f, "NULL value in NOT NULL column {column:?}")
3786 }
3787 Self::DuplicateIndex { name } => write!(f, "index already exists: {name}"),
3788 Self::ColumnNotFound { column } => write!(f, "column not found: {column}"),
3789 Self::Corrupt(detail) => write!(f, "corrupt on-disk format: {detail}"),
3790 Self::IndexNotFound { name } => write!(f, "index not found: {name}"),
3791 Self::Unsupported(detail) => write!(f, "unsupported: {detail}"),
3792 }
3793 }
3794}
3795
3796impl ColumnSchema {
3797 pub fn new(name: impl Into<String>, ty: DataType, nullable: bool) -> Self {
3798 Self {
3799 name: name.into(),
3800 ty,
3801 nullable,
3802 default: None,
3803 runtime_default: None,
3804 auto_increment: false,
3805 }
3806 }
3807
3808 #[must_use]
3812 pub fn with_default(mut self, default: Value) -> Self {
3813 self.default = Some(default);
3814 self
3815 }
3816
3817 #[must_use]
3822 pub fn with_runtime_default(mut self, expr: impl Into<String>) -> Self {
3823 self.runtime_default = Some(expr.into());
3824 self
3825 }
3826
3827 #[must_use]
3829 pub const fn with_auto_increment(mut self) -> Self {
3830 self.auto_increment = true;
3831 self
3832 }
3833}
3834
3835impl TableSchema {
3836 pub fn new(name: impl Into<String>, columns: Vec<ColumnSchema>) -> Self {
3837 Self {
3838 name: name.into(),
3839 columns,
3840 hot_tier_bytes: None,
3841 foreign_keys: Vec::new(),
3842 uniqueness_constraints: Vec::new(),
3843 }
3844 }
3845}
3846
/// Eight-byte signature at the start of every serialized catalog.
/// `Catalog::deserialize` rejects any buffer that does not begin with it.
const FILE_MAGIC: &[u8; 8] = b"SPGDB001";
/// Format version written by `Catalog::serialize` right after the magic.
/// The readers in this file gate optional sections on it: BTree maps stored
/// inline (>= 9), `hot_tier_bytes` (>= 11), INCLUDE columns / partial
/// predicate / expression (>= 12), foreign keys (>= 13), uniqueness
/// constraints and runtime defaults (>= 15), `is_unique` and extra index
/// columns (>= 16).
const FILE_VERSION: u8 = 16;
/// Oldest format version `Catalog::deserialize` still accepts.
const MIN_SUPPORTED_FILE_VERSION: u8 = 8;

// One-byte tags, one per `IndexKey` variant.
// NOTE(review): presumably written by `write_index_key` and read back by
// `Cursor::read_index_key`; neither is visible in this part of the file.
const INDEX_KEY_TAG_INT: u8 = 0;
const INDEX_KEY_TAG_TEXT: u8 = 1;
const INDEX_KEY_TAG_BOOL: u8 = 2;
3932
3933impl Catalog {
/// Serializes the whole catalog into the current on-disk format.
///
/// Layout: `FILE_MAGIC`, the `FILE_VERSION` byte, a `u32` table count, then
/// for each table in order: schema columns, rows, indices, the optional
/// `hot_tier_bytes` value, foreign keys, uniqueness constraints and runtime
/// defaults. `Catalog::deserialize` reads the sections back in the same
/// order.
///
/// # Panics
///
/// Panics when a count or position does not fit its fixed-width field (see
/// the `expect` messages), and when a column or BRIN type is
/// `DataType::Interval`, which `write_data_type` refuses to encode.
pub fn serialize(&self) -> Vec<u8> {
    let mut out = Vec::with_capacity(64);
    // File header: magic, version, table count.
    out.extend_from_slice(FILE_MAGIC);
    out.push(FILE_VERSION);
    write_u32(
        &mut out,
        u32::try_from(self.tables.len()).expect("≤ 4G tables"),
    );
    for t in &self.tables {
        // --- Schema: table name, then one record per column. ---
        write_str(&mut out, &t.schema.name);
        write_u16(
            &mut out,
            u16::try_from(t.schema.columns.len()).expect("≤ 65k columns/table"),
        );
        for c in &t.schema.columns {
            write_str(&mut out, &c.name);
            write_data_type(&mut out, c.ty);
            out.push(u8::from(c.nullable));
            // Literal default: presence byte, then the value when present.
            match &c.default {
                None => out.push(0),
                Some(v) => {
                    out.push(1);
                    write_value(&mut out, v);
                }
            }
            out.push(u8::from(c.auto_increment));
        }
        // --- Rows: count, then each row in the dense body encoding. ---
        write_u32(
            &mut out,
            u32::try_from(t.rows.len()).expect("≤ 4G rows/table"),
        );
        for row in &t.rows {
            out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
        }
        // --- Indices: count, then one record per index. ---
        write_u16(
            &mut out,
            u16::try_from(t.indices.len()).expect("≤ 65k indices/table"),
        );
        for idx in &t.indices {
            write_str(&mut out, &idx.name);
            write_u16(
                &mut out,
                u16::try_from(idx.column_position).expect("≤ 65k columns/table"),
            );
            // Kind tag (0 = BTree, 1 = NSW, 2 = BRIN) followed by the
            // kind-specific payload.
            match &idx.kind {
                IndexKind::BTree(map) => {
                    out.push(0);
                    // Entry count, then per entry: key, locator count, locators.
                    write_u32(
                        &mut out,
                        u32::try_from(map.len()).expect("≤ 4G index entries/index"),
                    );
                    for (key, locators) in map {
                        write_index_key(&mut out, key);
                        write_u32(
                            &mut out,
                            u32::try_from(locators.len()).expect("≤ 4G locators/key"),
                        );
                        for loc in locators {
                            loc.write_le(&mut out);
                        }
                    }
                }
                IndexKind::Nsw(g) => {
                    out.push(1);
                    write_u16(&mut out, u16::try_from(g.m).expect("≤ 65k NSW neighbours"));
                    write_nsw_graph(&mut out, g);
                }
                IndexKind::Brin { column_type } => {
                    out.push(2);
                    write_data_type(&mut out, *column_type);
                }
            }
            // INCLUDE columns: count, then each column position.
            write_u16(
                &mut out,
                u16::try_from(idx.included_columns.len())
                    .expect("≤ 65k INCLUDE columns/index"),
            );
            for col_pos in &idx.included_columns {
                write_u16(
                    &mut out,
                    u16::try_from(*col_pos).expect("≤ 65k columns/table"),
                );
            }
            // Partial-index predicate: presence byte, then the text.
            match &idx.partial_predicate {
                None => out.push(0),
                Some(pred) => {
                    out.push(1);
                    write_str(&mut out, pred);
                }
            }
            // Index expression: presence byte, then the text.
            match &idx.expression {
                None => out.push(0),
                Some(expr) => {
                    out.push(1);
                    write_str(&mut out, expr);
                }
            }
            out.push(u8::from(idx.is_unique));
            // Extra column positions: count, then each position.
            write_u16(
                &mut out,
                u16::try_from(idx.extra_column_positions.len())
                    .expect("≤ 65k extra cols / index"),
            );
            for cp in &idx.extra_column_positions {
                write_u16(
                    &mut out,
                    u16::try_from(*cp).expect("≤ 65k columns/table"),
                );
            }
        }
        // --- hot_tier_bytes: presence byte, then the value's LE bytes. ---
        match t.schema.hot_tier_bytes {
            None => out.push(0),
            Some(n) => {
                out.push(1);
                out.extend_from_slice(&n.to_le_bytes());
            }
        }
        // --- Foreign keys: count, then one record per constraint. ---
        write_u16(
            &mut out,
            u16::try_from(t.schema.foreign_keys.len()).expect("≤ 65k FKs/table"),
        );
        for fk in &t.schema.foreign_keys {
            // Optional constraint name.
            match &fk.name {
                None => out.push(0),
                Some(n) => {
                    out.push(1);
                    write_str(&mut out, n);
                }
            }
            write_u16(
                &mut out,
                u16::try_from(fk.local_columns.len()).expect("≤ 65k FK columns"),
            );
            for &p in &fk.local_columns {
                write_u16(
                    &mut out,
                    u16::try_from(p).expect("≤ 65k columns/table"),
                );
            }
            write_str(&mut out, &fk.parent_table);
            write_u16(
                &mut out,
                u16::try_from(fk.parent_columns.len()).expect("≤ 65k FK parent columns"),
            );
            for &p in &fk.parent_columns {
                write_u16(
                    &mut out,
                    u16::try_from(p).expect("≤ 65k columns/table"),
                );
            }
            // Referential actions, one tag byte each.
            out.push(fk.on_delete.tag());
            out.push(fk.on_update.tag());
        }
        // --- Uniqueness constraints: count, then per constraint the
        // primary-key flag and its column positions. ---
        write_u16(
            &mut out,
            u16::try_from(t.schema.uniqueness_constraints.len())
                .expect("≤ 65k uniqueness constraints/table"),
        );
        for uc in &t.schema.uniqueness_constraints {
            out.push(u8::from(uc.is_primary_key));
            write_u16(
                &mut out,
                u16::try_from(uc.columns.len())
                    .expect("≤ 65k cols in uniqueness constraint"),
            );
            for &p in &uc.columns {
                write_u16(
                    &mut out,
                    u16::try_from(p).expect("≤ 65k columns/table"),
                );
            }
        }
        // --- Runtime defaults: stored sparsely as (column position,
        // expression) pairs, only for columns that have one. ---
        let mut rt_defaults: Vec<(usize, &str)> = Vec::new();
        for (i, c) in t.schema.columns.iter().enumerate() {
            if let Some(e) = &c.runtime_default {
                rt_defaults.push((i, e.as_str()));
            }
        }
        write_u16(
            &mut out,
            u16::try_from(rt_defaults.len())
                .expect("≤ 65k runtime defaults/table"),
        );
        for (pos, expr) in rt_defaults {
            write_u16(
                &mut out,
                u16::try_from(pos).expect("≤ 65k columns/table"),
            );
            write_str(&mut out, expr);
        }
    }
    out
}
4195
4196 pub fn deserialize(buf: &[u8]) -> Result<Self, StorageError> {
4199 let mut cur = Cursor::new(buf);
4200 let magic = cur.take(8)?;
4201 if magic != FILE_MAGIC {
4202 return Err(StorageError::Corrupt(format!(
4203 "bad magic: expected SPGDB001, got {magic:?}"
4204 )));
4205 }
4206 let version = cur.read_u8()?;
4207 if !(MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION).contains(&version) {
4208 return Err(StorageError::Corrupt(format!(
4209 "unsupported file version: {version} (supported: {MIN_SUPPORTED_FILE_VERSION}..={FILE_VERSION})"
4210 )));
4211 }
4212 let table_count = cur.read_u32()? as usize;
4213 let mut cat = Self::new();
4214 for _ in 0..table_count {
4215 deserialize_table(&mut cur, &mut cat, version)?;
4216 }
4217 if cur.pos < buf.len() {
4218 return Err(StorageError::Corrupt(format!(
4219 "trailing bytes: {} unread",
4220 buf.len() - cur.pos
4221 )));
4222 }
4223 Ok(cat)
4224 }
4225}
4226
4227fn deserialize_table(
4232 cur: &mut Cursor<'_>,
4233 cat: &mut Catalog,
4234 version: u8,
4235) -> Result<(), StorageError> {
4236 let table_name = cur.read_str()?;
4237 let name = table_name.clone();
4238 let col_count = cur.read_u16()? as usize;
4239 let mut cols = Vec::with_capacity(col_count);
4240 for _ in 0..col_count {
4241 let c_name = cur.read_str()?;
4242 let ty = cur.read_data_type()?;
4243 let nullable = cur.read_u8()? != 0;
4244 let default = match cur.read_u8()? {
4245 0 => None,
4246 1 => Some(cur.read_value()?),
4247 other => {
4248 return Err(StorageError::Corrupt(format!(
4249 "unknown default tag: {other}"
4250 )));
4251 }
4252 };
4253 let auto_increment = cur.read_u8()? != 0;
4254 cols.push(ColumnSchema {
4258 name: c_name,
4259 ty,
4260 nullable,
4261 default,
4262 runtime_default: None,
4263 auto_increment,
4264 });
4265 }
4266 let n_cols = cols.len();
4267 cat.create_table(TableSchema::new(name, cols))?;
4268 let t = cat.tables.last_mut().expect("create_table just pushed");
4272 deserialize_rows(cur, t, n_cols)?;
4273 deserialize_indices(cur, t, version)?;
4274 if version >= 11 {
4280 let has = cur.read_u8()?;
4281 let hot_tier_bytes = match has {
4282 0 => None,
4283 1 => Some(cur.read_u64()?),
4284 other => {
4285 return Err(StorageError::Corrupt(format!(
4286 "hot_tier_bytes appendix: unknown has-value byte {other}"
4287 )));
4288 }
4289 };
4290 t.schema_mut().hot_tier_bytes = hot_tier_bytes;
4291 }
4292 if version >= 13 {
4295 let fk_count = cur.read_u16()? as usize;
4296 let mut fks = Vec::with_capacity(fk_count);
4297 for _ in 0..fk_count {
4298 let name = match cur.read_u8()? {
4299 0 => None,
4300 1 => Some(cur.read_str()?),
4301 other => {
4302 return Err(StorageError::Corrupt(format!(
4303 "FK appendix: unknown has-name byte {other}"
4304 )));
4305 }
4306 };
4307 let local_arity = cur.read_u16()? as usize;
4308 let mut local_columns = Vec::with_capacity(local_arity);
4309 for _ in 0..local_arity {
4310 local_columns.push(cur.read_u16()? as usize);
4311 }
4312 let parent_table = cur.read_str()?;
4313 let parent_arity = cur.read_u16()? as usize;
4314 if parent_arity != local_arity {
4315 return Err(StorageError::Corrupt(format!(
4316 "FK arity mismatch in catalog: local {local_arity} vs parent {parent_arity}"
4317 )));
4318 }
4319 let mut parent_columns = Vec::with_capacity(parent_arity);
4320 for _ in 0..parent_arity {
4321 parent_columns.push(cur.read_u16()? as usize);
4322 }
4323 let on_delete = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
4324 StorageError::Corrupt("FK appendix: unknown on_delete tag".into())
4325 })?;
4326 let on_update = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
4327 StorageError::Corrupt("FK appendix: unknown on_update tag".into())
4328 })?;
4329 fks.push(ForeignKeyConstraint {
4330 name,
4331 local_columns,
4332 parent_table,
4333 parent_columns,
4334 on_delete,
4335 on_update,
4336 });
4337 }
4338 t.schema_mut().foreign_keys = fks;
4339 }
4340 if version >= 15 {
4343 let uc_count = cur.read_u16()? as usize;
4344 let mut ucs = Vec::with_capacity(uc_count);
4345 for _ in 0..uc_count {
4346 let is_pk = cur.read_u8()? != 0;
4347 let arity = cur.read_u16()? as usize;
4348 let mut cols = Vec::with_capacity(arity);
4349 for _ in 0..arity {
4350 cols.push(cur.read_u16()? as usize);
4351 }
4352 ucs.push(UniquenessConstraint {
4353 is_primary_key: is_pk,
4354 columns: cols,
4355 });
4356 }
4357 t.schema_mut().uniqueness_constraints = ucs;
4358 let rt_count = cur.read_u16()? as usize;
4360 for _ in 0..rt_count {
4361 let pos = cur.read_u16()? as usize;
4362 let expr = cur.read_str()?;
4363 if let Some(col) = t.schema_mut().columns.get_mut(pos) {
4364 col.runtime_default = Some(expr);
4365 }
4366 }
4367 }
4368 let _ = table_name;
4369 Ok(())
4370}
4371
4372fn deserialize_rows(
4373 cur: &mut Cursor<'_>,
4374 t: &mut Table,
4375 _n_cols: usize,
4376) -> Result<(), StorageError> {
4377 let row_count = cur.read_u32()? as usize;
4378 let mut hot_bytes: u64 = 0;
4383 for _ in 0..row_count {
4384 let tail = &cur.buf[cur.pos..];
4385 let (row, consumed) = decode_row_body_dense(tail, &t.schema)?;
4386 cur.pos += consumed;
4387 hot_bytes = hot_bytes.saturating_add(row_body_encoded_len(&row, &t.schema) as u64);
4393 t.rows.push_mut(row);
4394 }
4395 t.hot_bytes = hot_bytes;
4396 Ok(())
4397}
4398
/// Decodes the index section of a table record and restores each index on `t`.
///
/// Reads a `u16` index count, then per index: name, indexed column position,
/// a kind tag (0 = BTree, 1 = NSW, 2 = BRIN) and the kind-specific payload.
/// Version-gated trailers follow: INCLUDE columns, partial predicate and
/// expression (version >= 12), then the `is_unique` flag and extra column
/// positions (version >= 16). Trailer values are applied to the last index in
/// `t.indices`.
///
/// # Errors
///
/// Returns [`StorageError::Corrupt`] when a column position is out of range
/// or a tag byte is unknown. Errors from the cursor reads and from the
/// `restore_*` / `add_index` calls are passed through.
fn deserialize_indices(
    cur: &mut Cursor<'_>,
    t: &mut Table,
    version: u8,
) -> Result<(), StorageError> {
    let index_count = cur.read_u16()? as usize;
    for _ in 0..index_count {
        let idx_name = cur.read_str()?;
        let col_pos = cur.read_u16()? as usize;
        // The restore calls take the column by name, so resolve the stored
        // position first.
        let column_name = t
            .schema
            .columns
            .get(col_pos)
            .ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "index {idx_name:?} points at non-existent column position {col_pos}"
                ))
            })?
            .name
            .clone();
        let kind_tag = cur.read_u8()?;
        match kind_tag {
            0 => {
                if version >= 9 {
                    // The BTree map is stored inline; restore it as written.
                    let map = read_btree_map(cur)?;
                    t.restore_btree_index(idx_name, &column_name, map)?;
                } else {
                    // No map payload is read for these versions.
                    // NOTE(review): `add_index` presumably rebuilds the index
                    // from the rows already loaded — confirm.
                    t.add_index(idx_name, &column_name)?;
                }
            }
            1 => {
                let m = cur.read_u16()? as usize;
                let graph = cur.read_nsw_graph(m)?;
                t.restore_nsw_index(idx_name, &column_name, graph)?;
            }
            2 => {
                let column_type = cur.read_data_type()?;
                t.restore_brin_index(idx_name, &column_name, column_type)?;
            }
            other => {
                return Err(StorageError::Corrupt(format!(
                    "unknown index kind tag: {other}"
                )));
            }
        }
        if version >= 12 {
            // INCLUDE columns: count, then each position (validated).
            let num_included = cur.read_u16()? as usize;
            if num_included > 0 {
                let mut included: Vec<usize> = Vec::with_capacity(num_included);
                for _ in 0..num_included {
                    let cp = cur.read_u16()? as usize;
                    if cp >= t.schema.columns.len() {
                        return Err(StorageError::Corrupt(format!(
                            "INCLUDE column position {cp} out of range \
                             ({} schema columns)",
                            t.schema.columns.len()
                        )));
                    }
                    included.push(cp);
                }
                if let Some(last) = t.indices.last_mut() {
                    last.included_columns = included;
                }
            }
            // Partial-index predicate: presence byte, then the text.
            match cur.read_u8()? {
                0 => {}
                1 => {
                    let pred = cur.read_str()?;
                    if let Some(last) = t.indices.last_mut() {
                        last.partial_predicate = Some(pred);
                    }
                }
                other => {
                    return Err(StorageError::Corrupt(format!(
                        "partial_predicate tag: unknown byte {other}"
                    )));
                }
            }
            // Index expression: presence byte, then the text.
            match cur.read_u8()? {
                0 => {}
                1 => {
                    let expr = cur.read_str()?;
                    if let Some(last) = t.indices.last_mut() {
                        last.expression = Some(expr);
                    }
                }
                other => {
                    return Err(StorageError::Corrupt(format!(
                        "expression tag: unknown byte {other}"
                    )));
                }
            }
            if version >= 16 {
                // Uniqueness flag: strictly 0 or 1.
                match cur.read_u8()? {
                    0 => {}
                    1 => {
                        if let Some(last) = t.indices.last_mut() {
                            last.is_unique = true;
                        }
                    }
                    other => {
                        return Err(StorageError::Corrupt(format!(
                            "is_unique tag: unknown byte {other}"
                        )));
                    }
                }
                // Extra column positions: count, then each position (validated).
                let n = cur.read_u16()? as usize;
                if n > 0 {
                    let mut extras: Vec<usize> = Vec::with_capacity(n);
                    for _ in 0..n {
                        let cp = cur.read_u16()? as usize;
                        if cp >= t.schema.columns.len() {
                            return Err(StorageError::Corrupt(format!(
                                "extra column position {cp} out of range \
                                 ({} schema columns)",
                                t.schema.columns.len()
                            )));
                        }
                        extras.push(cp);
                    }
                    if let Some(last) = t.indices.last_mut() {
                        last.extra_column_positions = extras;
                    }
                }
            }
        }
    }
    Ok(())
}
4546
4547fn read_btree_map(
4551 cur: &mut Cursor<'_>,
4552) -> Result<PersistentBTreeMap<IndexKey, Vec<RowLocator>>, StorageError> {
4553 let entry_count = cur.read_u32()? as usize;
4554 let mut map = PersistentBTreeMap::new();
4555 for _ in 0..entry_count {
4556 let key = cur.read_index_key()?;
4557 let locator_count = cur.read_u32()? as usize;
4558 let mut locators = Vec::with_capacity(locator_count);
4559 for _ in 0..locator_count {
4560 let tail = &cur.buf[cur.pos..];
4561 let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
4562 StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
4563 })?;
4564 cur.pos += consumed;
4565 locators.push(loc);
4566 }
4567 map.insert_mut(key, locators);
4568 }
4569 Ok(map)
4570}
4571
4572fn write_nsw_graph(out: &mut Vec<u8>, g: &NswGraph) {
4588 let entry = g.entry.map_or(u32::MAX, |e| {
4589 u32::try_from(e).expect("NSW entry fits in u32")
4590 });
4591 write_u16(
4592 out,
4593 u16::try_from(g.m_max_0).expect("HNSW m_max_0 fits in u16"),
4594 );
4595 out.extend_from_slice(&entry.to_le_bytes());
4596 out.push(g.entry_level);
4597 let node_count = g.levels.len();
4598 write_u32(
4599 out,
4600 u32::try_from(node_count).expect("HNSW node count fits in u32"),
4601 );
4602 for &lvl in &g.levels {
4603 out.push(lvl);
4604 }
4605 let layer_count = u8::try_from(g.layers.len()).expect("HNSW layer count ≤ 255");
4606 out.push(layer_count);
4607 for layer in &g.layers {
4608 write_u32(
4609 out,
4610 u32::try_from(layer.len()).expect("HNSW per-layer node count fits in u32"),
4611 );
4612 for neighbors in layer {
4613 write_u16(
4614 out,
4615 u16::try_from(neighbors.len()).expect("HNSW neighbour list fits in u16"),
4616 );
4617 for &peer in neighbors {
4621 write_u32(out, peer);
4622 }
4623 }
4624 }
4625}
4626
4627fn write_data_type(out: &mut Vec<u8>, t: DataType) {
4628 match t {
4629 DataType::Int => out.push(1),
4630 DataType::BigInt => out.push(2),
4631 DataType::Float => out.push(3),
4632 DataType::Text => out.push(4),
4633 DataType::Bool => out.push(5),
4634 DataType::Vector { dim, encoding } => match encoding {
4635 VecEncoding::F32 => {
4639 out.push(6);
4640 out.extend_from_slice(&dim.to_le_bytes());
4641 }
4642 VecEncoding::F16 => {
4645 out.push(15);
4646 out.extend_from_slice(&dim.to_le_bytes());
4647 }
4648 VecEncoding::Sq8 => {
4654 out.push(14);
4655 out.extend_from_slice(&dim.to_le_bytes());
4656 }
4657 },
4658 DataType::SmallInt => out.push(7),
4659 DataType::Varchar(max) => {
4660 out.push(8);
4661 out.extend_from_slice(&max.to_le_bytes());
4662 }
4663 DataType::Char(size) => {
4664 out.push(9);
4665 out.extend_from_slice(&size.to_le_bytes());
4666 }
4667 DataType::Numeric { precision, scale } => {
4668 out.push(10);
4669 out.push(precision);
4670 out.push(scale);
4671 }
4672 DataType::Date => out.push(11),
4673 DataType::Timestamp => out.push(12),
4674 DataType::Timestamptz => out.push(17),
4678 DataType::Interval => {
4683 unreachable!("DataType::Interval has no on-disk encoding in v2.11")
4684 }
4685 DataType::Json => out.push(13),
4686 DataType::Jsonb => out.push(16),
4689 }
4690}
4691
4692impl Cursor<'_> {
4693 fn read_data_type(&mut self) -> Result<DataType, StorageError> {
4694 let tag = self.read_u8()?;
4695 match tag {
4696 1 => Ok(DataType::Int),
4697 2 => Ok(DataType::BigInt),
4698 3 => Ok(DataType::Float),
4699 4 => Ok(DataType::Text),
4700 5 => Ok(DataType::Bool),
4701 6 => Ok(DataType::Vector {
4702 dim: self.read_u32()?,
4703 encoding: VecEncoding::F32,
4704 }),
4705 7 => Ok(DataType::SmallInt),
4706 8 => Ok(DataType::Varchar(self.read_u32()?)),
4707 9 => Ok(DataType::Char(self.read_u32()?)),
4708 10 => {
4709 let precision = self.read_u8()?;
4710 let scale = self.read_u8()?;
4711 Ok(DataType::Numeric { precision, scale })
4712 }
4713 11 => Ok(DataType::Date),
4714 12 => Ok(DataType::Timestamp),
4715 13 => Ok(DataType::Json),
4716 14 => Ok(DataType::Vector {
4717 dim: self.read_u32()?,
4718 encoding: VecEncoding::Sq8,
4719 }),
4720 15 => Ok(DataType::Vector {
4724 dim: self.read_u32()?,
4725 encoding: VecEncoding::F16,
4726 }),
4727 16 => Ok(DataType::Jsonb),
4731 17 => Ok(DataType::Timestamptz),
4735 other => Err(StorageError::Corrupt(format!(
4736 "unknown data type tag: {other}"
4737 ))),
4738 }
4739 }
4740}
4741
4742pub fn row_body_encoded_len(row: &Row, schema: &TableSchema) -> usize {
4748 debug_assert_eq!(
4749 row.values.len(),
4750 schema.columns.len(),
4751 "row_body_encoded_len: row arity must match schema"
4752 );
4753 let bitmap_bytes = schema.columns.len().div_ceil(8);
4754 let mut n = bitmap_bytes;
4755 for (col_idx, v) in row.values.iter().enumerate() {
4756 if matches!(v, Value::Null) {
4757 continue;
4758 }
4759 n += value_body_encoded_len(v, schema.columns[col_idx].ty);
4760 }
4761 n
4762}
4763
4764fn value_body_encoded_len(v: &Value, _ty: DataType) -> usize {
4770 match v {
4771 Value::SmallInt(_) => 2,
4772 Value::Int(_) | Value::Date(_) => 4,
4774 Value::BigInt(_) | Value::Float(_) | Value::Timestamp(_) => 8,
4776 Value::Bool(_) => 1,
4777 Value::Text(s) | Value::Json(s) => 2 + s.len(),
4779 Value::Vector(vec) => 4 + 4 * vec.len(),
4781 Value::Sq8Vector(q) => 4 + 4 + 4 + q.bytes.len(),
4788 Value::HalfVector(h) => 4 + h.bytes.len(),
4791 Value::Numeric { .. } => 16 + 1,
4793 Value::Null => 0,
4795 Value::Interval { .. } => {
4797 unreachable!("Value::Interval has no on-disk encoding")
4798 }
4799 }
4800}
4801
4802pub fn encode_row_body_dense(row: &Row, schema: &TableSchema) -> Vec<u8> {
4813 debug_assert_eq!(
4814 row.values.len(),
4815 schema.columns.len(),
4816 "dense encode: row arity must match schema"
4817 );
4818 let bitmap_bytes = schema.columns.len().div_ceil(8);
4819 let mut out = Vec::with_capacity(bitmap_bytes + schema.columns.len() * 8);
4822 let bitmap_offset = out.len();
4823 out.resize(bitmap_offset + bitmap_bytes, 0);
4824 for (i, v) in row.values.iter().enumerate() {
4825 if matches!(v, Value::Null) {
4826 out[bitmap_offset + i / 8] |= 1 << (i % 8);
4827 }
4828 }
4829 for (col_idx, v) in row.values.iter().enumerate() {
4830 if matches!(v, Value::Null) {
4831 continue;
4832 }
4833 write_value_body(&mut out, v, schema.columns[col_idx].ty);
4834 }
4835 out
4836}
4837
4838pub fn decode_row_body_dense(
4844 bytes: &[u8],
4845 schema: &TableSchema,
4846) -> Result<(Row, usize), StorageError> {
4847 let mut cur = Cursor::new(bytes);
4848 let bitmap_bytes = schema.columns.len().div_ceil(8);
4849 let mut bitmap_buf = [0u8; 32];
4850 if bitmap_bytes > bitmap_buf.len() {
4851 return Err(StorageError::Corrupt(format!(
4852 "row NULL bitmap {bitmap_bytes} B exceeds 32 B cap"
4853 )));
4854 }
4855 let slice = cur.take(bitmap_bytes)?;
4856 bitmap_buf[..bitmap_bytes].copy_from_slice(slice);
4857 let mut values = Vec::with_capacity(schema.columns.len());
4858 for (col_idx, col) in schema.columns.iter().enumerate() {
4859 if (bitmap_buf[col_idx / 8] >> (col_idx % 8)) & 1 == 1 {
4860 values.push(Value::Null);
4861 } else {
4862 values.push(cur.read_value_body(col.ty)?);
4863 }
4864 }
4865 Ok((Row { values }, cur.pos))
4866}
4867
4868fn write_value_body(out: &mut Vec<u8>, v: &Value, ty: DataType) {
4877 match (v, ty) {
4878 (Value::SmallInt(n), DataType::SmallInt) => out.extend_from_slice(&n.to_le_bytes()),
4879 (Value::Int(n), DataType::Int) => out.extend_from_slice(&n.to_le_bytes()),
4880 (Value::BigInt(n), DataType::BigInt) => out.extend_from_slice(&n.to_le_bytes()),
4881 (Value::Float(x), DataType::Float) => out.extend_from_slice(&x.to_le_bytes()),
4882 (Value::Bool(b), DataType::Bool) => out.push(u8::from(*b)),
4883 (Value::Text(s), DataType::Text | DataType::Varchar(_) | DataType::Char(_)) => {
4884 write_str(out, s);
4885 }
4886 (
4887 Value::Vector(v),
4888 DataType::Vector {
4889 encoding: VecEncoding::F32,
4890 ..
4891 },
4892 ) => {
4893 let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
4894 out.extend_from_slice(&dim.to_le_bytes());
4895 for x in v {
4896 out.extend_from_slice(&x.to_le_bytes());
4897 }
4898 }
4899 (
4905 Value::Sq8Vector(q),
4906 DataType::Vector {
4907 encoding: VecEncoding::Sq8,
4908 ..
4909 },
4910 ) => {
4911 let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
4912 out.extend_from_slice(&dim.to_le_bytes());
4913 out.extend_from_slice(&q.min.to_le_bytes());
4914 out.extend_from_slice(&q.max.to_le_bytes());
4915 out.extend_from_slice(&q.bytes);
4916 }
4917 (
4921 Value::HalfVector(h),
4922 DataType::Vector {
4923 encoding: VecEncoding::F16,
4924 ..
4925 },
4926 ) => {
4927 let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
4928 out.extend_from_slice(&dim.to_le_bytes());
4929 out.extend_from_slice(&h.bytes);
4930 }
4931 (Value::Numeric { scaled, .. }, DataType::Numeric { scale, .. }) => {
4932 out.extend_from_slice(&scaled.to_le_bytes());
4933 out.push(scale);
4934 }
4935 (Value::Date(d), DataType::Date) => out.extend_from_slice(&d.to_le_bytes()),
4936 (Value::Timestamp(t), DataType::Timestamp | DataType::Timestamptz) => {
4937 out.extend_from_slice(&t.to_le_bytes())
4938 }
4939 (Value::Json(s), DataType::Json | DataType::Jsonb) => write_str(out, s),
4943 (other, ty) => unreachable!(
4947 "schema-driven encode received mismatched value/type pair: \
4948 value tag={:?}, column type={:?}",
4949 other.data_type(),
4950 ty
4951 ),
4952 }
4953}
4954
4955fn write_value(out: &mut Vec<u8>, v: &Value) {
4956 match v {
4957 Value::Null => out.push(0),
4958 Value::SmallInt(n) => {
4959 out.push(7);
4960 out.extend_from_slice(&n.to_le_bytes());
4961 }
4962 Value::Int(n) => {
4963 out.push(1);
4964 out.extend_from_slice(&n.to_le_bytes());
4965 }
4966 Value::BigInt(n) => {
4967 out.push(2);
4968 out.extend_from_slice(&n.to_le_bytes());
4969 }
4970 Value::Float(x) => {
4971 out.push(3);
4972 out.extend_from_slice(&x.to_le_bytes());
4973 }
4974 Value::Text(s) | Value::Json(s) => {
4979 out.push(4);
4980 write_str(out, s);
4981 }
4982 Value::Bool(b) => {
4983 out.push(5);
4984 out.push(u8::from(*b));
4985 }
4986 Value::Vector(v) => {
4987 out.push(6);
4988 let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
4989 out.extend_from_slice(&dim.to_le_bytes());
4990 for x in v {
4991 out.extend_from_slice(&x.to_le_bytes());
4992 }
4993 }
4994 Value::Sq8Vector(q) => {
4999 out.push(11);
5000 let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
5001 out.extend_from_slice(&dim.to_le_bytes());
5002 out.extend_from_slice(&q.min.to_le_bytes());
5003 out.extend_from_slice(&q.max.to_le_bytes());
5004 out.extend_from_slice(&q.bytes);
5005 }
5006 Value::HalfVector(h) => {
5011 out.push(12);
5012 let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
5013 out.extend_from_slice(&dim.to_le_bytes());
5014 out.extend_from_slice(&h.bytes);
5015 }
5016 Value::Numeric { scaled, scale } => {
5017 out.push(8);
5018 out.extend_from_slice(&scaled.to_le_bytes());
5019 out.push(*scale);
5020 }
5021 Value::Date(d) => {
5022 out.push(9);
5023 out.extend_from_slice(&d.to_le_bytes());
5024 }
5025 Value::Timestamp(t) => {
5026 out.push(10);
5027 out.extend_from_slice(&t.to_le_bytes());
5028 }
5029 Value::Interval { .. } => {
5033 unreachable!(
5034 "Value::Interval has no on-disk encoding; engine must reject it before write"
5035 )
5036 }
5037 }
5038}
5039
/// Appends `n` to `out` as two little-endian bytes.
fn write_u16(out: &mut Vec<u8>, n: u16) {
    let le = n.to_le_bytes();
    out.extend(le);
}
/// Appends `n` to `out` as four little-endian bytes.
fn write_u32(out: &mut Vec<u8>, n: u32) {
    let le = n.to_le_bytes();
    out.extend(le);
}
5046fn write_str(out: &mut Vec<u8>, s: &str) {
5047 let len = u16::try_from(s.len()).expect("identifier / text fits in u16");
5048 write_u16(out, len);
5049 out.extend_from_slice(s.as_bytes());
5050}
5051
5052fn write_index_key(out: &mut Vec<u8>, key: &IndexKey) {
5056 match key {
5057 IndexKey::Int(n) => {
5058 out.push(INDEX_KEY_TAG_INT);
5059 out.extend_from_slice(&n.to_le_bytes());
5060 }
5061 IndexKey::Text(s) => {
5062 out.push(INDEX_KEY_TAG_TEXT);
5063 write_str(out, s);
5064 }
5065 IndexKey::Bool(b) => {
5066 out.push(INDEX_KEY_TAG_BOOL);
5067 out.push(u8::from(*b));
5068 }
5069 }
5070}
5071
/// Forward-only reader over a borrowed byte buffer.
///
/// Every read goes through `take`, which bounds-checks against `buf` and
/// advances `pos`.
struct Cursor<'a> {
    /// The complete input being decoded.
    buf: &'a [u8],
    /// Offset into `buf` of the next unread byte.
    pos: usize,
}
5076
5077impl<'a> Cursor<'a> {
5078 const fn new(buf: &'a [u8]) -> Self {
5079 Self { buf, pos: 0 }
5080 }
5081
5082 fn take(&mut self, n: usize) -> Result<&'a [u8], StorageError> {
5083 let end = self
5084 .pos
5085 .checked_add(n)
5086 .ok_or_else(|| StorageError::Corrupt(format!("length overflow taking {n} bytes")))?;
5087 if end > self.buf.len() {
5088 return Err(StorageError::Corrupt(format!(
5089 "unexpected EOF at offset {} (wanted {n} more bytes)",
5090 self.pos
5091 )));
5092 }
5093 let s = &self.buf[self.pos..end];
5094 self.pos = end;
5095 Ok(s)
5096 }
5097
5098 fn read_u8(&mut self) -> Result<u8, StorageError> {
5099 Ok(self.take(1)?[0])
5100 }
5101 fn read_u16(&mut self) -> Result<u16, StorageError> {
5102 let s = self.take(2)?;
5103 Ok(u16::from_le_bytes([s[0], s[1]]))
5104 }
5105 fn read_u32(&mut self) -> Result<u32, StorageError> {
5106 let s = self.take(4)?;
5107 Ok(u32::from_le_bytes([s[0], s[1], s[2], s[3]]))
5108 }
5109 fn read_i32(&mut self) -> Result<i32, StorageError> {
5110 let s = self.take(4)?;
5111 Ok(i32::from_le_bytes([s[0], s[1], s[2], s[3]]))
5112 }
5113 fn read_u64(&mut self) -> Result<u64, StorageError> {
5116 let s = self.take(8)?;
5117 Ok(u64::from_le_bytes([
5118 s[0], s[1], s[2], s[3], s[4], s[5], s[6], s[7],
5119 ]))
5120 }
5121 fn read_i64(&mut self) -> Result<i64, StorageError> {
5122 let s = self.take(8)?;
5123 let arr: [u8; 8] = s.try_into().expect("checked");
5124 Ok(i64::from_le_bytes(arr))
5125 }
5126 fn read_f64(&mut self) -> Result<f64, StorageError> {
5127 let s = self.take(8)?;
5128 let arr: [u8; 8] = s.try_into().expect("checked");
5129 Ok(f64::from_le_bytes(arr))
5130 }
5131 fn read_f32(&mut self) -> Result<f32, StorageError> {
5132 let s = self.take(4)?;
5133 Ok(f32::from_le_bytes([s[0], s[1], s[2], s[3]]))
5134 }
5135 fn read_str(&mut self) -> Result<String, StorageError> {
5136 let len = self.read_u16()? as usize;
5137 let bytes = self.take(len)?;
5138 core::str::from_utf8(bytes)
5139 .map(String::from)
5140 .map_err(|_| StorageError::Corrupt("invalid UTF-8 in identifier or text".into()))
5141 }
5142
5143 fn read_index_key(&mut self) -> Result<IndexKey, StorageError> {
5147 let tag = self.read_u8()?;
5148 match tag {
5149 INDEX_KEY_TAG_INT => Ok(IndexKey::Int(self.read_i64()?)),
5150 INDEX_KEY_TAG_TEXT => Ok(IndexKey::Text(self.read_str()?)),
5151 INDEX_KEY_TAG_BOOL => Ok(IndexKey::Bool(self.read_u8()? != 0)),
5152 other => Err(StorageError::Corrupt(format!(
5153 "unknown index key tag: {other}"
5154 ))),
5155 }
5156 }
5157 fn read_value_body(&mut self, ty: DataType) -> Result<Value, StorageError> {
5163 match ty {
5164 DataType::SmallInt => {
5165 let s = self.take(2)?;
5166 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
5167 }
5168 DataType::Int => Ok(Value::Int(self.read_i32()?)),
5169 DataType::BigInt => Ok(Value::BigInt(self.read_i64()?)),
5170 DataType::Float => Ok(Value::Float(self.read_f64()?)),
5171 DataType::Bool => Ok(Value::Bool(self.read_u8()? != 0)),
5172 DataType::Text | DataType::Varchar(_) | DataType::Char(_) => {
5173 Ok(Value::Text(self.read_str()?))
5174 }
5175 DataType::Vector {
5176 encoding: VecEncoding::F32,
5177 ..
5178 } => {
5179 let dim = self.read_u32()? as usize;
5180 let mut v = Vec::with_capacity(dim);
5181 for _ in 0..dim {
5182 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
5183 v.push(f32::from_le_bytes(bytes));
5184 }
5185 Ok(Value::Vector(v))
5186 }
5187 DataType::Vector {
5188 encoding: VecEncoding::Sq8,
5189 ..
5190 } => {
5191 let dim = self.read_u32()? as usize;
5192 let min = self.read_f32()?;
5193 let max = self.read_f32()?;
5194 let bytes = self.take(dim)?.to_vec();
5195 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
5196 }
5197 DataType::Vector {
5198 encoding: VecEncoding::F16,
5199 ..
5200 } => {
5201 let dim = self.read_u32()? as usize;
5202 let bytes = self.take(dim * 2)?.to_vec();
5203 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
5204 }
5205 DataType::Numeric { .. } => {
5206 let s = self.take(16)?;
5207 let arr: [u8; 16] = s.try_into().expect("checked");
5208 let scaled = i128::from_le_bytes(arr);
5209 let scale = self.read_u8()?;
5210 Ok(Value::Numeric { scaled, scale })
5211 }
5212 DataType::Date => Ok(Value::Date(self.read_i32()?)),
5213 DataType::Timestamp => Ok(Value::Timestamp(self.read_i64()?)),
5214 DataType::Timestamptz => Ok(Value::Timestamp(self.read_i64()?)),
5215 DataType::Jsonb => Ok(Value::Json(self.read_str()?)),
5216 DataType::Interval => {
5217 Err(StorageError::Corrupt(
5222 "INTERVAL column found on disk — runtime-only type, v3.0.2 rejects it".into(),
5223 ))
5224 }
5225 DataType::Json => Ok(Value::Json(self.read_str()?)),
5226 }
5227 }
5228
5229 fn read_value(&mut self) -> Result<Value, StorageError> {
5230 let tag = self.read_u8()?;
5231 match tag {
5232 0 => Ok(Value::Null),
5233 1 => Ok(Value::Int(self.read_i32()?)),
5234 2 => Ok(Value::BigInt(self.read_i64()?)),
5235 3 => Ok(Value::Float(self.read_f64()?)),
5236 4 => Ok(Value::Text(self.read_str()?)),
5237 5 => Ok(Value::Bool(self.read_u8()? != 0)),
5238 6 => {
5239 let dim = self.read_u32()? as usize;
5240 let mut v = Vec::with_capacity(dim);
5241 for _ in 0..dim {
5242 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
5243 v.push(f32::from_le_bytes(bytes));
5244 }
5245 Ok(Value::Vector(v))
5246 }
5247 7 => {
5248 let s = self.take(2)?;
5249 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
5250 }
5251 8 => {
5252 let s = self.take(16)?;
5253 let arr: [u8; 16] = s.try_into().expect("checked");
5254 let scaled = i128::from_le_bytes(arr);
5255 let scale = self.read_u8()?;
5256 Ok(Value::Numeric { scaled, scale })
5257 }
5258 9 => Ok(Value::Date(self.read_i32()?)),
5259 10 => Ok(Value::Timestamp(self.read_i64()?)),
5260 11 => {
5265 let dim = self.read_u32()? as usize;
5266 let min = self.read_f32()?;
5267 let max = self.read_f32()?;
5268 let bytes = self.take(dim)?.to_vec();
5269 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
5270 }
5271 12 => {
5274 let dim = self.read_u32()? as usize;
5275 let bytes = self.take(dim * 2)?.to_vec();
5276 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
5277 }
5278 other => Err(StorageError::Corrupt(format!("unknown value tag: {other}"))),
5279 }
5280 }
5281
5282 fn read_nsw_graph(&mut self, m: usize) -> Result<NswGraph, StorageError> {
5286 let m_max_0 = self.read_u16()? as usize;
5287 let entry_raw = self.read_u32()?;
5288 let entry = if entry_raw == u32::MAX {
5289 None
5290 } else {
5291 Some(entry_raw as usize)
5292 };
5293 let entry_level = self.read_u8()?;
5294 let node_count = self.read_u32()? as usize;
5295 let mut levels: PersistentVec<u8> = PersistentVec::new();
5300 for _ in 0..node_count {
5301 levels.push_mut(self.read_u8()?);
5302 }
5303 let layer_count = self.read_u8()? as usize;
5304 let mut layers: Vec<PersistentVec<Vec<u32>>> = Vec::with_capacity(layer_count);
5305 for _ in 0..layer_count {
5306 let n = self.read_u32()? as usize;
5307 let mut per_layer: PersistentVec<Vec<u32>> = PersistentVec::new();
5308 for _ in 0..n {
5309 let cnt = self.read_u16()? as usize;
5310 let mut row: Vec<u32> = Vec::with_capacity(cnt);
5311 for _ in 0..cnt {
5312 row.push(self.read_u32()?);
5313 }
5314 per_layer.push_mut(row);
5315 }
5316 layers.push(per_layer);
5317 }
5318 Ok(NswGraph {
5319 m,
5320 m_max_0,
5321 entry,
5322 entry_level,
5323 levels,
5324 layers,
5325 })
5326 }
5327}
5328
5329#[cfg(test)]
5330mod tests {
5331 use super::*;
5332 use alloc::string::ToString;
5333 use alloc::vec;
5334
5335 #[cfg(target_arch = "aarch64")]
5336 #[test]
5337 fn neon_l2_matches_scalar() {
5338 let dims = [4usize, 8, 12, 16, 64, 128, 256, 384, 512, 768, 1024, 1536];
5343 for &d in &dims {
5344 let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
5345 let mut a = Vec::with_capacity(d);
5346 let mut b = Vec::with_capacity(d);
5347 for _ in 0..d {
5348 state = state
5349 .wrapping_mul(6_364_136_223_846_793_005)
5350 .wrapping_add(1);
5351 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5352 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5353 state = state
5354 .wrapping_mul(6_364_136_223_846_793_005)
5355 .wrapping_add(1);
5356 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5357 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5358 a.push(x);
5359 b.push(y);
5360 }
5361 let scalar = l2_distance_sq_scalar(&a, &b);
5362 let neon = unsafe { l2_distance_sq_neon(&a, &b) };
5363 let tol = (scalar.abs().max(1e-6)) * 1e-4;
5364 assert!(
5365 (scalar - neon).abs() <= tol,
5366 "dim={d}: scalar={scalar} neon={neon} diff={}",
5367 (scalar - neon).abs()
5368 );
5369 }
5370 }
5371
5372 #[cfg(target_arch = "aarch64")]
5373 #[test]
5374 fn neon_inner_product_matches_scalar() {
5375 let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
5379 for &d in &dims {
5380 let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
5381 let mut a = Vec::with_capacity(d);
5382 let mut b = Vec::with_capacity(d);
5383 for _ in 0..d {
5384 state = state
5385 .wrapping_mul(6_364_136_223_846_793_005)
5386 .wrapping_add(1);
5387 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5388 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5389 state = state
5390 .wrapping_mul(6_364_136_223_846_793_005)
5391 .wrapping_add(1);
5392 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5393 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5394 a.push(x);
5395 b.push(y);
5396 }
5397 let scalar = inner_product_scalar(&a, &b);
5398 let neon = unsafe { inner_product_neon(&a, &b) };
5399 #[allow(clippy::cast_precision_loss)]
5400 let tol = (scalar.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
5401 assert!(
5402 (scalar - neon).abs() <= tol,
5403 "IP dim={d}: scalar={scalar} neon={neon} diff={}",
5404 (scalar - neon).abs()
5405 );
5406 }
5407 }
5408
5409 #[cfg(target_arch = "aarch64")]
5410 #[allow(clippy::similar_names)]
5411 #[test]
5412 fn neon_cosine_dot_norms_matches_scalar() {
5413 let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
5414 for &d in &dims {
5415 let mut state: u64 = (d as u64).wrapping_mul(0xBF58_476D_1CE4_E5B9);
5416 let mut a = Vec::with_capacity(d);
5417 let mut b = Vec::with_capacity(d);
5418 for _ in 0..d {
5419 state = state
5420 .wrapping_mul(6_364_136_223_846_793_005)
5421 .wrapping_add(1);
5422 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5423 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5424 state = state
5425 .wrapping_mul(6_364_136_223_846_793_005)
5426 .wrapping_add(1);
5427 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5428 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5429 a.push(x);
5430 b.push(y);
5431 }
5432 let (dot_s, na_s, nb_s) = cosine_dot_norms_scalar(&a, &b);
5433 let (dot_n, na_n, nb_n) = unsafe { cosine_dot_norms_neon(&a, &b) };
5434 #[allow(clippy::cast_precision_loss)]
5435 let tol_d = (dot_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
5436 #[allow(clippy::cast_precision_loss)]
5437 let tol_n = (na_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
5438 assert!(
5439 (dot_s - dot_n).abs() <= tol_d,
5440 "cosine dot dim={d}: scalar={dot_s} neon={dot_n}"
5441 );
5442 assert!(
5443 (na_s - na_n).abs() <= tol_n,
5444 "cosine na dim={d}: scalar={na_s} neon={na_n}"
5445 );
5446 assert!(
5447 (nb_s - nb_n).abs() <= tol_n,
5448 "cosine nb dim={d}: scalar={nb_s} neon={nb_n}"
5449 );
5450 }
5451 }
5452
5453 fn make_users_schema() -> TableSchema {
5454 TableSchema::new(
5455 "users",
5456 vec![
5457 ColumnSchema::new("id", DataType::Int, false),
5458 ColumnSchema::new("name", DataType::Text, false),
5459 ColumnSchema::new("score", DataType::Float, true),
5460 ],
5461 )
5462 }
5463
5464 #[test]
5465 fn value_type_tag_matches_variant() {
5466 assert_eq!(Value::Int(1).data_type(), Some(DataType::Int));
5467 assert_eq!(Value::BigInt(1).data_type(), Some(DataType::BigInt));
5468 assert_eq!(Value::Float(1.0).data_type(), Some(DataType::Float));
5469 assert_eq!(Value::Text("x".into()).data_type(), Some(DataType::Text));
5470 assert_eq!(Value::Bool(true).data_type(), Some(DataType::Bool));
5471 assert_eq!(Value::Null.data_type(), None);
5472 assert!(Value::Null.is_null());
5473 assert!(!Value::Int(0).is_null());
5474 }
5475
5476 #[test]
5477 fn sq8_value_reports_sq8_data_type() {
5478 let q = crate::quantize::quantize(&[0.0, 0.25, 0.5, 0.75, 1.0]);
5483 let v = Value::Sq8Vector(q);
5484 assert_eq!(
5485 v.data_type(),
5486 Some(DataType::Vector {
5487 dim: 5,
5488 encoding: VecEncoding::Sq8,
5489 }),
5490 );
5491 }
5492
5493 #[test]
5494 fn datatype_display_matches_pg_keyword() {
5495 assert_eq!(DataType::Int.to_string(), "INT");
5496 assert_eq!(DataType::BigInt.to_string(), "BIGINT");
5497 assert_eq!(DataType::Float.to_string(), "FLOAT");
5498 assert_eq!(DataType::Text.to_string(), "TEXT");
5499 assert_eq!(DataType::Bool.to_string(), "BOOL");
5500 }
5501
5502 #[test]
5503 fn row_len_and_emptiness() {
5504 let r = Row::new(vec![Value::Int(1), Value::Null]);
5505 assert_eq!(r.len(), 2);
5506 assert!(!r.is_empty());
5507 assert!(Row::new(Vec::new()).is_empty());
5508 }
5509
5510 #[test]
5511 fn table_schema_column_position() {
5512 let s = make_users_schema();
5513 assert_eq!(s.column_position("id"), Some(0));
5514 assert_eq!(s.column_position("score"), Some(2));
5515 assert_eq!(s.column_position("missing"), None);
5516 }
5517
5518 #[test]
5519 fn catalog_create_table_then_lookup() {
5520 let mut cat = Catalog::new();
5521 cat.create_table(make_users_schema()).unwrap();
5522 assert_eq!(cat.table_count(), 1);
5523 assert!(cat.get("users").is_some());
5524 assert!(cat.get("nope").is_none());
5525 }
5526
5527 #[test]
5528 fn catalog_duplicate_table_is_rejected() {
5529 let mut cat = Catalog::new();
5530 cat.create_table(make_users_schema()).unwrap();
5531 let err = cat.create_table(make_users_schema()).unwrap_err();
5532 assert!(matches!(err, StorageError::DuplicateTable { ref name } if name == "users"));
5533 }
5534
5535 #[test]
5536 fn table_insert_happy_path_appends_row() {
5537 let mut cat = Catalog::new();
5538 cat.create_table(make_users_schema()).unwrap();
5539 let t = cat.get_mut("users").unwrap();
5540 t.insert(Row::new(vec![
5541 Value::Int(1),
5542 Value::Text("alice".into()),
5543 Value::Float(99.5),
5544 ]))
5545 .unwrap();
5546 assert_eq!(t.row_count(), 1);
5547 assert_eq!(t.rows()[0].values[1], Value::Text("alice".into()));
5548 }
5549
5550 #[test]
5551 fn table_insert_arity_mismatch() {
5552 let mut cat = Catalog::new();
5553 cat.create_table(make_users_schema()).unwrap();
5554 let t = cat.get_mut("users").unwrap();
5555 let err = t.insert(Row::new(vec![Value::Int(1)])).unwrap_err();
5556 assert!(matches!(
5557 err,
5558 StorageError::ArityMismatch {
5559 expected: 3,
5560 actual: 1
5561 }
5562 ));
5563 assert_eq!(t.row_count(), 0);
5564 }
5565
5566 #[test]
5567 fn table_insert_type_mismatch_reports_column() {
5568 let mut cat = Catalog::new();
5569 cat.create_table(make_users_schema()).unwrap();
5570 let t = cat.get_mut("users").unwrap();
5571 let err = t
5572 .insert(Row::new(vec![
5573 Value::Int(1),
5574 Value::Int(42), Value::Float(0.0),
5576 ]))
5577 .unwrap_err();
5578 match err {
5579 StorageError::TypeMismatch {
5580 ref column,
5581 expected,
5582 actual,
5583 position,
5584 } => {
5585 assert_eq!(column, "name");
5586 assert_eq!(expected, DataType::Text);
5587 assert_eq!(actual, DataType::Int);
5588 assert_eq!(position, 1);
5589 }
5590 other => panic!("unexpected: {other:?}"),
5591 }
5592 assert_eq!(t.row_count(), 0);
5593 }
5594
5595 #[test]
5596 fn table_insert_null_into_not_null_rejected() {
5597 let mut cat = Catalog::new();
5598 cat.create_table(make_users_schema()).unwrap();
5599 let t = cat.get_mut("users").unwrap();
5600 let err = t
5601 .insert(Row::new(vec![
5602 Value::Int(1),
5603 Value::Null, Value::Float(1.0),
5605 ]))
5606 .unwrap_err();
5607 assert!(matches!(err, StorageError::NullInNotNull { ref column } if column == "name"));
5608 }
5609
5610 #[test]
5611 fn table_insert_null_into_nullable_ok() {
5612 let mut cat = Catalog::new();
5613 cat.create_table(make_users_schema()).unwrap();
5614 let t = cat.get_mut("users").unwrap();
5615 t.insert(Row::new(vec![
5616 Value::Int(1),
5617 Value::Text("bob".into()),
5618 Value::Null,
5619 ]))
5620 .unwrap();
5621 assert_eq!(t.row_count(), 1);
5622 }
5623
5624 #[test]
5625 fn catalog_get_mut_independent_per_table() {
5626 let mut cat = Catalog::new();
5627 cat.create_table(TableSchema::new(
5628 "a",
5629 vec![ColumnSchema::new("v", DataType::Int, false)],
5630 ))
5631 .unwrap();
5632 cat.create_table(TableSchema::new(
5633 "b",
5634 vec![ColumnSchema::new("v", DataType::Int, false)],
5635 ))
5636 .unwrap();
5637 cat.get_mut("a")
5638 .unwrap()
5639 .insert(Row::new(vec![Value::Int(1)]))
5640 .unwrap();
5641 assert_eq!(cat.get("a").unwrap().row_count(), 1);
5642 assert_eq!(cat.get("b").unwrap().row_count(), 0);
5643 }
5644
5645 fn assert_round_trip(cat: &Catalog) {
5648 let bytes = cat.serialize();
5649 let restored = Catalog::deserialize(&bytes).expect("deserialize");
5650 assert_eq!(restored.table_count(), cat.table_count());
5653 for (a, b) in cat.tables.iter().zip(restored.tables.iter()) {
5654 assert_eq!(a.schema, b.schema);
5655 assert_eq!(a.rows, b.rows);
5656 }
5657 }
5658
5659 #[test]
5660 fn serialize_empty_catalog_round_trips() {
5661 assert_round_trip(&Catalog::new());
5662 }
5663
5664 #[test]
5665 fn serialize_single_empty_table_round_trips() {
5666 let mut cat = Catalog::new();
5667 cat.create_table(make_users_schema()).unwrap();
5668 assert_round_trip(&cat);
5669 }
5670
5671 #[test]
5672 fn nsw_clone_is_o1() {
5673 let mut cat = Catalog::new();
5682 cat.create_table(TableSchema::new(
5683 "docs",
5684 alloc::vec![
5685 ColumnSchema::new("id", DataType::Int, false),
5686 ColumnSchema::new(
5687 "v",
5688 DataType::Vector {
5689 dim: 3,
5690 encoding: VecEncoding::F32
5691 },
5692 true
5693 ),
5694 ],
5695 ))
5696 .unwrap();
5697 let t = cat.get_mut("docs").unwrap();
5698 for i in 0..1500_i32 {
5699 #[allow(clippy::cast_precision_loss)] let base = (i as f32) * 0.01;
5701 t.insert(Row::new(alloc::vec![
5702 Value::Int(i),
5703 Value::Vector(alloc::vec![base, base + 0.05, base + 0.1]),
5704 ]))
5705 .unwrap();
5706 }
5707 t.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
5708 .unwrap();
5709 let g = match &cat.get("docs").unwrap().indices()[0].kind {
5710 IndexKind::Nsw(g) => g,
5711 IndexKind::BTree(_) | IndexKind::Brin { .. } => panic!("expected NSW"),
5712 };
5713 assert_eq!(g.levels.len(), 1500, "one level slot per inserted row");
5716 assert!(
5717 g.layers.len() >= 2,
5718 "1500 nodes should populate at least two HNSW layers, got {}",
5719 g.layers.len()
5720 );
5721
5722 let cloned = g.clone();
5723
5724 assert!(
5725 g.levels.shares_storage_with(&cloned.levels),
5726 "levels PV not shared after clone — clone copied elements (O(N))"
5727 );
5728 assert_eq!(g.layers.len(), cloned.layers.len());
5729 for (l, (orig, cl)) in g.layers.iter().zip(cloned.layers.iter()).enumerate() {
5730 assert!(
5731 orig.shares_storage_with(cl),
5732 "layer {l} PV not shared after clone — clone copied elements (O(N))"
5733 );
5734 }
5735 }
5736
5737 #[test]
5738 fn sq8_catalog_serialise_roundtrip_preserves_cells_and_index() {
5739 let mut cat = Catalog::new();
5746 cat.create_table(TableSchema::new(
5747 "vecs",
5748 alloc::vec![
5749 ColumnSchema::new("id", DataType::Int, false),
5750 ColumnSchema::new(
5751 "v",
5752 DataType::Vector {
5753 dim: 8,
5754 encoding: VecEncoding::Sq8,
5755 },
5756 false,
5757 ),
5758 ],
5759 ))
5760 .unwrap();
5761 let t = cat.get_mut("vecs").unwrap();
5762 for i in 0..32_i32 {
5763 #[allow(clippy::cast_precision_loss)]
5764 let base = (i as f32) * 0.03;
5765 let v: Vec<f32> = (0..8_i32)
5766 .map(|j| {
5767 #[allow(clippy::cast_precision_loss)]
5768 let off = (j as f32) * 0.01;
5769 base + off
5770 })
5771 .collect();
5772 t.insert(Row::new(alloc::vec![
5773 Value::Int(i),
5774 Value::Sq8Vector(quantize::quantize(&v)),
5775 ]))
5776 .unwrap();
5777 }
5778 t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
5779 let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];
5782 let (before_cell, before_ty, before_hits) = {
5783 let t_ref = cat.get("vecs").unwrap();
5784 (
5785 t_ref.rows()[5].values[1].clone(),
5786 t_ref.schema().columns[1].ty,
5787 nsw_query(t_ref, "v_idx", &query, 5, NswMetric::L2),
5788 )
5789 };
5790
5791 let bytes = cat.serialize();
5792 let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
5793 let rt = restored.get("vecs").unwrap();
5794 assert_eq!(rt.schema().columns[1].ty, before_ty);
5795 assert_eq!(rt.rows()[5].values[1], before_cell);
5796 let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
5797 assert_eq!(before_hits, after_hits);
5798 }
5799
5800 #[test]
5801 fn half_catalog_serialise_roundtrip_preserves_cells_and_index() {
5802 use crate::halfvec;
5809 let mut cat = Catalog::new();
5810 cat.create_table(TableSchema::new(
5811 "vecs",
5812 alloc::vec![
5813 ColumnSchema::new("id", DataType::Int, false),
5814 ColumnSchema::new(
5815 "v",
5816 DataType::Vector {
5817 dim: 8,
5818 encoding: VecEncoding::F16,
5819 },
5820 false,
5821 ),
5822 ],
5823 ))
5824 .unwrap();
5825 let t = cat.get_mut("vecs").unwrap();
5826 for i in 0..32_i32 {
5827 #[allow(clippy::cast_precision_loss)]
5828 let base = (i as f32) * 0.03;
5829 let v: Vec<f32> = (0..8_i32)
5830 .map(|j| {
5831 #[allow(clippy::cast_precision_loss)]
5832 let off = (j as f32) * 0.01;
5833 base + off
5834 })
5835 .collect();
5836 t.insert(Row::new(alloc::vec![
5837 Value::Int(i),
5838 Value::HalfVector(halfvec::HalfVector::from_f32_slice(&v)),
5839 ]))
5840 .unwrap();
5841 }
5842 t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
5843 let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];
5844 let (before_cell, before_ty, before_hits) = {
5845 let t_ref = cat.get("vecs").unwrap();
5846 (
5847 t_ref.rows()[5].values[1].clone(),
5848 t_ref.schema().columns[1].ty,
5849 nsw_query(t_ref, "v_idx", &query, 5, NswMetric::L2),
5850 )
5851 };
5852 let bytes = cat.serialize();
5853 let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
5854 let rt = restored.get("vecs").unwrap();
5855 assert_eq!(rt.schema().columns[1].ty, before_ty);
5856 assert_eq!(rt.rows()[5].values[1], before_cell);
5857 let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
5858 assert_eq!(before_hits, after_hits);
5859 }
5860
/// Recall@10 check for an NSW index built over an F16 (`HALF`) vector column.
///
/// 512 pseudo-random 32-dimensional vectors are inserted as half-precision
/// cells, 32 pseudo-random queries are run through `nsw_query`, and the hits
/// are compared against an exact f32 L2 ranking of the same corpus. The
/// overall overlap ratio must be at least 0.95.
#[test]
#[allow(clippy::similar_names)]
fn hnsw_half_recall_at_10_matches_f32_groundtruth() {
    use crate::halfvec;

    /// Advances the generator state and maps its upper 32 bits into [-1, 1].
    fn next_component(state: &mut u64) -> f32 {
        let advanced = state
            .wrapping_add(0x9E37_79B9_7F4A_7C15)
            .wrapping_mul(0xBF58_476D_1CE4_E5B9);
        *state = advanced;
        #[allow(clippy::cast_precision_loss)]
        let unit = ((advanced >> 32) as u32 as f32) / (u32::MAX as f32);
        2.0 * unit - 1.0
    }

    /// Draws `count` vectors of `width` components each, in order, from `state`.
    fn random_vectors(count: usize, width: usize, state: &mut u64) -> Vec<Vec<f32>> {
        (0..count)
            .map(|_| (0..width).map(|_| next_component(state)).collect())
            .collect()
    }

    let dim: u32 = 32;
    let dim_us = dim as usize;
    let mut rng_state: u64 = 0xF16_F16_F16_F16_u64;

    // The corpus is drawn first and the queries second, from the same stream.
    let corpus = random_vectors(512, dim_us, &mut rng_state);
    let queries = random_vectors(32, dim_us, &mut rng_state);

    // Exact ground truth: rank the full-precision corpus by squared L2 distance.
    let mut exact_top10: Vec<Vec<usize>> = Vec::with_capacity(queries.len());
    for q in &queries {
        let mut ranked: Vec<(f32, usize)> = corpus
            .iter()
            .enumerate()
            .map(|(row, v)| (l2_distance_sq(v, q), row))
            .collect();
        ranked.sort_by(|lhs, rhs| {
            lhs.0
                .partial_cmp(&rhs.0)
                .unwrap_or(core::cmp::Ordering::Equal)
        });
        exact_top10.push(ranked.into_iter().take(10).map(|(_, row)| row).collect());
    }

    let vector_ty = DataType::Vector {
        dim,
        encoding: VecEncoding::F16,
    };
    let schema = TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vector_ty, false),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let vecs = cat.get_mut("vecs").unwrap();
    for (row, components) in corpus.iter().enumerate() {
        let id_cell = Value::Int(i32::try_from(row).unwrap());
        let vec_cell = Value::HalfVector(halfvec::HalfVector::from_f32_slice(components));
        vecs.insert(Row::new(alloc::vec![id_cell, vec_cell])).unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let table = cat.get("vecs").unwrap();
    let mut total_overlap = 0_usize;
    for (q, exact) in queries.iter().zip(exact_top10.iter()) {
        let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
        for h in &hits {
            total_overlap += usize::from(exact.contains(h));
        }
    }

    #[allow(clippy::cast_precision_loss)]
    let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
    assert!(
        recall >= 0.95,
        "HALF HNSW recall@10 = {recall:.3}, below floor 0.95 — \
         check halfvec dispatch in `cell_to_query_metric_distance`"
    );
}
5944
/// Recall@10 check for an NSW index built over an SQ8-quantized vector column.
///
/// 512 pseudo-random 32-dimensional vectors are quantized and inserted, 32
/// pseudo-random queries are run through `nsw_query`, and the hits are compared
/// against an exact f32 L2 ranking of the unquantized corpus. The overall
/// overlap ratio must be at least 0.95.
#[test]
fn hnsw_sq8_recall_at_10_above_0_95_vs_f32_groundtruth() {
    use crate::quantize;

    /// Advances the generator state and maps its upper 32 bits into [-1, 1].
    fn next_component(state: &mut u64) -> f32 {
        let advanced = state
            .wrapping_add(0x9E37_79B9_7F4A_7C15)
            .wrapping_mul(0xBF58_476D_1CE4_E5B9);
        *state = advanced;
        #[allow(clippy::cast_precision_loss)]
        let unit = ((advanced >> 32) as u32 as f32) / (u32::MAX as f32);
        2.0 * unit - 1.0
    }

    /// Draws `count` vectors of `width` components each, in order, from `state`.
    fn random_vectors(count: usize, width: usize, state: &mut u64) -> Vec<Vec<f32>> {
        (0..count)
            .map(|_| (0..width).map(|_| next_component(state)).collect())
            .collect()
    }

    let dim: u32 = 32;
    let dim_us = dim as usize;
    let mut rng_state: u64 = 0xCAFE_BABE_DEAD_BEEFu64;

    // The corpus is drawn first and the queries second, from the same stream.
    let corpus = random_vectors(512, dim_us, &mut rng_state);
    let queries = random_vectors(32, dim_us, &mut rng_state);

    // Exact ground truth: rank the unquantized corpus by squared L2 distance.
    let mut exact_top10: Vec<Vec<usize>> = Vec::with_capacity(queries.len());
    for q in &queries {
        let mut ranked: Vec<(f32, usize)> = corpus
            .iter()
            .enumerate()
            .map(|(row, v)| (l2_distance_sq(v, q), row))
            .collect();
        ranked.sort_by(|lhs, rhs| {
            lhs.0
                .partial_cmp(&rhs.0)
                .unwrap_or(core::cmp::Ordering::Equal)
        });
        exact_top10.push(ranked.into_iter().take(10).map(|(_, row)| row).collect());
    }

    let vector_ty = DataType::Vector {
        dim,
        encoding: VecEncoding::Sq8,
    };
    let schema = TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vector_ty, false),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let vecs = cat.get_mut("vecs").unwrap();
    for (row, components) in corpus.iter().enumerate() {
        let id_cell = Value::Int(i32::try_from(row).unwrap());
        let vec_cell = Value::Sq8Vector(quantize::quantize(components));
        vecs.insert(Row::new(alloc::vec![id_cell, vec_cell])).unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let table = cat.get("vecs").unwrap();
    let mut total_overlap = 0_usize;
    for (q, exact) in queries.iter().zip(exact_top10.iter()) {
        let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
        for h in &hits {
            total_overlap += usize::from(exact.contains(h));
        }
    }

    #[allow(clippy::cast_precision_loss)]
    let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
    assert!(
        recall >= 0.95,
        "SQ8 HNSW recall@10 = {recall:.3}, below floor 0.95 — \
         check `sq8_rerank` is wired in `nsw_search` for SQ8 columns"
    );
}
6033
/// Builds an NSW index over six 3-dimensional vectors, serializes the catalog,
/// and checks that the restored graph matches the original field by field
/// (`m`, `m_max_0`, `entry`, `entry_level`, `levels`, `layers`).
#[test]
fn nsw_index_topology_persists_through_round_trip() {
    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let schema = TableSchema::new(
        "docs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vector_ty, true),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let docs = cat.get_mut("docs").unwrap();
    for i in 0..6_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.1;
        let components = alloc::vec![base, base + 0.05, base + 0.1];
        docs.insert(Row::new(alloc::vec![
            Value::Int(i),
            Value::Vector(components)
        ]))
        .unwrap();
    }
    docs.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    // Snapshot of the graph before it goes through the byte stream.
    let before = match &cat.get("docs").unwrap().indices()[0].kind {
        IndexKind::Nsw(graph) => graph.clone(),
        IndexKind::BTree(_) | IndexKind::Brin { .. } => panic!("expected NSW"),
    };

    let encoded = cat.serialize();
    let restored = Catalog::deserialize(&encoded).expect("deserialize");

    let after = match &restored.get("docs").unwrap().indices()[0].kind {
        IndexKind::Nsw(graph) => graph.clone(),
        IndexKind::BTree(_) | IndexKind::Brin { .. } => panic!("expected NSW"),
    };

    assert_eq!(after.m, before.m);
    assert_eq!(after.m_max_0, before.m_max_0);
    assert_eq!(after.entry, before.entry);
    assert_eq!(after.entry_level, before.entry_level);
    assert_eq!(after.levels, before.levels);
    assert_eq!(after.layers, before.layers);
}
6086
/// Calling `nsw_assign_level` twice with the same node index must return the
/// same level, for every index in `0..32`.
#[test]
fn hnsw_level_assignment_is_deterministic() {
    (0..32usize).for_each(|node| {
        assert_eq!(nsw_assign_level(node), nsw_assign_level(node));
    });
}
6095
/// Of the first 200 node indices, more than 150 must be assigned level 0.
#[test]
fn hnsw_layer_0_dominates_population() {
    let mut on_zero = 0usize;
    for node in 0..200usize {
        if nsw_assign_level(node) == 0 {
            on_zero += 1;
        }
    }
    assert!(on_zero > 150, "level-0 nodes too few: {on_zero}");
}
6105
/// For three fixed queries over a ten-row table, the first result of
/// `nsw_search` must point at the same row as an exhaustive squared-L2 scan.
#[test]
fn hnsw_search_matches_brute_force_for_l2_top1() {
    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let schema = TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vector_ty, true),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let dataset: [(i32, [f32; 3]); 10] = [
        (1, [0.0, 0.0, 0.0]),
        (2, [1.0, 0.0, 0.0]),
        (3, [0.0, 1.0, 0.0]),
        (4, [0.0, 0.0, 1.0]),
        (5, [1.0, 1.0, 0.0]),
        (6, [1.0, 0.0, 1.0]),
        (7, [0.0, 1.0, 1.0]),
        (8, [1.0, 1.0, 1.0]),
        (9, [0.5, 0.5, 0.5]),
        (10, [0.2, 0.8, 0.5]),
    ];
    let vecs = cat.get_mut("vecs").unwrap();
    for &(id, [x, y, z]) in &dataset {
        let cells = alloc::vec![Value::Int(id), Value::Vector(alloc::vec![x, y, z])];
        vecs.insert(Row::new(cells)).unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    // `nsw_search` addresses the index by position, so resolve it by name once.
    let idx_pos = cat
        .get("vecs")
        .unwrap()
        .indices()
        .iter()
        .position(|index| index.name == "v_idx")
        .unwrap();

    for query in [[0.4, 0.4, 0.4], [0.9, 0.1, 0.0], [0.0, 0.9, 0.9]] {
        let table = cat.get("vecs").unwrap();
        let hnsw_top = nsw_search(table, idx_pos, &query, 1, 16, NswMetric::L2);

        // Exhaustive scan; cells that are not plain vectors sort last.
        let mut brute: alloc::vec::Vec<(f32, usize)> =
            alloc::vec::Vec::with_capacity(table.rows.len());
        for i in 0..table.rows.len() {
            let dist = match &table.rows[i].values[1] {
                Value::Vector(v) => l2_distance_sq(v, &query),
                _ => f32::INFINITY,
            };
            brute.push((dist, i));
        }
        brute.sort_by(|lhs, rhs| {
            lhs.0
                .partial_cmp(&rhs.0)
                .unwrap_or(core::cmp::Ordering::Equal)
        });

        assert!(!hnsw_top.is_empty(), "HNSW returned no results");
        assert_eq!(
            hnsw_top[0].1, brute[0].1,
            "HNSW top-1 != brute-force top-1 for {query:?}"
        );
    }
}
6174
/// A catalog holding the users table with two rows (the second ending in
/// `Value::Null`) must pass `assert_round_trip`.
#[test]
fn serialize_table_with_rows_round_trips() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();

    let alice = vec![
        Value::Int(1),
        Value::Text("alice".into()),
        Value::Float(95.5),
    ];
    let bob = vec![Value::Int(2), Value::Text("bob".into()), Value::Null];

    let users = catalog.get_mut("users").unwrap();
    for cells in [alice, bob] {
        users.insert(Row::new(cells)).unwrap();
    }

    assert_round_trip(&catalog);
}
6194
/// A catalog with two tables (an empty users table and a one-row `flags`
/// table) must pass `assert_round_trip`.
#[test]
fn serialize_multiple_tables_round_trips() {
    let flags_schema = TableSchema::new(
        "flags",
        vec![
            ColumnSchema::new("id", DataType::BigInt, false),
            ColumnSchema::new("active", DataType::Bool, false),
        ],
    );

    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    catalog.create_table(flags_schema).unwrap();

    let flags = catalog.get_mut("flags").unwrap();
    let only_row = Row::new(vec![Value::BigInt(7), Value::Bool(true)]);
    flags.insert(only_row).unwrap();

    assert_round_trip(&catalog);
}
6213
/// A stream that starts with `BADMAGIC` instead of the real magic must be
/// rejected as `StorageError::Corrupt`.
#[test]
fn deserialize_rejects_bad_magic() {
    let mut stream: Vec<u8> = Vec::new();
    stream.extend_from_slice(b"BADMAGIC");
    stream.push(FILE_VERSION);
    // Little-endian zero, written in the slot that follows the version byte.
    stream.extend_from_slice(&0u32.to_le_bytes());

    let failure = Catalog::deserialize(&stream).unwrap_err();
    assert!(matches!(failure, StorageError::Corrupt(_)));
}
6222
/// A stream with the correct magic but version byte 99 must be rejected with a
/// `Corrupt` error whose message mentions "version".
#[test]
fn deserialize_rejects_unsupported_version() {
    let mut stream = FILE_MAGIC.to_vec();
    stream.push(99);
    stream.extend_from_slice(&0u32.to_le_bytes());

    let failure = Catalog::deserialize(&stream).unwrap_err();
    assert!(matches!(failure, StorageError::Corrupt(ref s) if s.contains("version")));
}
6231
/// Dropping the final byte of a valid serialized catalog must make
/// `Catalog::deserialize` fail with `StorageError::Corrupt`.
#[test]
fn deserialize_rejects_truncated_file() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();

    let encoded = catalog.serialize();
    let keep = encoded.len() - 1;
    let outcome = Catalog::deserialize(&encoded[..keep]);

    assert!(matches!(outcome, Err(StorageError::Corrupt(_))));
}
6244
/// One extra byte after a valid (empty) catalog stream must be rejected with a
/// `Corrupt` error whose message mentions "trailing".
#[test]
fn deserialize_rejects_trailing_garbage() {
    let mut encoded = Catalog::new().serialize();
    encoded.push(0xFF);

    let outcome = Catalog::deserialize(&encoded);
    assert!(matches!(
        outcome,
        Err(StorageError::Corrupt(ref s)) if s.contains("trailing")
    ));
}
6255
6256 fn populated_users() -> Catalog {
6259 let mut cat = Catalog::new();
6260 cat.create_table(make_users_schema()).unwrap();
6261 let t = cat.get_mut("users").unwrap();
6262 for (id, name, score) in [
6263 (1, "alice", Some(90.0)),
6264 (2, "bob", None),
6265 (3, "alice", Some(70.0)), ] {
6267 t.insert(Row::new(vec![
6268 Value::Int(id),
6269 Value::Text(name.into()),
6270 score.map_or(Value::Null, Value::Float),
6271 ]))
6272 .unwrap();
6273 }
6274 cat
6275 }
6276
/// An index added after rows exist must already cover them: id 2 resolves to
/// hot row 1, and an absent key resolves to nothing.
#[test]
fn add_index_builds_from_existing_rows() {
    let mut catalog = populated_users();
    let users_mut = catalog.get_mut("users").unwrap();
    users_mut.add_index("by_id".into(), "id").unwrap();

    let users = catalog.get("users").unwrap();
    let by_id = users.index_on(0).expect("index_on(0)");

    let present = IndexKey::Int(2);
    let absent = IndexKey::Int(99);
    assert_eq!(by_id.lookup_eq(&present), &[RowLocator::Hot(1)]);
    assert_eq!(by_id.lookup_eq(&absent), &[] as &[RowLocator]);
}
6289
/// Reusing an index name on the same table must fail with
/// `StorageError::DuplicateIndex` carrying that name.
#[test]
fn add_index_dup_name_rejected() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();

    users.add_index("ix".into(), "id").unwrap();
    let failure = users.add_index("ix".into(), "name").unwrap_err();

    assert!(matches!(failure, StorageError::DuplicateIndex { ref name } if name == "ix"));
}
6298
/// Indexing a column that does not exist must fail with
/// `StorageError::ColumnNotFound` naming that column.
#[test]
fn add_index_unknown_column_rejected() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();

    let failure = users.add_index("ix".into(), "ghost").unwrap_err();

    assert!(matches!(failure, StorageError::ColumnNotFound { ref column } if column == "ghost"));
}
6309
/// A row inserted after the index was created must be visible through it, and
/// the entries built from the pre-existing rows must stay intact.
#[test]
fn insert_after_create_index_updates_it() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let dave = Row::new(vec![
        Value::Int(4),
        Value::Text("dave".into()),
        Value::Null,
    ]);
    users.insert(dave).unwrap();

    let by_name = users.index_on(1).unwrap();
    let dave_key = IndexKey::Text("dave".into());
    let alice_key = IndexKey::Text("alice".into());

    // The new row is the fourth one in the table, hence hot slot 3.
    assert_eq!(by_name.lookup_eq(&dave_key), &[RowLocator::Hot(3)]);
    assert_eq!(
        by_name.lookup_eq(&alice_key),
        &[RowLocator::Hot(0), RowLocator::Hot(2)]
    );
}
6332
/// With an index on the score column (position 2), looking up the integer key
/// 90 must return no locators.
#[test]
fn null_or_float_values_are_not_indexed() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("by_score".into(), "score").unwrap();

    let by_score = users.index_on(2).unwrap();
    let probe = IndexKey::Int(90);
    assert_eq!(by_score.lookup_eq(&probe), &[] as &[RowLocator]);
}
6345
/// `Value::Vector` with three components must report an F32 vector type with
/// `dim == 3`.
#[test]
fn vector_value_data_type_carries_dim() {
    let expected = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let cell = Value::Vector(vec![1.0, 2.0, 3.0]);
    assert_eq!(cell.data_type(), Some(expected));
}
6359
/// Inserting a 3-component vector into a `dim: 3` F32 vector column succeeds.
#[test]
fn vector_column_insert_matching_dim_ok() {
    let column = ColumnSchema::new(
        "v",
        DataType::Vector {
            dim: 3,
            encoding: VecEncoding::F32,
        },
        false,
    );
    let mut catalog = Catalog::new();
    catalog
        .create_table(TableSchema::new("emb", vec![column]))
        .unwrap();

    let emb = catalog.get_mut("emb").unwrap();
    let row = Row::new(vec![Value::Vector(vec![1.0, 2.0, 3.0])]);
    emb.insert(row).unwrap();
}
6380
/// Inserting a 2-component vector into a `dim: 3` F32 vector column must fail
/// with `StorageError::TypeMismatch`.
#[test]
fn vector_column_insert_dim_mismatch_rejected() {
    let column = ColumnSchema::new(
        "v",
        DataType::Vector {
            dim: 3,
            encoding: VecEncoding::F32,
        },
        false,
    );
    let mut catalog = Catalog::new();
    catalog
        .create_table(TableSchema::new("emb", vec![column]))
        .unwrap();

    let emb = catalog.get_mut("emb").unwrap();
    let too_short = Row::new(vec![Value::Vector(vec![1.0, 2.0])]);
    let failure = emb.insert(too_short).unwrap_err();

    assert!(matches!(failure, StorageError::TypeMismatch { .. }));
}
6403
/// After serialize + deserialize, both the vector column's declared type and
/// the stored vector cell must equal what was written.
#[test]
fn vector_value_survives_catalog_round_trip() {
    let schema = TableSchema::new(
        "emb",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 4,
                    encoding: VecEncoding::F32,
                },
                false,
            ),
        ],
    );
    let mut catalog = Catalog::new();
    catalog.create_table(schema).unwrap();

    let emb = catalog.get_mut("emb").unwrap();
    let row = Row::new(vec![
        Value::Int(1),
        Value::Vector(vec![0.5, -1.25, 3.0, 7.0]),
    ]);
    emb.insert(row).unwrap();

    let encoded = catalog.serialize();
    let restored = Catalog::deserialize(&encoded).expect("round-trip");
    let table = restored.get("emb").unwrap();

    assert_eq!(
        table.schema().columns[1].ty,
        DataType::Vector {
            dim: 4,
            encoding: VecEncoding::F32
        }
    );
    assert_eq!(
        table.rows()[0].values[1],
        Value::Vector(vec![0.5, -1.25, 3.0, 7.0])
    );
}
6443
/// An index on the name column must come back from a serialize/deserialize
/// cycle with the same name and the same locators for "alice".
#[test]
fn index_survives_serialize_deserialize_round_trip() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let encoded = catalog.serialize();
    let restored = Catalog::deserialize(&encoded).unwrap();

    let restored_users = restored.get("users").unwrap();
    let by_name = restored_users
        .index_on(1)
        .expect("index_on(1) after restore");
    assert_eq!(by_name.name, "by_name");

    let alice_key = IndexKey::Text("alice".into());
    assert_eq!(
        by_name.lookup_eq(&alice_key),
        &[RowLocator::Hot(0), RowLocator::Hot(2)]
    );
}
6464
6465 fn bigint_pk_users_schema() -> TableSchema {
6470 TableSchema::new(
6471 "users",
6472 vec![
6473 ColumnSchema::new("id", DataType::BigInt, false),
6474 ColumnSchema::new("name", DataType::Text, false),
6475 ],
6476 )
6477 }
6478
6479 fn make_user_row(id: i64, name: &str) -> Row {
6480 Row::new(vec![Value::BigInt(id), Value::Text(name.into())])
6481 }
6482
/// With only hot rows present, `lookup_by_pk` must return the matching row and
/// no cold segment may be registered.
#[test]
fn lookup_by_pk_finds_row_via_hot_index() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();

    let users = catalog.get_mut("users").unwrap();
    let people = [(1i64, "alice"), (2, "bob"), (3, "carol")];
    for (id, name) in people {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let key = IndexKey::Int(2);
    let found = catalog.lookup_by_pk("users", "by_id", &key).unwrap();
    assert_eq!(found, make_user_row(2, "bob"));
    assert_eq!(catalog.cold_segment_count(), 0);
}
6499
/// `lookup_by_pk` must yield `None` for a missing key, for an unknown table,
/// and for an unknown index name.
#[test]
fn lookup_by_pk_returns_none_when_key_missing() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();

    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    // (table, index, key) probes, each of which must miss.
    let misses = [
        ("users", "by_id", 999),
        ("other_table", "by_id", 1),
        ("users", "no_such_index", 1),
    ];
    for (table, index, key) in misses {
        assert!(
            catalog
                .lookup_by_pk(table, index, &IndexKey::Int(key))
                .is_none()
        );
    }
}
6521
/// Rows that exist only in a loaded segment must be reachable through
/// `lookup_by_pk` once their cold locators are registered on the index.
#[test]
fn lookup_by_pk_resolves_cold_locator_via_loaded_segment() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let cold_rows: Vec<(i64, &str)> =
        vec![(100, "ivy"), (200, "joe"), (300, "kim"), (400, "lin")];

    // Encode every cold row and pair it with its id reinterpreted as u64.
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(cold_rows.len());
    for &(id, name) in &cold_rows {
        let body = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), body));
    }
    let (seg_bytes, _meta) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();

    let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();
    assert_eq!(seg_id, 0);
    assert_eq!(cat.cold_segment_count(), 1);

    // All keys point at page offset 0 of the freshly loaded segment.
    let mut pairs: Vec<(IndexKey, RowLocator)> = Vec::with_capacity(cold_rows.len());
    for &(id, _) in &cold_rows {
        let locator = RowLocator::Cold {
            segment_id: seg_id,
            page_offset: 0,
        };
        pairs.push((IndexKey::Int(id), locator));
    }
    let registered = cat
        .get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();
    assert_eq!(registered, 4);

    for &(id, name) in &cold_rows {
        let found = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("cold key {id} not found"));
        assert_eq!(found, make_user_row(id, name));
    }

    let absent = IndexKey::Int(999);
    assert!(cat.lookup_by_pk("users", "by_id", &absent).is_none());
}
6579
/// With two hot rows and two rows registered from a cold segment, one index
/// must resolve all four keys and still miss a key that exists in neither.
#[test]
fn lookup_by_pk_mixes_hot_and_cold_tiers() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    let hot_rows = [(1i64, "alice"), (2, "bob")];
    for (id, name) in hot_rows {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe")];
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(cold_rows.len());
    for &(id, name) in &cold_rows {
        let body = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), body));
    }
    let (seg_bytes, _) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();

    let mut pairs: Vec<(IndexKey, RowLocator)> = Vec::with_capacity(cold_rows.len());
    for &(id, _) in &cold_rows {
        let locator = RowLocator::Cold {
            segment_id: seg_id,
            page_offset: 0,
        };
        pairs.push((IndexKey::Int(id), locator));
    }
    cat.get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();

    // Hot keys first, then cold keys, in the same order they were set up.
    let expected = [(1i64, "alice"), (2, "bob"), (100, "ivy"), (200, "joe")];
    for (id, name) in expected {
        assert_eq!(
            cat.lookup_by_pk("users", "by_id", &IndexKey::Int(id))
                .unwrap(),
            make_user_row(id, name)
        );
    }

    let absent = IndexKey::Int(50);
    assert!(cat.lookup_by_pk("users", "by_id", &absent).is_none());
}
6650
/// Registering cold locators against an NSW index must fail with a `Corrupt`
/// error whose message contains "not BTree".
#[test]
fn register_cold_locators_rejects_nsw_index() {
    let schema = TableSchema::new(
        "vecs",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 4,
                    encoding: VecEncoding::F32,
                },
                false,
            ),
        ],
    );
    let mut catalog = Catalog::new();
    catalog.create_table(schema).unwrap();

    let vecs = catalog.get_mut("vecs").unwrap();
    let only_row = Row::new(vec![
        Value::Int(1),
        Value::Vector(vec![1.0, 0.0, 0.0, 0.0]),
    ]);
    vecs.insert(only_row).unwrap();
    vecs.add_nsw_index("by_v".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let locator = RowLocator::Cold {
        segment_id: 0,
        page_offset: 0,
    };
    let pairs = vec![(IndexKey::Int(1), locator)];
    let failure = vecs.register_cold_locators("by_v", pairs).unwrap_err();

    assert!(matches!(failure, StorageError::Corrupt(ref s) if s.contains("not BTree")));
}
6692
/// Ten zero bytes are not a valid segment: loading them must fail with a
/// `Corrupt` error mentioning "segment" and must not register a segment.
#[test]
fn load_segment_bytes_rejects_garbage() {
    let mut catalog = Catalog::new();
    let junk = vec![0u8; 10];

    let failure = catalog.load_segment_bytes(junk).unwrap_err();

    assert!(matches!(failure, StorageError::Corrupt(ref s) if s.contains("segment")));
    assert_eq!(catalog.cold_segment_count(), 0);
}
6701
/// Loading three segments in a row must hand out ids 0, 1 and 2, and the
/// catalog must then report three cold segments.
#[test]
fn load_segment_bytes_returns_sequential_ids() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let schema = catalog.get("users").unwrap().schema.clone();

    for batch in 0u32..3 {
        // Four rows per segment, with ids offset by 100 per batch.
        let mut rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(4);
        for offset in 0u64..4 {
            let id = u64::from(batch) * 100 + offset;
            let body = encode_row_body_dense(&make_user_row(id.cast_signed(), "x"), &schema);
            rows.push((id, body));
        }
        let (encoded, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
        assert_eq!(catalog.load_segment_bytes(encoded).unwrap(), batch);
    }

    assert_eq!(catalog.cold_segment_count(), 3);
}
6720
/// A stream written by `encode_as_v8` (version byte 8) must be accepted by
/// `Catalog::deserialize`, and every locator in the restored index must be hot.
#[test]
fn v8_catalog_decodes_as_all_hot_under_v9_reader() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let v8_bytes = encode_as_v8(&catalog);
    // The version byte sits immediately after the magic.
    assert_eq!(v8_bytes[FILE_MAGIC.len()], 8, "version byte must be 8");

    let restored = Catalog::deserialize(&v8_bytes).expect("v9 reader accepts v8 stream");
    let restored_users = restored.get("users").unwrap();
    let by_name = restored_users
        .index_on(1)
        .expect("index_on(1) after restore");

    let alice_key = IndexKey::Text("alice".into());
    assert_eq!(
        by_name.lookup_eq(&alice_key),
        &[RowLocator::Hot(0), RowLocator::Hot(2)]
    );
    for locator in by_name.lookup_eq(&alice_key) {
        assert!(locator.is_hot(), "v8 → v9 read must yield Hot only");
    }
}
6765
6766 fn encode_as_v8(cat: &Catalog) -> Vec<u8> {
6771 let mut out = Vec::with_capacity(64);
6772 out.extend_from_slice(FILE_MAGIC);
6773 out.push(8u8);
6774 write_u32(&mut out, u32::try_from(cat.tables.len()).unwrap());
6775 for t in &cat.tables {
6776 write_str(&mut out, &t.schema.name);
6777 write_u16(&mut out, u16::try_from(t.schema.columns.len()).unwrap());
6778 for c in &t.schema.columns {
6779 write_str(&mut out, &c.name);
6780 write_data_type(&mut out, c.ty);
6781 out.push(u8::from(c.nullable));
6782 match &c.default {
6783 None => out.push(0),
6784 Some(v) => {
6785 out.push(1);
6786 write_value(&mut out, v);
6787 }
6788 }
6789 out.push(u8::from(c.auto_increment));
6790 }
6791 write_u32(&mut out, u32::try_from(t.rows.len()).unwrap());
6792 for row in &t.rows {
6793 out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
6794 }
6795 write_u16(&mut out, u16::try_from(t.indices.len()).unwrap());
6796 for idx in &t.indices {
6797 write_str(&mut out, &idx.name);
6798 write_u16(&mut out, u16::try_from(idx.column_position).unwrap());
6799 match &idx.kind {
6800 IndexKind::BTree(_) => out.push(0),
6803 IndexKind::Nsw(g) => {
6804 out.push(1);
6805 write_u16(&mut out, u16::try_from(g.m).unwrap());
6806 write_nsw_graph(&mut out, g);
6807 }
6808 IndexKind::Brin { .. } => panic!(
6811 "v8 catalog writer cannot serialise BRIN — \
6812 tests with BRIN indices must use the current writer"
6813 ),
6814 }
6815 }
6816 }
6817 out
6818 }
6819
/// Serializing a catalog whose index mixes hot and cold locators, then
/// deserializing it and re-loading the same segment bytes, must preserve every
/// locator and keep `lookup_by_pk` working for both tiers.
#[test]
fn v9_catalog_round_trip_preserves_cold_locators() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    let hot_rows = [(1i64, "alice"), (2, "bob")];
    for (id, name) in hot_rows {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe"), (300, "kim")];
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(cold_rows.len());
    for &(id, name) in &cold_rows {
        let body = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), body));
    }
    let (seg_bytes, _) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    // Keep the original bytes so the restored catalog can load them as well.
    let seg_id = cat.load_segment_bytes(seg_bytes.clone()).unwrap();

    let mut pairs: Vec<(IndexKey, RowLocator)> = Vec::with_capacity(cold_rows.len());
    for &(id, _) in &cold_rows {
        let locator = RowLocator::Cold {
            segment_id: seg_id,
            page_offset: 0,
        };
        pairs.push((IndexKey::Int(id), locator));
    }
    cat.get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();

    let encoded = cat.serialize();
    assert_eq!(encoded[FILE_MAGIC.len()], FILE_VERSION);
    let mut restored = Catalog::deserialize(&encoded).expect("v9 round-trip parses");

    let restored_seg_id = restored.load_segment_bytes(seg_bytes).unwrap();
    assert_eq!(restored_seg_id, seg_id);

    // Index-level view: hot entries keep their slots, cold entries keep their segment.
    let by_id = restored.get("users").unwrap().index_on(0).unwrap();
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(1)), &[RowLocator::Hot(0)]);
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
    for &(id, _) in &cold_rows {
        assert_eq!(
            by_id.lookup_eq(&IndexKey::Int(id)),
            &[RowLocator::Cold {
                segment_id: seg_id,
                page_offset: 0,
            }]
        );
    }

    // Row-level view: lookups resolve through both tiers.
    assert_eq!(
        restored
            .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
            .unwrap(),
        make_user_row(2, "bob")
    );
    for &(id, name) in &cold_rows {
        assert_eq!(
            restored
                .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
                .unwrap(),
            make_user_row(id, name)
        );
    }
}
6910
/// For three rows spanning ten column types, `row_body_encoded_len` must equal
/// the length of the buffer `encode_row_body_dense` actually produces.
#[test]
fn row_body_encoded_len_matches_actual_encode_for_all_types() {
    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let numeric_ty = DataType::Numeric {
        precision: 18,
        scale: 2,
    };
    let columns = vec![
        ColumnSchema::new("a", DataType::SmallInt, true),
        ColumnSchema::new("b", DataType::Int, false),
        ColumnSchema::new("c", DataType::BigInt, false),
        ColumnSchema::new("d", DataType::Float, false),
        ColumnSchema::new("e", DataType::Bool, false),
        ColumnSchema::new("f", DataType::Text, false),
        ColumnSchema::new("g", vector_ty, false),
        ColumnSchema::new("h", numeric_ty, false),
        ColumnSchema::new("i", DataType::Date, false),
        ColumnSchema::new("j", DataType::Timestamp, false),
    ];
    let schema = TableSchema::new("wide", columns);

    // Typical values.
    let typical = Row::new(vec![
        Value::SmallInt(7),
        Value::Int(42),
        Value::BigInt(1_000_000),
        Value::Float(1.5),
        Value::Bool(true),
        Value::Text("hello".into()),
        Value::Vector(vec![1.0, 2.0, 3.0]),
        Value::Numeric {
            scaled: 12345,
            scale: 2,
        },
        Value::Date(20_000),
        Value::Timestamp(1_700_000_000_000_000),
    ]);
    // A NULL in the nullable column, zeros, and empty text / vector payloads.
    let empty_and_zero = Row::new(vec![
        Value::Null,
        Value::Int(0),
        Value::BigInt(0),
        Value::Float(0.0),
        Value::Bool(false),
        Value::Text(String::new()),
        Value::Vector(vec![]),
        Value::Numeric {
            scaled: 0,
            scale: 2,
        },
        Value::Date(0),
        Value::Timestamp(0),
    ]);
    // Negative values and a longer text payload.
    let negative = Row::new(vec![
        Value::SmallInt(-1),
        Value::Int(-1),
        Value::BigInt(-1),
        Value::Float(-0.5),
        Value::Bool(true),
        Value::Text("a much longer payload here".into()),
        Value::Vector(vec![0.1, 0.2, 0.3]),
        Value::Numeric {
            scaled: -999_999_999,
            scale: 2,
        },
        Value::Date(-1),
        Value::Timestamp(-1),
    ]);

    let cases = [typical, empty_and_zero, negative];
    for row in &cases {
        let actual = encode_row_body_dense(row, &schema).len();
        let fast = row_body_encoded_len(row, &schema);
        assert_eq!(actual, fast, "row {row:?}");
    }
}
7002
/// `hot_bytes` starts at zero and, after three inserts, equals the summed
/// dense-encoded length of those rows, as does the catalog-wide
/// `hot_tier_bytes`.
#[test]
fn hot_bytes_grows_on_insert_and_matches_encoded_sum() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    assert_eq!(users.hot_bytes(), 0);

    let mut expected: u64 = 0;
    let people = [(1i64, "alice"), (2, "bob"), (3, "carol")];
    for (id, name) in people {
        let row = make_user_row(id, name);
        let encoded_len = encode_row_body_dense(&row, &users.schema).len() as u64;
        expected += encoded_len;
        users.insert(row).unwrap();
    }

    assert_eq!(users.hot_bytes(), expected);
    assert_eq!(catalog.hot_tier_bytes(), expected);
}
7018
/// Deleting the row at position 1 ("bob") must lower `hot_bytes` by exactly
/// that row's dense-encoded length.
#[test]
fn hot_bytes_shrinks_on_delete() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    let people = [(1i64, "alice"), (2, "bob"), (3, "carol")];
    for (id, name) in people {
        users.insert(make_user_row(id, name)).unwrap();
    }

    let bytes_before = users.hot_bytes();
    let bob = make_user_row(2, "bob");
    let bob_len = encode_row_body_dense(&bob, &users.schema).len() as u64;

    let deleted = users.delete_rows(&[1]);
    assert_eq!(deleted, 1);
    assert_eq!(users.hot_bytes(), bytes_before - bob_len);
}
7035
/// Updating a row so that its text cell gets longer must move `hot_bytes` by
/// the difference between the new and old dense-encoded lengths.
#[test]
fn hot_bytes_diffs_on_update_for_variable_width_columns() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    let bytes_after_insert = users.hot_bytes();

    let original = make_user_row(1, "alice");
    let replacement = make_user_row(1, "alice-the-longer-name");
    let old_len = encode_row_body_dense(&original, &users.schema).len() as u64;
    let new_len = encode_row_body_dense(&replacement, &users.schema).len() as u64;

    users.update_row(0, replacement.values).unwrap();

    assert_eq!(users.hot_bytes(), bytes_after_insert - old_len + new_len);
    assert!(
        users.hot_bytes() > bytes_after_insert,
        "longer text grew the counter"
    );
}
7052
/// The hot-tier byte counters of a restored catalog must equal the value the
/// source catalog reported before serialization.
#[test]
fn hot_bytes_round_trips_through_serialize_deserialize() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for i in 0..10 {
        let name = alloc::format!("name-{i}");
        users.insert(make_user_row(i, &name)).unwrap();
    }

    let bytes_before = catalog.hot_tier_bytes();
    let encoded = catalog.serialize();
    let restored = Catalog::deserialize(&encoded).unwrap();

    assert_eq!(restored.hot_tier_bytes(), bytes_before);
    assert_eq!(restored.get("users").unwrap().hot_bytes(), bytes_before);
}
7067
/// Freezing 6 of 10 rows must leave 4 hot rows and one cold segment, lower
/// `hot_bytes` by the reported `bytes_freed`, and keep every primary key
/// resolvable through `lookup_by_pk`.
#[test]
fn freeze_oldest_to_cold_moves_rows_and_keeps_lookups_working() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..10i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let total_bytes_before = users.hot_bytes();

    let report = cat
        .freeze_oldest_to_cold("users", "by_id", 6)
        .expect("freeze succeeds");
    assert_eq!(report.frozen_rows, 6);
    assert_eq!(report.segment_id, 0);
    assert!(report.bytes_freed > 0);
    assert!(!report.segment_bytes.is_empty());

    let users = cat.get("users").unwrap();
    assert_eq!(users.row_count(), 4, "4 hot rows remain (10 - 6 frozen)");
    assert_eq!(cat.cold_segment_count(), 1);
    assert_eq!(
        users.hot_bytes(),
        total_bytes_before - report.bytes_freed,
        "hot_bytes accounting matches FreezeReport"
    );

    // Every id, frozen or not, must still resolve to its original row.
    for id in 0..10i64 {
        let key = IndexKey::Int(id);
        let found = cat
            .lookup_by_pk("users", "by_id", &key)
            .unwrap_or_else(|| panic!("PK {id} disappeared after freeze"));
        assert_eq!(found, make_user_row(id, &alloc::format!("u-{id}")));
    }
}
7113
#[test]
// Runs `freeze_oldest_to_cold` twice (four rows each) over twelve inserted
// users and checks that two cold segments exist, four rows stay hot, and all
// twelve primary keys still resolve to their original rows.
fn freeze_twice_preserves_prior_cold_locators() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        for id in 0..12i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }

    // Same call twice; only the failure message distinguishes the rounds.
    for failure_msg in ["first freeze ok", "second freeze ok"] {
        catalog
            .freeze_oldest_to_cold("users", "by_id", 4)
            .expect(failure_msg);
    }

    let hot_rows = catalog.get("users").unwrap().row_count();
    assert_eq!(hot_rows, 4);
    assert_eq!(catalog.cold_segment_count(), 2);

    for id in 0..12i64 {
        let expected = make_user_row(id, &alloc::format!("u-{id}"));
        match catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(id)) {
            Some(found) => assert_eq!(found, expected),
            None => panic!("PK {id} not resolvable after two freezes"),
        }
    }
}
7145
#[test]
// Feeds `freeze_oldest_to_cold` four bad argument sets and expects
// `StorageError::Corrupt` for each, then confirms the table still has its
// three rows and no cold segment was created.
fn freeze_oldest_to_cold_rejects_invalid_input() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        for id in 0..3i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }

    // A row count of zero.
    let zero_rows = catalog.freeze_oldest_to_cold("users", "by_id", 0);
    assert!(matches!(zero_rows, Err(StorageError::Corrupt(_))));

    // A table name that was never created.
    let unknown_table = catalog.freeze_oldest_to_cold("missing", "by_id", 1);
    assert!(matches!(unknown_table, Err(StorageError::Corrupt(_))));

    // An index name that was never added.
    let unknown_index = catalog.freeze_oldest_to_cold("users", "no_such_index", 1);
    assert!(matches!(unknown_index, Err(StorageError::Corrupt(_))));

    // 999 rows requested while only three were inserted.
    let too_many = catalog.freeze_oldest_to_cold("users", "by_id", 999);
    assert!(matches!(too_many, Err(StorageError::Corrupt(_))));

    // None of the rejected calls may have changed any state.
    let remaining = catalog.get("users").unwrap().row_count();
    assert_eq!(remaining, 3);
    assert_eq!(catalog.cold_segment_count(), 0);
}
7183
#[test]
// Builds a table whose indexed column is `Text`, asks `freeze_oldest_to_cold`
// to freeze through that index, and expects a `StorageError::Corrupt` whose
// message contains "non-integer"; the row and segment counts must not change.
fn freeze_oldest_to_cold_rejects_non_integer_pk() {
    let mut catalog = Catalog::new();
    let columns = vec![
        ColumnSchema::new("name", DataType::Text, false),
        ColumnSchema::new("payload", DataType::BigInt, false),
    ];
    catalog
        .create_table(TableSchema::new("by_name", columns))
        .unwrap();
    {
        let table = catalog.get_mut("by_name").unwrap();
        let only_row = Row::new(vec![Value::Text("a".into()), Value::BigInt(1)]);
        table.insert(only_row).unwrap();
        table.add_index("by_n".into(), "name").unwrap();
    }

    let outcome = catalog.freeze_oldest_to_cold("by_name", "by_n", 1);
    match outcome.expect_err("non-integer PK rejected") {
        StorageError::Corrupt(s) => assert!(
            s.contains("non-integer"),
            "error message names the constraint: {s}"
        ),
        other => panic!("expected Corrupt, got {other:?}"),
    }

    // The failed freeze must leave the single row hot and create no segment.
    let remaining = catalog.get("by_name").unwrap().row_count();
    assert_eq!(remaining, 1);
    assert_eq!(catalog.cold_segment_count(), 0);
}
7215
#[test]
// After freezing three of six rows via the `by_id` index, looks up "u-4" in
// the index on column 1 and expects exactly one locator, of the `Hot` variant,
// carrying position 1.
fn freeze_keeps_remaining_hot_rows_addressable_via_secondary_index() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        for id in 0..6i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
        users.add_index("by_name".into(), "name").unwrap();
    }

    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    let users = catalog.get("users").unwrap();
    let name_index = users.index_on(1).unwrap();
    let hits = name_index.lookup_eq(&IndexKey::Text("u-4".into()));
    assert_eq!(hits.len(), 1);

    let locator = &hits[0];
    assert!(locator.is_hot(), "kept-hot rows still surface as Hot");
    if let RowLocator::Hot(i) = locator {
        assert_eq!(*i, 1);
    } else {
        // `is_hot()` was asserted just above, so a `Cold` locator cannot reach here.
        unreachable!()
    }
}
7250
#[test]
// Freezes four of six rows, promotes PK 2 with `promote_cold_row`, and checks
// the returned hot index, the row count and byte counter growth, that the
// index holds a single `Hot` locator for the key, and that both the promoted
// key and a still-frozen key (PK 0) resolve through `lookup_by_pk`.
fn promote_cold_row_pulls_frozen_row_back_to_hot_tier() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        for id in 0..6i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }
    catalog.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
    let hot_bytes_before = catalog.get("users").unwrap().hot_bytes();

    let promoted_key = IndexKey::Int(2);
    let new_idx = catalog
        .promote_cold_row("users", "by_id", &promoted_key)
        .expect("promote ok")
        .expect("PK 2 was cold");
    assert_eq!(
        new_idx, 2,
        "promoted row appended after the 2 surviving hot rows"
    );

    let users = catalog.get("users").unwrap();
    assert_eq!(users.row_count(), 3, "hot tier grew from 2 to 3");

    // The counter must grow by the dense-encoded size of the promoted row.
    let row = make_user_row(2, "u-2");
    let row_len = encode_row_body_dense(&row, &users.schema).len() as u64;
    assert_eq!(users.hot_bytes(), hot_bytes_before + row_len);

    let id_index = users.index_on(0).unwrap();
    let entries = id_index.lookup_eq(&promoted_key);
    assert_eq!(entries.len(), 1, "exactly one locator per key");
    assert!(entries[0].is_hot(), "promote retired the Cold locator");

    let promoted_row = catalog
        .lookup_by_pk("users", "by_id", &promoted_key)
        .unwrap();
    assert_eq!(promoted_row, row);

    // PK 0 was not promoted; it must still be readable.
    let untouched_row = catalog
        .lookup_by_pk("users", "by_id", &IndexKey::Int(0))
        .unwrap();
    assert_eq!(untouched_row, make_user_row(0, "u-0"));
}
7309
#[test]
// With no freeze performed, `promote_cold_row` must return `Ok(None)` both for
// a key that was inserted (7) and for one that was never inserted (99), and
// must leave the row and segment counts unchanged.
fn promote_cold_row_returns_none_when_key_is_not_cold() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        users.insert(make_user_row(7, "alice")).unwrap();
        users.add_index("by_id".into(), "id").unwrap();
    }

    // Key present in the table, never frozen.
    let hot_only = catalog
        .promote_cold_row("users", "by_id", &IndexKey::Int(7))
        .unwrap();
    assert!(hot_only.is_none());

    // Key that was never inserted.
    let absent = catalog
        .promote_cold_row("users", "by_id", &IndexKey::Int(99))
        .unwrap();
    assert!(absent.is_none());

    let hot_rows = catalog.get("users").unwrap().row_count();
    assert_eq!(hot_rows, 1);
    assert_eq!(catalog.cold_segment_count(), 0);
}
7337
#[test]
// Freezes three of five rows, shadows PK 1 with `shadow_cold_row`, and checks
// that one locator is reported removed, that PK 1 no longer resolves, and that
// PKs 0 and 2 still resolve to their original rows.
fn shadow_cold_row_removes_cold_locators_and_drops_lookup() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        for id in 0..5i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }
    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    let target = IndexKey::Int(1);
    let before = catalog.lookup_by_pk("users", "by_id", &target);
    assert!(before.is_some(), "frozen PK resolves before shadow");

    let removed = catalog.shadow_cold_row("users", "by_id", &target).unwrap();
    assert_eq!(removed, 1, "exactly one cold locator retired");

    let after = catalog.lookup_by_pk("users", "by_id", &target);
    assert!(after.is_none(), "shadowed key no longer resolves");

    // The two other keys probed here must be unaffected by the shadow.
    let row_zero = catalog
        .lookup_by_pk("users", "by_id", &IndexKey::Int(0))
        .unwrap();
    assert_eq!(row_zero, make_user_row(0, "u-0"));
    let row_two = catalog
        .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
        .unwrap();
    assert_eq!(row_two, make_user_row(2, "u-2"));
}
7384
#[test]
// With no freeze performed, `shadow_cold_row` must report zero removed
// locators for an inserted key (1) and for a never-inserted key (999), and the
// single hot row must remain.
fn shadow_cold_row_returns_zero_when_key_is_not_cold() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        users.insert(make_user_row(1, "alice")).unwrap();
        users.add_index("by_id".into(), "id").unwrap();
    }

    let dropped_for_hot = catalog
        .shadow_cold_row("users", "by_id", &IndexKey::Int(1))
        .unwrap();
    assert_eq!(dropped_for_hot, 0, "hot-only key drops no cold locators");

    let dropped_for_absent = catalog
        .shadow_cold_row("users", "by_id", &IndexKey::Int(999))
        .unwrap();
    assert_eq!(dropped_for_absent, 0, "absent key drops no cold locators");

    let hot_rows = catalog.get("users").unwrap().row_count();
    assert_eq!(hot_rows, 1);
}
7410
#[test]
// Both `promote_cold_row` and `shadow_cold_row` must return
// `StorageError::Corrupt` when given an unknown table name or an unknown
// index name.
fn promote_and_shadow_reject_invalid_inputs() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        users.insert(make_user_row(1, "alice")).unwrap();
        users.add_index("by_id".into(), "id").unwrap();
    }
    let key = IndexKey::Int(1);

    // Unknown table.
    let promote_bad_table = catalog.promote_cold_row("missing", "by_id", &key);
    assert!(matches!(promote_bad_table, Err(StorageError::Corrupt(_))));
    let shadow_bad_table = catalog.shadow_cold_row("missing", "by_id", &key);
    assert!(matches!(shadow_bad_table, Err(StorageError::Corrupt(_))));

    // Known table, unknown index.
    let promote_bad_index = catalog.promote_cold_row("users", "no_such_index", &key);
    assert!(matches!(promote_bad_index, Err(StorageError::Corrupt(_))));
    let shadow_bad_index = catalog.shadow_cold_row("users", "no_such_index", &key);
    assert!(matches!(shadow_bad_index, Err(StorageError::Corrupt(_))));
}
7439
#[test]
// Builds two identical catalogs; freezes six rows in one via
// `freeze_oldest_to_cold` and in the other via one `prepare_freeze_slice(0..6)`
// followed by `commit_freeze_slices`. The two reports and every PK lookup must
// agree.
fn commit_freeze_slices_single_slice_matches_freeze_oldest() {
    let mut direct = Catalog::new();
    let mut sliced = Catalog::new();
    for catalog in [&mut direct, &mut sliced] {
        catalog.create_table(bigint_pk_users_schema()).unwrap();
        let users = catalog.get_mut("users").unwrap();
        for id in 0..10i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }

    let single = direct.freeze_oldest_to_cold("users", "by_id", 6).unwrap();

    let prepared = sliced
        .prepare_freeze_slice("users", "by_id", 0..6)
        .expect("prepare");
    let parallel = sliced
        .commit_freeze_slices("users", "by_id", alloc::vec![prepared])
        .expect("commit");

    // Field-by-field comparison of the two reports.
    assert_eq!(single.segment_id, parallel.segment_id);
    assert_eq!(single.frozen_rows, parallel.frozen_rows);
    assert_eq!(single.bytes_freed, parallel.bytes_freed);
    assert_eq!(single.segment_bytes, parallel.segment_bytes);

    for id in 0..10i64 {
        let key = IndexKey::Int(id);
        let from_direct = direct.lookup_by_pk("users", "by_id", &key);
        let from_sliced = sliced.lookup_by_pk("users", "by_id", &key);
        assert_eq!(
            from_direct, from_sliced,
            "PK {id} differs after single vs slice freeze"
        );
    }
}
7479
#[test]
// Builds two identical catalogs from rows inserted in a shuffled PK order.
// One commits a single prepared slice `0..8`; the other commits two prepared
// slices `0..4` and `4..8`. Segment bytes, frozen row counts, and every PK
// lookup must agree between the two.
fn commit_freeze_slices_two_slices_match_single_slice() {
    let mut whole = Catalog::new();
    let mut split = Catalog::new();
    for catalog in [&mut whole, &mut split] {
        catalog.create_table(bigint_pk_users_schema()).unwrap();
        let users = catalog.get_mut("users").unwrap();
        // Insertion order is deliberately not sorted by PK.
        for id in [3i64, 7, 1, 9, 5, 0, 8, 4, 2, 6] {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }

    let single = whole
        .prepare_freeze_slice("users", "by_id", 0..8)
        .expect("prepare");
    let one = whole
        .commit_freeze_slices("users", "by_id", alloc::vec![single])
        .expect("commit one");

    let s1 = split
        .prepare_freeze_slice("users", "by_id", 0..4)
        .expect("prepare s1");
    let s2 = split
        .prepare_freeze_slice("users", "by_id", 4..8)
        .expect("prepare s2");
    let two = split
        .commit_freeze_slices("users", "by_id", alloc::vec![s1, s2])
        .expect("commit two");

    assert_eq!(one.segment_bytes, two.segment_bytes);
    assert_eq!(one.frozen_rows, two.frozen_rows);

    for id in 0..10i64 {
        let key = IndexKey::Int(id);
        let from_whole = whole.lookup_by_pk("users", "by_id", &key);
        let from_split = split.lookup_by_pk("users", "by_id", &key);
        assert_eq!(
            from_whole, from_split,
            "PK {id} differs after one-slice vs two-slice freeze"
        );
    }
}
7526
#[test]
// Prepares slices `0..2` and `3..5` (position 2 is covered by neither) and
// expects `commit_freeze_slices` to return `StorageError::Corrupt` without
// creating a segment or removing any hot row.
fn commit_freeze_slices_rejects_gap() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        for id in 0..6i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }

    let lower = catalog.prepare_freeze_slice("users", "by_id", 0..2).unwrap();
    let upper = catalog.prepare_freeze_slice("users", "by_id", 3..5).unwrap();
    let outcome = catalog.commit_freeze_slices("users", "by_id", alloc::vec![lower, upper]);
    assert!(matches!(outcome, Err(StorageError::Corrupt(_))));

    // The rejected commit must not have changed any state.
    assert_eq!(catalog.cold_segment_count(), 0);
    let hot_rows = catalog.get("users").unwrap().row_count();
    assert_eq!(hot_rows, 6);
}
7548
#[test]
// Committing an empty slice list must succeed, report zero frozen rows, and
// leave both the cold segment count and the hot row count unchanged.
fn commit_freeze_slices_empty_is_noop() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        for id in 0..3i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }

    let no_slices = Vec::new();
    let report = catalog
        .commit_freeze_slices("users", "by_id", no_slices)
        .unwrap();

    assert_eq!(report.frozen_rows, 0);
    assert_eq!(catalog.cold_segment_count(), 0);
    let hot_rows = catalog.get("users").unwrap().row_count();
    assert_eq!(hot_rows, 3);
}
7567
#[test]
// Creates two cold segments of three rows each, then calls
// `compact_cold_segments` with a target one byte above the larger segment's
// size. Checks the compaction report, the segment/slot counts afterwards, and
// that all eight primary keys still resolve to their original rows.
fn compact_merges_small_segments_storage_unit() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        for id in 0..8i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }
    for _ in 0..2 {
        catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    }
    assert_eq!(catalog.cold_segment_count(), 2);
    assert_eq!(catalog.cold_segment_slot_count(), 2);

    // Size the target off the largest existing segment, plus one byte.
    let segment_ids = catalog.cold_segment_ids_global();
    let max_seg_bytes = segment_ids
        .iter()
        .map(|&seg| catalog.cold_segment(seg).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let target = max_seg_bytes + 1;

    let report = catalog
        .compact_cold_segments("users", "by_id", target)
        .expect("compact succeeds");
    assert_eq!(report.sources.len(), 2);
    let merged_id = report.merged_segment_id.expect("merge happened");
    assert_eq!(report.merged_rows, 6);
    assert_eq!(report.deleted_rows_pruned, 0);
    assert!(!report.merged_segment_bytes.is_empty());

    // One segment remains, three slots are counted, and the only id listed is
    // the merged one.
    assert_eq!(catalog.cold_segment_count(), 1);
    assert_eq!(catalog.cold_segment_slot_count(), 3);
    assert_eq!(catalog.cold_segment_ids_global(), alloc::vec![merged_id]);

    for id in 0..8i64 {
        let expected = make_user_row(id, &alloc::format!("u-{id}"));
        match catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(id)) {
            Some(found) => assert_eq!(found, expected),
            None => panic!("PK {id} lost after compaction"),
        }
    }
}
7624
#[test]
// Freezes six rows into two cold segments, shadows PKs 1 and 4, then compacts.
// The report must count four merged rows and two pruned rows; the shadowed
// keys must stay unresolvable, and the four live keys must resolve to their
// original row contents (not merely be present).
fn compact_drops_shadowed_cold_rows() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let t = cat.get_mut("users").unwrap();
    for id in 0..6i64 {
        t.insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    t.add_index("by_id".into(), "id").unwrap();
    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    // Each shadow call must report exactly one retired locator.
    assert_eq!(
        cat.shadow_cold_row("users", "by_id", &IndexKey::Int(1))
            .unwrap(),
        1
    );
    assert_eq!(
        cat.shadow_cold_row("users", "by_id", &IndexKey::Int(4))
            .unwrap(),
        1
    );

    // Target is one byte above the largest existing segment.
    let max_seg_bytes = cat
        .cold_segment_ids_global()
        .iter()
        .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let report = cat
        .compact_cold_segments("users", "by_id", max_seg_bytes + 1)
        .expect("compact succeeds");
    assert_eq!(report.sources.len(), 2);
    assert_eq!(report.merged_rows, 4, "6 frozen − 2 shadowed = 4 live");
    assert_eq!(report.deleted_rows_pruned, 2);

    for shadowed in [1i64, 4i64] {
        assert!(
            cat.lookup_by_pk("users", "by_id", &IndexKey::Int(shadowed))
                .is_none(),
            "shadowed PK {shadowed} must remain invisible after compact"
        );
    }
    for live in [0i64, 2, 3, 5] {
        // Compare the full row, matching the other compaction tests, so a
        // merge that keeps the key but corrupts the payload is caught.
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(live))
            .unwrap_or_else(|| panic!("live PK {live} lost after compact"));
        assert_eq!(got, make_user_row(live, &alloc::format!("u-{live}")));
    }
}
7679
#[test]
// `compact_cold_segments` must report no merge in three situations: no cold
// segments at all, a single cold segment with a large target, and a single
// cold segment with a target of 1.
fn compact_is_noop_below_two_candidates() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        for id in 0..6i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }

    // Nothing has been frozen yet.
    let with_no_segments = catalog
        .compact_cold_segments("users", "by_id", 1 << 30)
        .expect("noop ok");
    assert!(with_no_segments.merged_segment_id.is_none());
    assert!(with_no_segments.sources.is_empty());

    // Exactly one cold segment, large target.
    catalog.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
    let with_one_segment = catalog
        .compact_cold_segments("users", "by_id", 1 << 30)
        .expect("noop ok");
    assert!(with_one_segment.merged_segment_id.is_none());
    assert_eq!(catalog.cold_segment_count(), 1);

    // Same single segment, target of 1.
    let with_tiny_target = catalog
        .compact_cold_segments("users", "by_id", 1)
        .expect("noop ok");
    assert!(with_tiny_target.merged_segment_id.is_none());
    assert_eq!(catalog.cold_segment_count(), 1);
}
7715
#[test]
// Compacts two cold segments, serializes the catalog, deserializes it into a
// fresh instance, re-attaches the merged segment bytes at the merged id with
// `load_segment_bytes_at`, and checks that all six primary keys resolve and
// that exactly one cold segment is counted.
fn compact_swap_survives_catalog_roundtrip_via_load_at() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        for id in 0..6i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }
    for _ in 0..2 {
        catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    }

    // Target is one byte above the largest existing segment.
    let segment_ids = catalog.cold_segment_ids_global();
    let max_seg_bytes = segment_ids
        .iter()
        .map(|&seg| catalog.cold_segment(seg).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let report = catalog
        .compact_cold_segments("users", "by_id", max_seg_bytes + 1)
        .expect("compact ok");
    let merged_id = report.merged_segment_id.unwrap();

    let merged_bytes = report.merged_segment_bytes.clone();
    let cat_bytes = catalog.serialize();

    // The deserialized catalog is handed the merged segment's bytes explicitly.
    let mut restored = Catalog::deserialize(&cat_bytes).expect("deserialize ok");
    restored
        .load_segment_bytes_at(merged_id, merged_bytes)
        .expect("reload merged ok");

    for id in 0..6i64 {
        let expected = make_user_row(id, &alloc::format!("u-{id}"));
        match restored.lookup_by_pk("users", "by_id", &IndexKey::Int(id)) {
            Some(found) => assert_eq!(found, expected),
            None => panic!("PK {id} lost across roundtrip"),
        }
    }
    assert_eq!(restored.cold_segment_count(), 1);
}
7769
#[test]
// Loads segment bytes at id 5 while only id 0 exists and expects the slot
// count to become 6 with two segments counted. Loading again at id 5, or at
// the already-used id 0, must return `StorageError::Corrupt`.
fn load_segment_bytes_at_pads_and_rejects_collision() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        for id in 0..4i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }
    let report = catalog.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
    let bytes_seg0 = report.segment_bytes.clone();

    // Id 5 is past the current end; the slot count is expected to jump to 6.
    catalog
        .load_segment_bytes_at(5, bytes_seg0.clone())
        .expect("pad + load ok");
    assert_eq!(catalog.cold_segment_slot_count(), 6);
    assert_eq!(catalog.cold_segment_count(), 2);

    // Id 5 is now taken.
    let reload_same_slot = catalog.load_segment_bytes_at(5, bytes_seg0.clone());
    assert!(matches!(reload_same_slot, Err(StorageError::Corrupt(_))));

    // Id 0 was taken by the freeze above.
    let reload_first_slot = catalog.load_segment_bytes_at(0, bytes_seg0);
    assert!(matches!(reload_first_slot, Err(StorageError::Corrupt(_))));
}
7804
#[test]
// Freezes two of four rows, promotes PK 0, and checks that the index on
// column 0 holds exactly one locator for that key and that it is hot. PKs 2
// and 3 must still resolve to their original rows.
// NOTE(review): despite the name, the visible body never issues a second
// freeze after the promote — confirm whether that step was intended.
fn promote_then_refreeze_does_not_leave_orphan_locators() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        for id in 0..4i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }

    catalog.freeze_oldest_to_cold("users", "by_id", 2).unwrap();

    let promoted_key = IndexKey::Int(0);
    let promoted = catalog
        .promote_cold_row("users", "by_id", &promoted_key)
        .unwrap();
    assert!(promoted.is_some());

    // Copy the locators out so the index borrow does not outlive this statement.
    let users = catalog.get("users").unwrap();
    let id_index = users.index_on(0).unwrap();
    let entries_after_promote = id_index.lookup_eq(&promoted_key).to_vec();
    assert_eq!(entries_after_promote.len(), 1);
    assert!(entries_after_promote[0].is_hot());

    for id in [2i64, 3] {
        let found = catalog
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap();
        assert_eq!(found, make_user_row(id, &alloc::format!("u-{id}")));
    }
}
7849}