1#![no_std]
7#![cfg_attr(target_arch = "aarch64", allow(unsafe_code))]
11
12extern crate alloc;
13
14pub mod bloom;
15pub mod halfvec;
16pub mod persistent;
17pub mod persistent_btree;
18pub mod quantize;
19pub mod row_locator;
20pub mod segment;
21
22pub use self::bloom::{BloomError, BloomFilter};
23pub use self::row_locator::{RowLocator, RowLocatorError};
24pub use self::segment::{
25 BRIN_SIDECAR_MAGIC, BrinSummary, OwnedSegment, SEGMENT_COMPRESS_ALGO_LZSS,
26 SEGMENT_COMPRESS_ALGO_NONE, SEGMENT_MAGIC, SEGMENT_MAGIC_V2, SEGMENT_PAGE_BYTES, SegmentError,
27 SegmentMeta, SegmentReader, derive_brin_summaries, encode_segment, wrap_v2_envelope,
28 wrap_v2_envelope_with_brin,
29};
30
31use alloc::collections::{BTreeMap, BTreeSet};
32use alloc::format;
33use alloc::string::String;
34use alloc::sync::Arc;
35use alloc::vec::Vec;
36use core::fmt;
37
38use self::persistent::PersistentVec;
39use self::persistent_btree::PersistentBTreeMap;
40
/// Element encoding of a vector column.
///
/// `F32` is the `Default` variant. The `Display` form is the keyword used
/// after `USING` when a [`DataType::Vector`] is rendered (`SQ8`, `HALF`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum VecEncoding {
    /// Displayed as `F32`.
    #[default]
    F32,
    /// Displayed as `SQ8`.
    Sq8,
    /// Displayed as `HALF`.
    F16,
}

impl fmt::Display for VecEncoding {
    /// Writes the upper-case keyword for this encoding.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match *self {
            Self::F32 => "F32",
            Self::Sq8 => "SQ8",
            Self::F16 => "HALF",
        };
        f.write_str(keyword)
    }
}
69
/// Declared type of a column.
///
/// The `Display` impl renders each variant as an SQL-style type name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataType {
    SmallInt,
    Int,
    BigInt,
    Float,
    Text,
    /// Displayed as `VARCHAR(n)`; the payload is presumably the length limit — confirm.
    Varchar(u32),
    /// Displayed as `CHAR(n)`; the payload is presumably the fixed length — confirm.
    Char(u32),
    Bool,
    /// Vector column with `dim` elements stored using `encoding`.
    Vector {
        dim: u32,
        encoding: VecEncoding,
    },
    /// Displayed as `NUMERIC(precision)` when `scale == 0`, otherwise
    /// `NUMERIC(precision, scale)`.
    Numeric {
        precision: u8,
        scale: u8,
    },
    Date,
    Timestamp,
    Timestamptz,
    Interval,
    Json,
    Jsonb,
}
139
140impl fmt::Display for DataType {
141 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
142 match self {
143 Self::SmallInt => f.write_str("SMALLINT"),
144 Self::Int => f.write_str("INT"),
145 Self::BigInt => f.write_str("BIGINT"),
146 Self::Float => f.write_str("FLOAT"),
147 Self::Text => f.write_str("TEXT"),
148 Self::Varchar(n) => write!(f, "VARCHAR({n})"),
149 Self::Char(n) => write!(f, "CHAR({n})"),
150 Self::Bool => f.write_str("BOOL"),
151 Self::Vector { dim, encoding } => match encoding {
152 VecEncoding::F32 => write!(f, "VECTOR({dim})"),
153 VecEncoding::Sq8 => write!(f, "VECTOR({dim}) USING SQ8"),
154 VecEncoding::F16 => write!(f, "VECTOR({dim}) USING HALF"),
155 },
156 Self::Numeric { precision, scale } => {
157 if *scale == 0 {
158 write!(f, "NUMERIC({precision})")
159 } else {
160 write!(f, "NUMERIC({precision}, {scale})")
161 }
162 }
163 Self::Date => f.write_str("DATE"),
164 Self::Timestamp => f.write_str("TIMESTAMP"),
165 Self::Timestamptz => f.write_str("TIMESTAMPTZ"),
166 Self::Interval => f.write_str("INTERVAL"),
167 Self::Json => f.write_str("JSON"),
168 Self::Jsonb => f.write_str("JSONB"),
169 }
170 }
171}
172
/// A single cell value.
///
/// `Value::data_type` maps each non-null variant to the [`DataType`] it
/// reports; `Null` has no type.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum Value {
    SmallInt(i16),
    Int(i32),
    BigInt(i64),
    Float(f64),
    Text(String),
    Bool(bool),
    /// Vector stored as `f32` elements (reported with `VecEncoding::F32`).
    Vector(Vec<f32>),
    /// Quantized vector (reported with `VecEncoding::Sq8`).
    Sq8Vector(crate::quantize::Sq8Vector),
    /// Half-precision vector (reported with `VecEncoding::F16`).
    HalfVector(crate::halfvec::HalfVector),
    /// Numeric value; presumably `scaled / 10^scale` — confirm with the codec.
    Numeric {
        scaled: i128,
        scale: u8,
    },
    // NOTE(review): unit/epoch of the Date and Timestamp payloads is not visible here.
    Date(i32),
    Timestamp(i64),
    Interval {
        months: i32,
        micros: i64,
    },
    /// JSON text (reported as `DataType::Json`).
    Json(String),
    Null,
}
222
223impl Value {
224 pub fn data_type(&self) -> Option<DataType> {
226 match self {
227 Self::SmallInt(_) => Some(DataType::SmallInt),
228 Self::Int(_) => Some(DataType::Int),
229 Self::BigInt(_) => Some(DataType::BigInt),
230 Self::Float(_) => Some(DataType::Float),
231 Self::Text(_) => Some(DataType::Text),
234 Self::Bool(_) => Some(DataType::Bool),
235 Self::Vector(v) => Some(DataType::Vector {
236 dim: u32::try_from(v.len()).expect("vector dim ≤ u32"),
237 encoding: VecEncoding::F32,
238 }),
239 Self::Sq8Vector(q) => Some(DataType::Vector {
240 dim: u32::try_from(q.bytes.len()).expect("vector dim ≤ u32"),
241 encoding: VecEncoding::Sq8,
242 }),
243 Self::HalfVector(h) => Some(DataType::Vector {
244 dim: u32::try_from(h.dim()).expect("vector dim ≤ u32"),
245 encoding: VecEncoding::F16,
246 }),
247 Self::Numeric { scale, .. } => Some(DataType::Numeric {
252 precision: 0,
253 scale: *scale,
254 }),
255 Self::Date(_) => Some(DataType::Date),
256 Self::Timestamp(_) => Some(DataType::Timestamp),
257 Self::Interval { .. } => Some(DataType::Interval),
258 Self::Json(_) => Some(DataType::Json),
259 Self::Null => None,
260 }
261 }
262
263 pub const fn is_null(&self) -> bool {
264 matches!(self, Self::Null)
265 }
266}
267
/// One table row: an ordered list of cell values.
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    /// Cell values; `Table::insert` requires one per schema column.
    pub values: Vec<Value>,
}
274
impl Row {
    /// Wraps `values` in a row without validating them.
    pub const fn new(values: Vec<Value>) -> Self {
        Self { values }
    }

    /// Number of cells in the row.
    pub fn len(&self) -> usize {
        self.values.len()
    }

    /// Returns `true` when the row has no cells.
    pub fn is_empty(&self) -> bool {
        self.values.is_empty()
    }
}
288
/// Definition of a single column.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnSchema {
    /// Column name, matched exactly by `TableSchema::column_position`.
    pub name: String,
    /// Declared type checked by `Table::insert` / `Table::update_row`.
    pub ty: DataType,
    /// When `false`, `Value::Null` is rejected with `NullInNotNull`.
    pub nullable: bool,
    // NOTE(review): `default` and `auto_increment` are not consumed in the
    // visible part of this file — presumably applied by a higher layer.
    pub default: Option<Value>,
    pub auto_increment: bool,
}
303
/// Definition of a table: its name, columns and constraints.
#[derive(Debug, Clone, PartialEq)]
pub struct TableSchema {
    pub name: String,
    /// Columns in positional order; rows must match this arity.
    pub columns: Vec<ColumnSchema>,
    // NOTE(review): presumably a byte budget for the hot tier — not read in
    // the visible part of this file; confirm against the freeze logic.
    pub hot_tier_bytes: Option<u64>,
    /// Foreign-key constraints declared on this table.
    pub foreign_keys: Vec<ForeignKeyConstraint>,
}
322
/// A foreign-key constraint from this table to `parent_table`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForeignKeyConstraint {
    /// Optional constraint name.
    pub name: Option<String>,
    /// Presumably positions of the referencing columns in this table — confirm.
    pub local_columns: Vec<usize>,
    /// Name of the referenced table.
    pub parent_table: String,
    /// Presumably positions of the referenced columns in the parent — confirm.
    pub parent_columns: Vec<usize>,
    /// Action associated with deletes of the referenced row.
    pub on_delete: FkAction,
    /// Action associated with updates of the referenced row.
    pub on_update: FkAction,
}
349
/// Referential action attached to a foreign key.
///
/// Each variant has a stable one-byte tag (see [`FkAction::tag`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FkAction {
    Restrict,
    Cascade,
    SetNull,
    SetDefault,
    NoAction,
}

impl FkAction {
    /// Variants indexed by their tag; slot `i` holds the action whose tag is `i`.
    const BY_TAG: [Self; 5] = [
        Self::Restrict,
        Self::Cascade,
        Self::SetNull,
        Self::SetDefault,
        Self::NoAction,
    ];

    /// Returns the one-byte tag of this action (`0..=4`).
    pub const fn tag(self) -> u8 {
        match self {
            Self::Restrict => 0,
            Self::Cascade => 1,
            Self::SetNull => 2,
            Self::SetDefault => 3,
            Self::NoAction => 4,
        }
    }

    /// Inverse of [`FkAction::tag`]; returns `None` for an unknown tag.
    pub const fn from_tag(b: u8) -> Option<Self> {
        let slot = b as usize;
        if slot < Self::BY_TAG.len() {
            Some(Self::BY_TAG[slot])
        } else {
            None
        }
    }
}
382
383impl TableSchema {
384 pub fn column_position(&self, name: &str) -> Option<usize> {
385 self.columns.iter().position(|c| c.name == name)
386 }
387}
388
/// Ordered key stored in a BTree index.
///
/// Built from a cell by `IndexKey::from_value`; integer-like values of any
/// width share the `Int` variant.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum IndexKey {
    Int(i64),
    Text(String),
    Bool(bool),
}
399
400impl IndexKey {
401 pub fn from_value(v: &Value) -> Option<Self> {
402 match v {
403 Value::SmallInt(n) => Some(Self::Int(i64::from(*n))),
404 Value::Int(n) => Some(Self::Int(i64::from(*n))),
405 Value::BigInt(n) => Some(Self::Int(*n)),
406 Value::Text(s) => Some(Self::Text(s.clone())),
407 Value::Bool(b) => Some(Self::Bool(*b)),
408 Value::Date(d) => Some(Self::Int(i64::from(*d))),
411 Value::Timestamp(t) => Some(Self::Int(*t)),
412 Value::Null
417 | Value::Float(_)
418 | Value::Vector(_)
419 | Value::Sq8Vector(_)
420 | Value::HalfVector(_)
421 | Value::Numeric { .. }
422 | Value::Interval { .. }
423 | Value::Json(_) => None,
424 }
425 }
426}
427
/// A secondary index over one column of a table.
#[derive(Debug, Clone)]
pub struct Index {
    /// Index name, used for lookup and duplicate detection by `Table`.
    pub name: String,
    /// Position of the indexed column in the table schema.
    pub column_position: usize,
    /// Backing structure (BTree, NSW graph, or BRIN marker).
    pub kind: IndexKind,
    // The three fields below start empty / `None` in every constructor in
    // this file; presumably filled in by the layer that parses CREATE INDEX.
    pub included_columns: Vec<usize>,
    pub partial_predicate: Option<String>,
    pub expression: Option<String>,
}
460
/// Default NSW `m` value — presumably what callers pass as `m` to
/// `Table::add_nsw_index` when none is specified (not referenced in the
/// visible part of this file; confirm).
pub const NSW_DEFAULT_M: usize = 16;
464
/// Result of a freeze operation.
// NOTE(review): the producer is outside this view; field meanings below are
// taken from their names and types only.
#[derive(Debug, Clone)]
pub struct FreezeReport {
    pub segment_id: u32,
    /// Presumably the number of rows moved into the segment — confirm.
    pub frozen_rows: usize,
    pub bytes_freed: u64,
    /// Presumably the encoded segment image — confirm.
    pub segment_bytes: Vec<u8>,
}
490
/// A slice of rows selected for freezing.
// NOTE(review): the producer is outside this view; the tuple layout of
// `rows` (`u64`, bytes, key) is presumably (row id, encoded body, index key)
// — confirm.
#[derive(Debug, Clone)]
pub struct FreezeSlice {
    pub row_range: core::ops::Range<usize>,
    pub rows: Vec<(u64, Vec<u8>, IndexKey)>,
}
513
/// Result of a compaction operation.
// NOTE(review): the producer is outside this view; field meanings are taken
// from their names and types only.
#[derive(Debug, Clone)]
pub struct CompactReport {
    /// Presumably the ids of the segments that were merged — confirm.
    pub sources: Vec<u32>,
    pub merged_segment_id: Option<u32>,
    pub merged_segment_bytes: Vec<u8>,
    pub merged_rows: usize,
    pub deleted_rows_pruned: usize,
    pub bytes_reclaimed_estimate: u64,
}
549
/// Backing structure of an [`Index`].
#[derive(Debug, Clone)]
pub enum IndexKind {
    /// Ordered map from key to every locator (hot or cold) holding that key.
    BTree(PersistentBTreeMap<IndexKey, Vec<RowLocator>>),
    /// Layered neighbour graph over a vector column.
    Nsw(NswGraph),
    /// BRIN marker; keeps no in-memory structure in this file, only the
    /// type of the indexed column.
    Brin {
        column_type: DataType,
    },
}
584
/// Layered neighbour graph used by NSW indices.
#[derive(Debug, Clone)]
pub struct NswGraph {
    /// Neighbour cap for layers above 0 (see `cap_for_layer`).
    pub m: usize,
    /// Neighbour cap for layer 0; `new` sets it to `2 * m` (saturating).
    pub m_max_0: usize,
    /// Row index of the current entry point, `None` while the graph is empty.
    pub entry: Option<usize>,
    /// Level of the entry point.
    pub entry_level: u8,
    /// Per-row assigned level, indexed by row index.
    pub levels: PersistentVec<u8>,
    /// `layers[l][row]` is the neighbour list of `row` on layer `l`.
    pub layers: Vec<PersistentVec<Vec<u32>>>,
}
629
630impl NswGraph {
631 fn new(m: usize) -> Self {
632 Self {
633 m,
634 m_max_0: m.saturating_mul(2),
635 entry: None,
636 entry_level: 0,
637 levels: PersistentVec::new(),
638 layers: alloc::vec![PersistentVec::new()],
639 }
640 }
641
642 pub const fn cap_for_layer(&self, layer: u8) -> usize {
644 if layer == 0 { self.m_max_0 } else { self.m }
645 }
646}
647
/// Deterministically derives a graph level in `0..=7` from a row index.
///
/// The index is scrambled with a multiply/xor-shift mixer and the level is
/// the number of consecutive all-zero low nibbles of the result, capped at
/// 7. A row index of 0 mixes to 0 and therefore always gets level 7.
pub fn nsw_assign_level(row_idx: usize) -> u8 {
    const MAX_LEVEL: u8 = 7;
    const BITS_PER_LEVEL: u32 = 4;

    let seeded = (row_idx as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
    let round_a = (seeded ^ (seeded >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    let round_b = (round_a ^ (round_a >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    let mixed = round_b ^ (round_b >> 31);

    // Every four trailing zero bits is one zero nibble, i.e. one level.
    // `trailing_zeros` is 64 for an all-zero word, which the cap absorbs.
    let zero_nibbles = mixed.trailing_zeros() / BITS_PER_LEVEL;
    u8::try_from(zero_nibbles).map_or(MAX_LEVEL, |lvl| lvl.min(MAX_LEVEL))
}
674
675impl Index {
676 fn new_btree(name: String, column_position: usize) -> Self {
677 Self {
678 name,
679 column_position,
680 kind: IndexKind::BTree(PersistentBTreeMap::new()),
681 included_columns: Vec::new(),
682 partial_predicate: None,
683 expression: None,
684 }
685 }
686
687 fn new_nsw(name: String, column_position: usize, m: usize) -> Self {
688 Self {
689 name,
690 column_position,
691 kind: IndexKind::Nsw(NswGraph::new(m)),
692 included_columns: Vec::new(),
693 partial_predicate: None,
694 expression: None,
695 }
696 }
697
698 fn new_brin(name: String, column_position: usize, column_type: DataType) -> Self {
702 Self {
703 name,
704 column_position,
705 kind: IndexKind::Brin { column_type },
706 included_columns: Vec::new(),
707 partial_predicate: None,
708 expression: None,
709 }
710 }
711
712 pub fn lookup_eq(&self, key: &IndexKey) -> &[RowLocator] {
721 match &self.kind {
722 IndexKind::BTree(m) => m.get(key).map_or(&[][..], Vec::as_slice),
723 IndexKind::Nsw(_) | IndexKind::Brin { .. } => &[][..],
725 }
726 }
727
728 pub const fn nsw(&self) -> Option<&NswGraph> {
731 match &self.kind {
732 IndexKind::Nsw(g) => Some(g),
733 IndexKind::BTree(_) | IndexKind::Brin { .. } => None,
734 }
735 }
736
737 pub const fn is_brin(&self) -> bool {
742 matches!(self.kind, IndexKind::Brin { .. })
743 }
744}
745
/// An in-memory table: schema, hot rows and their indices.
#[derive(Debug, Clone)]
pub struct Table {
    schema: TableSchema,
    /// Hot (in-memory) rows; `RowLocator::Hot(i)` refers to position `i` here.
    rows: PersistentVec<Row>,
    indices: Vec<Index>,
    /// Running total of `row_body_encoded_len` over the hot rows, maintained
    /// by insert / update / delete with saturating arithmetic.
    hot_bytes: u64,
    /// Cold-row count as last supplied through `set_cold_row_count`.
    cold_row_count: u64,
    /// Set by `mark_cold_row_count_stale`, cleared by `set_cold_row_count`.
    cold_row_count_stale: bool,
}
787
788impl Table {
    /// Creates an empty table for `schema`: no rows, no indices, all
    /// counters zero and the cold-row count not stale.
    pub fn new(schema: TableSchema) -> Self {
        Self {
            schema,
            rows: PersistentVec::new(),
            indices: Vec::new(),
            hot_bytes: 0,
            cold_row_count: 0,
            cold_row_count_stale: false,
        }
    }
799
    /// Current value of the hot-row byte counter.
    #[must_use]
    pub const fn hot_bytes(&self) -> u64 {
        self.hot_bytes
    }
807
    /// Cold-row count as last stored; see `cold_row_count_stale` for
    /// whether it has been flagged stale since.
    #[must_use]
    pub const fn cold_row_count(&self) -> u64 {
        self.cold_row_count
    }
814
    /// Stores `n` as the cold-row count and clears the stale flag.
    pub fn set_cold_row_count(&mut self, n: u64) {
        self.cold_row_count = n;
        self.cold_row_count_stale = false;
    }
821
    /// Flags the stored cold-row count as stale without changing it.
    pub fn mark_cold_row_count_stale(&mut self) {
        self.cold_row_count_stale = true;
    }
829
    /// Returns `true` if the cold-row count has been marked stale since it
    /// was last set.
    #[must_use]
    pub const fn cold_row_count_stale(&self) -> bool {
        self.cold_row_count_stale
    }
837
838 #[must_use]
849 pub fn count_cold_locators(&self) -> u64 {
850 let mut best: u64 = 0;
851 for idx in &self.indices {
852 if let IndexKind::BTree(map) = &idx.kind {
853 let n: u64 = map
854 .iter()
855 .map(|(_, locs)| locs.iter().filter(|l| l.is_cold()).count() as u64)
856 .sum();
857 if n > best {
858 best = n;
859 }
860 }
861 }
862 best
863 }
864
    /// Borrows the table schema.
    pub const fn schema(&self) -> &TableSchema {
        &self.schema
    }
868
    /// Mutably borrows the table schema. Existing rows and indices are not
    /// revalidated against changes made through this reference.
    pub const fn schema_mut(&mut self) -> &mut TableSchema {
        &mut self.schema
    }
875
    /// Borrows the hot rows.
    pub const fn rows(&self) -> &PersistentVec<Row> {
        &self.rows
    }
882
    /// Number of hot rows (cold rows are not included).
    pub const fn row_count(&self) -> usize {
        self.rows.len()
    }
886
    /// Mutably borrows the indices as a slice, so callers can edit existing
    /// indices but cannot add or remove any.
    pub fn indices_mut(&mut self) -> &mut [Index] {
        &mut self.indices
    }
894
    /// Borrows the indices in their current order.
    pub fn indices(&self) -> &[Index] {
        &self.indices
    }
898
899 pub fn next_auto_value(&self, col_pos: usize) -> Option<i64> {
905 let ty = self.schema.columns.get(col_pos)?.ty;
906 if !matches!(ty, DataType::SmallInt | DataType::Int | DataType::BigInt) {
907 return None;
908 }
909 let mut max: Option<i64> = None;
910 for row in &self.rows {
911 match row.values.get(col_pos) {
912 Some(Value::SmallInt(n)) => {
913 let v = i64::from(*n);
914 max = Some(max.map_or(v, |m| m.max(v)));
915 }
916 Some(Value::Int(n)) => {
917 let v = i64::from(*n);
918 max = Some(max.map_or(v, |m| m.max(v)));
919 }
920 Some(Value::BigInt(n)) => {
921 max = Some(max.map_or(*n, |m| m.max(*n)));
922 }
923 _ => {}
924 }
925 }
926 Some(max.map_or(1, |m| m + 1))
927 }
928
929 pub fn index_on(&self, column_position: usize) -> Option<&Index> {
933 self.indices
940 .iter()
941 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::BTree(_)))
942 .or_else(|| {
943 self.indices
944 .iter()
945 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_)))
946 })
947 }
948
949 pub fn insert(&mut self, row: Row) -> Result<(), StorageError> {
953 if row.len() != self.schema.columns.len() {
954 return Err(StorageError::ArityMismatch {
955 expected: self.schema.columns.len(),
956 actual: row.len(),
957 });
958 }
959 for (i, (val, col)) in row.values.iter().zip(&self.schema.columns).enumerate() {
960 if val.is_null() {
961 if !col.nullable {
962 return Err(StorageError::NullInNotNull {
963 column: col.name.clone(),
964 });
965 }
966 continue;
967 }
968 let actual = val.data_type().expect("non-null");
969 let compatible = actual == col.ty
983 || matches!(
984 (actual, col.ty),
985 (
986 DataType::Text,
987 DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
988 ) | (DataType::Json | DataType::Jsonb, DataType::Text)
989 | (DataType::Json, DataType::Jsonb) | (DataType::Jsonb, DataType::Json)
990 | (DataType::Timestamp, DataType::Timestamptz)
991 | (DataType::Timestamptz, DataType::Timestamp)
992 )
993 || matches!(
994 (actual, col.ty),
995 (
996 DataType::Numeric { scale: a, .. },
997 DataType::Numeric { scale: b, .. },
998 ) if a == b
999 );
1000 if !compatible {
1001 return Err(StorageError::TypeMismatch {
1002 column: col.name.clone(),
1003 expected: col.ty,
1004 actual,
1005 position: i,
1006 });
1007 }
1008 }
1009 let new_row_idx = self.rows.len();
1010 for idx in &mut self.indices {
1014 if let IndexKind::BTree(map) = &mut idx.kind
1015 && let Some(key) = IndexKey::from_value(&row.values[idx.column_position])
1016 {
1017 let mut entries = map.get(&key).cloned().unwrap_or_default();
1023 entries.push(RowLocator::Hot(new_row_idx));
1024 map.insert_mut(key, entries);
1025 }
1026 }
1027 self.hot_bytes = self
1030 .hot_bytes
1031 .saturating_add(row_body_encoded_len(&row, &self.schema) as u64);
1032 self.rows.push_mut(row);
1037 let new_row_idx = self.rows.len() - 1;
1040 let nsw_targets: Vec<usize> = self
1041 .indices
1042 .iter()
1043 .enumerate()
1044 .filter_map(|(i, idx)| {
1045 if matches!(idx.kind, IndexKind::Nsw(_)) {
1046 Some(i)
1047 } else {
1048 None
1049 }
1050 })
1051 .collect();
1052 for idx_pos in nsw_targets {
1053 nsw_insert_at(self, idx_pos, new_row_idx);
1054 }
1055 Ok(())
1056 }
1057
1058 pub fn add_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1062 if self.indices.iter().any(|i| i.name == name) {
1063 return Err(StorageError::DuplicateIndex { name });
1064 }
1065 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1066 StorageError::ColumnNotFound {
1067 column: column_name.into(),
1068 }
1069 })?;
1070 let mut idx = Index::new_btree(name, column_position);
1071 if let IndexKind::BTree(map) = &mut idx.kind {
1072 for (i, row) in self.rows.iter().enumerate() {
1073 if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
1074 let mut entries = map.get(&key).cloned().unwrap_or_default();
1075 entries.push(RowLocator::Hot(i));
1076 map.insert_mut(key, entries);
1077 }
1078 }
1079 }
1080 self.indices.push(idx);
1081 Ok(())
1082 }
1083
    /// Creates an NSW index named `name` on the vector column `column_name`
    /// with neighbour cap `m`, building the graph from the existing rows.
    ///
    /// # Errors
    ///
    /// Propagates the errors of `add_nsw_index_inner`: `DuplicateIndex`,
    /// `ColumnNotFound`, or `TypeMismatch` when the column is not a vector.
    pub fn add_nsw_index(
        &mut self,
        name: String,
        column_name: &str,
        m: usize,
    ) -> Result<(), StorageError> {
        self.add_nsw_index_inner(name, column_name, m, None)
    }
1096
1097 pub fn rebuild_nsw_index(
1109 &mut self,
1110 name: &str,
1111 new_encoding: Option<VecEncoding>,
1112 ) -> Result<(), StorageError> {
1113 let idx_pos = self
1114 .indices
1115 .iter()
1116 .position(|i| i.name == name)
1117 .ok_or_else(|| StorageError::IndexNotFound {
1118 name: String::from(name),
1119 })?;
1120 let col_pos = self.indices[idx_pos].column_position;
1121 let m = match &self.indices[idx_pos].kind {
1122 IndexKind::Nsw(g) => g.m,
1123 IndexKind::BTree(_) | IndexKind::Brin { .. } => {
1124 return Err(StorageError::Unsupported(format!(
1125 "ALTER INDEX REBUILD on non-NSW index {name:?} — only NSW indexes can rebuild"
1126 )));
1127 }
1128 };
1129 let col_name = self.schema.columns[col_pos].name.clone();
1130 if let Some(target) = new_encoding {
1133 let current = match self.schema.columns[col_pos].ty {
1134 DataType::Vector { encoding, .. } => encoding,
1135 ref other => {
1136 return Err(StorageError::Unsupported(format!(
1137 "ALTER INDEX REBUILD WITH (encoding=…) on non-vector column type {other:?}"
1138 )));
1139 }
1140 };
1141 if target != current {
1142 let DataType::Vector { dim, .. } = self.schema.columns[col_pos].ty else {
1143 unreachable!("checked above")
1144 };
1145 let n = self.rows.len();
1146 for i in 0..n {
1147 let row = self
1148 .rows
1149 .get_mut(i)
1150 .expect("row index in bounds (we iterated up to len())");
1151 let cell = core::mem::replace(&mut row.values[col_pos], Value::Null);
1152 let recoded = recode_vector_cell(cell, target)?;
1153 row.values[col_pos] = recoded;
1154 }
1155 self.schema.columns[col_pos].ty = DataType::Vector {
1156 dim,
1157 encoding: target,
1158 };
1159 }
1160 }
1161 self.indices.remove(idx_pos);
1163 self.add_nsw_index_inner(String::from(name), &col_name, m, None)?;
1164 Ok(())
1165 }
1166
    /// Registers an NSW index using an already-built `graph` instead of
    /// constructing one from the rows.
    ///
    /// The graph is installed as-is; it is not checked against the current
    /// rows.
    ///
    /// # Errors
    ///
    /// Propagates the errors of `add_nsw_index_inner`: `DuplicateIndex`,
    /// `ColumnNotFound`, or `TypeMismatch` when the column is not a vector.
    pub fn restore_nsw_index(
        &mut self,
        name: String,
        column_name: &str,
        graph: NswGraph,
    ) -> Result<(), StorageError> {
        self.add_nsw_index_inner(name, column_name, graph.m, Some(graph))
    }
1179
1180 pub fn restore_btree_index(
1187 &mut self,
1188 name: String,
1189 column_name: &str,
1190 map: PersistentBTreeMap<IndexKey, Vec<RowLocator>>,
1191 ) -> Result<(), StorageError> {
1192 if self.indices.iter().any(|i| i.name == name) {
1193 return Err(StorageError::DuplicateIndex { name });
1194 }
1195 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1196 StorageError::ColumnNotFound {
1197 column: column_name.into(),
1198 }
1199 })?;
1200 self.indices.push(Index {
1201 name,
1202 column_position,
1203 kind: IndexKind::BTree(map),
1204 included_columns: Vec::new(),
1205 partial_predicate: None,
1206 expression: None,
1207 });
1208 Ok(())
1209 }
1210
1211 pub fn restore_brin_index(
1216 &mut self,
1217 name: String,
1218 column_name: &str,
1219 column_type: DataType,
1220 ) -> Result<(), StorageError> {
1221 if self.indices.iter().any(|i| i.name == name) {
1222 return Err(StorageError::DuplicateIndex { name });
1223 }
1224 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1225 StorageError::ColumnNotFound {
1226 column: column_name.into(),
1227 }
1228 })?;
1229 self.indices.push(Index::new_brin(name, column_position, column_type));
1230 Ok(())
1231 }
1232
1233 pub fn add_brin_index(
1237 &mut self,
1238 name: String,
1239 column_name: &str,
1240 ) -> Result<(), StorageError> {
1241 if self.indices.iter().any(|i| i.name == name) {
1242 return Err(StorageError::DuplicateIndex { name });
1243 }
1244 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1245 StorageError::ColumnNotFound {
1246 column: column_name.into(),
1247 }
1248 })?;
1249 let column_type = self.schema.columns[column_position].ty;
1250 self.indices.push(Index::new_brin(name, column_position, column_type));
1251 Ok(())
1252 }
1253
1254 pub fn register_cold_locators<I>(
1271 &mut self,
1272 index_name: &str,
1273 locators: I,
1274 ) -> Result<usize, StorageError>
1275 where
1276 I: IntoIterator<Item = (IndexKey, RowLocator)>,
1277 {
1278 let idx = self
1279 .indices
1280 .iter_mut()
1281 .find(|i| i.name == index_name)
1282 .ok_or_else(|| StorageError::Corrupt(format!("index {index_name:?} not found")))?;
1283 let map = match &mut idx.kind {
1284 IndexKind::BTree(map) => map,
1285 IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
1286 return Err(StorageError::Corrupt(format!(
1287 "index {index_name:?} is not BTree; cold locators apply only to BTree indices"
1288 )));
1289 }
1290 };
1291 let mut count = 0usize;
1292 for (key, locator) in locators {
1293 let mut entries = map.get(&key).cloned().unwrap_or_default();
1294 entries.push(locator);
1295 map.insert_mut(key, entries);
1296 count += 1;
1297 }
1298 Ok(count)
1299 }
1300
1301 pub fn remove_cold_locators_for_key(
1311 &mut self,
1312 index_name: &str,
1313 key: &IndexKey,
1314 ) -> Result<usize, StorageError> {
1315 let idx = self
1316 .indices
1317 .iter_mut()
1318 .find(|i| i.name == index_name)
1319 .ok_or_else(|| {
1320 StorageError::Corrupt(format!(
1321 "remove_cold_locators_for_key: index {index_name:?} not found"
1322 ))
1323 })?;
1324 let map = match &mut idx.kind {
1325 IndexKind::BTree(map) => map,
1326 IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
1327 return Err(StorageError::Corrupt(format!(
1328 "remove_cold_locators_for_key: index {index_name:?} is not BTree; \
1329 cold locators apply only to BTree indices"
1330 )));
1331 }
1332 };
1333 let Some(entries) = map.get(key) else {
1334 return Ok(0);
1335 };
1336 let mut kept: Vec<RowLocator> =
1337 entries.iter().copied().filter(RowLocator::is_hot).collect();
1338 let removed = entries.len() - kept.len();
1339 if removed == 0 {
1340 return Ok(0);
1341 }
1342 kept.shrink_to_fit();
1343 map.insert_mut(key.clone(), kept);
1351 Ok(removed)
1352 }
1353
1354 pub fn delete_rows(&mut self, positions: &[usize]) -> usize {
1360 if positions.is_empty() {
1361 return 0;
1362 }
1363 let mut to_remove = alloc::vec![false; self.rows.len()];
1367 let mut removed = 0;
1368 for &p in positions {
1369 if p < to_remove.len() && !to_remove[p] {
1370 to_remove[p] = true;
1371 removed += 1;
1372 }
1373 }
1374 let mut new_rows: PersistentVec<Row> = PersistentVec::new();
1375 let mut removed_bytes: u64 = 0;
1376 for (i, row) in self.rows.iter().enumerate() {
1377 if to_remove[i] {
1378 removed_bytes =
1379 removed_bytes.saturating_add(row_body_encoded_len(row, &self.schema) as u64);
1380 } else {
1381 new_rows.push_mut(row.clone());
1382 }
1383 }
1384 self.rows = new_rows;
1385 self.hot_bytes = self.hot_bytes.saturating_sub(removed_bytes);
1386 self.rebuild_indices();
1387 removed
1388 }
1389
1390 pub fn update_row(
1396 &mut self,
1397 position: usize,
1398 new_values: Vec<Value>,
1399 ) -> Result<(), StorageError> {
1400 if position >= self.rows.len() {
1401 return Err(StorageError::Corrupt(alloc::format!(
1402 "update_row: position {position} out of bounds (rows={})",
1403 self.rows.len()
1404 )));
1405 }
1406 if new_values.len() != self.schema.columns.len() {
1407 return Err(StorageError::ArityMismatch {
1408 expected: self.schema.columns.len(),
1409 actual: new_values.len(),
1410 });
1411 }
1412 for (i, (val, col)) in new_values.iter().zip(&self.schema.columns).enumerate() {
1416 if val.is_null() {
1417 if !col.nullable {
1418 return Err(StorageError::NullInNotNull {
1419 column: col.name.clone(),
1420 });
1421 }
1422 continue;
1423 }
1424 let actual = val.data_type().expect("non-null");
1425 let compatible = actual == col.ty
1426 || matches!(
1427 (actual, col.ty),
1428 (
1429 DataType::Text,
1430 DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
1431 ) | (DataType::Json | DataType::Jsonb, DataType::Text)
1432 | (DataType::Json, DataType::Jsonb) | (DataType::Jsonb, DataType::Json)
1433 | (DataType::Timestamp, DataType::Timestamptz)
1434 | (DataType::Timestamptz, DataType::Timestamp)
1435 )
1436 || matches!(
1437 (actual, col.ty),
1438 (
1439 DataType::Numeric { scale: a, .. },
1440 DataType::Numeric { scale: b, .. },
1441 ) if a == b
1442 );
1443 if !compatible {
1444 return Err(StorageError::TypeMismatch {
1445 column: col.name.clone(),
1446 expected: col.ty,
1447 actual,
1448 position: i,
1449 });
1450 }
1451 }
1452 let old_row = self
1453 .rows
1454 .get(position)
1455 .expect("position bounds-checked above");
1456 let old_bytes = row_body_encoded_len(old_row, &self.schema) as u64;
1457 let new_row = Row::new(new_values);
1458 let new_bytes = row_body_encoded_len(&new_row, &self.schema) as u64;
1459 self.rows = self
1460 .rows
1461 .set(position, new_row)
1462 .expect("position bounds-checked above");
1463 self.hot_bytes = self
1464 .hot_bytes
1465 .saturating_sub(old_bytes)
1466 .saturating_add(new_bytes);
1467 self.rebuild_indices();
1468 Ok(())
1469 }
1470
1471 fn rebuild_indices(&mut self) {
1478 let preserved_cold: Vec<(String, Vec<(IndexKey, RowLocator)>)> = self
1487 .indices
1488 .iter()
1489 .filter_map(|idx| match &idx.kind {
1490 IndexKind::BTree(map) => {
1491 let cold: Vec<(IndexKey, RowLocator)> = map
1492 .iter()
1493 .flat_map(|(k, locs)| {
1494 locs.iter()
1495 .filter(|l| l.is_cold())
1496 .copied()
1497 .map(move |l| (k.clone(), l))
1498 })
1499 .collect();
1500 if cold.is_empty() {
1501 None
1502 } else {
1503 Some((idx.name.clone(), cold))
1504 }
1505 }
1506 IndexKind::Nsw(_) | IndexKind::Brin { .. } => None,
1508 })
1509 .collect();
1510
1511 #[derive(Clone)]
1516 enum RebuildKind {
1517 BTree,
1518 Nsw(usize),
1519 Brin(DataType),
1520 }
1521 let descriptors: Vec<(String, usize, RebuildKind)> = self
1522 .indices
1523 .iter()
1524 .map(|idx| {
1525 let kind = match &idx.kind {
1526 IndexKind::Nsw(g) => RebuildKind::Nsw(g.m),
1527 IndexKind::Brin { column_type } => RebuildKind::Brin(*column_type),
1528 IndexKind::BTree(_) => RebuildKind::BTree,
1529 };
1530 (idx.name.clone(), idx.column_position, kind)
1531 })
1532 .collect();
1533 self.indices.clear();
1534 for (name, column_position, rebuild_kind) in descriptors {
1535 match rebuild_kind {
1536 RebuildKind::Nsw(m) => {
1537 let idx = Index::new_nsw(name, column_position, m);
1538 self.indices.push(idx);
1539 let idx_pos = self.indices.len() - 1;
1540 let row_indices: Vec<usize> = (0..self.rows.len()).collect();
1541 for row_idx in row_indices {
1542 nsw_insert_at(self, idx_pos, row_idx);
1543 }
1544 }
1545 RebuildKind::Brin(column_type) => {
1546 self.indices.push(Index::new_brin(name, column_position, column_type));
1549 }
1550 RebuildKind::BTree => {
1551 let mut idx = Index::new_btree(name, column_position);
1552 if let IndexKind::BTree(map) = &mut idx.kind {
1553 for (i, row) in self.rows.iter().enumerate() {
1554 if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
1555 let mut entries = map.get(&key).cloned().unwrap_or_default();
1556 entries.push(RowLocator::Hot(i));
1557 map.insert_mut(key, entries);
1558 }
1559 }
1560 }
1561 self.indices.push(idx);
1562 }
1563 }
1564 }
1565
1566 for (idx_name, locators) in preserved_cold {
1571 let _ = self.register_cold_locators(&idx_name, locators);
1575 }
1576 }
1577
1578 fn add_nsw_index_inner(
1579 &mut self,
1580 name: String,
1581 column_name: &str,
1582 m: usize,
1583 restore: Option<NswGraph>,
1584 ) -> Result<(), StorageError> {
1585 if self.indices.iter().any(|i| i.name == name) {
1586 return Err(StorageError::DuplicateIndex { name });
1587 }
1588 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1589 StorageError::ColumnNotFound {
1590 column: column_name.into(),
1591 }
1592 })?;
1593 if !matches!(
1594 self.schema.columns[column_position].ty,
1595 DataType::Vector { .. }
1596 ) {
1597 return Err(StorageError::TypeMismatch {
1598 column: column_name.into(),
1599 expected: DataType::Vector {
1600 dim: 0,
1601 encoding: VecEncoding::F32,
1602 },
1603 actual: self.schema.columns[column_position].ty,
1604 position: column_position,
1605 });
1606 }
1607 if let Some(graph) = restore {
1608 self.indices.push(Index {
1609 name,
1610 column_position,
1611 kind: IndexKind::Nsw(graph),
1612 included_columns: Vec::new(),
1613 partial_predicate: None,
1614 expression: None,
1615 });
1616 return Ok(());
1617 }
1618 let idx = Index::new_nsw(name, column_position, m);
1619 self.indices.push(idx);
1620 let idx_pos = self.indices.len() - 1;
1621 let row_indices: Vec<usize> = (0..self.rows.len()).collect();
1624 for row_idx in row_indices {
1625 nsw_insert_at(self, idx_pos, row_idx);
1626 }
1627 Ok(())
1628 }
1629}
1630
1631fn recode_vector_cell(cell: Value, target: VecEncoding) -> Result<Value, StorageError> {
1638 if matches!(cell, Value::Null) {
1639 return Ok(cell);
1640 }
1641 let as_f32: Vec<f32> = match &cell {
1643 Value::Vector(v) => v.clone(),
1644 Value::Sq8Vector(q) => quantize::dequantize(q),
1645 Value::HalfVector(h) => h.to_f32_vec(),
1646 other => {
1647 return Err(StorageError::Unsupported(format!(
1648 "ALTER INDEX REBUILD: cannot recode non-vector cell {:?}",
1649 other.data_type()
1650 )));
1651 }
1652 };
1653 Ok(match target {
1658 VecEncoding::F32 => Value::Vector(as_f32),
1659 VecEncoding::Sq8 => Value::Sq8Vector(quantize::quantize(&as_f32)),
1660 VecEncoding::F16 => Value::HalfVector(halfvec::HalfVector::from_f32_slice(&as_f32)),
1661 })
1662}
1663
/// Inserts the row at `new_row_idx` into the NSW index at `idx_pos`.
///
/// Rows whose indexed cell is not a vector, or is a zero-length vector, only
/// get padded slots in the graph and are never linked. The first linkable
/// row becomes the entry point. Later rows are linked on every layer from
/// `min(level, entry_level)` down to 0, and a row whose level exceeds the
/// current entry level becomes the new entry point.
///
/// # Panics
///
/// Panics if `idx_pos` does not refer to an NSW index, or if `idx_pos` /
/// `new_row_idx` are out of bounds.
fn nsw_insert_at(table: &mut Table, idx_pos: usize, new_row_idx: usize) {
    let col_pos = table.indices[idx_pos].column_position;
    // Element count of the cell, or `None` when it is not a vector at all.
    let cell_dim: Option<usize> = match &table.rows[new_row_idx].values[col_pos] {
        Value::Vector(v) => Some(v.len()),
        Value::Sq8Vector(q) => Some(q.bytes.len()),
        Value::HalfVector(h) => Some(h.dim()),
        _ => None,
    };
    let Some(dim) = cell_dim else {
        // Keep per-row slots aligned with row indices even for unlinked rows.
        ensure_node_slot(table, idx_pos, new_row_idx, 0);
        return;
    };
    if dim == 0 {
        ensure_node_slot(table, idx_pos, new_row_idx, 0);
        return;
    }
    let level = nsw_assign_level(new_row_idx);
    ensure_node_slot(table, idx_pos, new_row_idx, level);
    // Copy out what is needed so the borrow of the graph ends here.
    let (entry, entry_level, m) = match &table.indices[idx_pos].kind {
        IndexKind::Nsw(g) => (g.entry, g.entry_level, g.m),
        IndexKind::BTree(_) | IndexKind::Brin { .. } => {
            unreachable!("nsw_insert_at on a non-NSW index")
        }
    };
    if entry.is_none() {
        // First linkable row: it becomes the entry point, no edges to add.
        if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
            g.entry = Some(new_row_idx);
            g.entry_level = level;
            *g.levels
                .get_mut(new_row_idx)
                .expect("levels slot padded by ensure_node_slot") = level;
        }
        return;
    }
    if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
        *g.levels
            .get_mut(new_row_idx)
            .expect("levels slot padded by ensure_node_slot") = level;
    }
    // Distances are computed against an f32 copy of the new row's vector.
    let query = match &table.rows[new_row_idx].values[col_pos] {
        Value::Vector(v) => v.clone(),
        Value::Sq8Vector(q) => quantize::dequantize(q),
        Value::HalfVector(h) => h.to_f32_vec(),
        _ => return,
    };
    let mut current = entry.expect("entry was Some above");
    let mut current_d = vec_l2_sq(table, col_pos, current, &query);
    if entry_level > level {
        // Layers above the new node's level: only move the start point.
        for layer in (level + 1..=entry_level).rev() {
            (current, current_d) =
                greedy_layer_walk(table, idx_pos, layer, current, current_d, &query);
        }
    }
    let top = level.min(entry_level);
    // Search width for the per-layer search: at least 8, otherwise 2 * m.
    let ef = (m * 2).max(8);
    for layer in (0..=top).rev() {
        // Neighbour cap: 2 * m on layer 0, m above.
        let cap = if layer == 0 { m * 2 } else { m };
        let mut candidates = layer_beam_search(
            table,
            idx_pos,
            layer,
            current,
            current_d,
            &query,
            ef,
            NswMetric::L2,
        );
        // Never link the node to itself.
        candidates.retain(|&(_, n)| n != new_row_idx);
        // The first candidate seeds the search on the next layer down
        // (presumably the nearest one — confirm `layer_beam_search` ordering).
        if let Some(&(d, n)) = candidates.first() {
            current = n;
            current_d = d;
        }
        let peers = select_neighbours_heuristic(&candidates, cap, table, col_pos);
        connect_at_layer(table, idx_pos, layer, new_row_idx, &peers);
    }
    // A node that sits above the old entry point takes over as entry point.
    if level > entry_level
        && let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
    {
        g.entry = Some(new_row_idx);
        g.entry_level = level;
    }
}
1772
1773fn ensure_node_slot(table: &mut Table, idx_pos: usize, new_row_idx: usize, level: u8) {
1777 let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind else {
1778 unreachable!("ensure_node_slot on a BTree index");
1779 };
1780 while g.layers.len() <= level as usize {
1781 g.layers.push(PersistentVec::new());
1782 }
1783 while g.levels.len() <= new_row_idx {
1784 g.levels.push_mut(0);
1785 }
1786 for layer_vec in &mut g.layers {
1787 while layer_vec.len() <= new_row_idx {
1788 layer_vec.push_mut(Vec::new());
1789 }
1790 }
1791}
1792
1793fn greedy_layer_walk(
1799 table: &Table,
1800 idx_pos: usize,
1801 layer: u8,
1802 mut current: usize,
1803 mut current_d: f32,
1804 query: &[f32],
1805) -> (usize, f32) {
1806 let g = match &table.indices[idx_pos].kind {
1807 IndexKind::Nsw(g) => g,
1808 IndexKind::BTree(_) | IndexKind::Brin { .. } => return (current, current_d),
1809 };
1810 let col_pos = table.indices[idx_pos].column_position;
1811 loop {
1812 let neighbours: &[u32] = g
1813 .layers
1814 .get(layer as usize)
1815 .and_then(|layer_v| layer_v.get(current))
1816 .map_or(&[][..], Vec::as_slice);
1817 let mut best = current;
1818 let mut best_d = current_d;
1819 for &n in neighbours {
1820 let n = n as usize;
1821 let d = vec_l2_sq(table, col_pos, n, query);
1822 if d < best_d {
1823 best = n;
1824 best_d = d;
1825 }
1826 }
1827 if best == current {
1828 return (current, current_d);
1829 }
1830 current = best;
1831 current_d = best_d;
1832 }
1833}
1834
1835#[allow(clippy::too_many_arguments)] fn layer_beam_search(
1848 table: &Table,
1849 idx_pos: usize,
1850 layer: u8,
1851 entry_node: usize,
1852 entry_d: f32,
1853 query: &[f32],
1854 ef: usize,
1855 metric: NswMetric,
1856) -> Vec<(f32, usize)> {
1857 let g = match &table.indices[idx_pos].kind {
1858 IndexKind::Nsw(g) => g,
1859 IndexKind::BTree(_) | IndexKind::Brin { .. } => return Vec::new(),
1860 };
1861 let col_pos = table.indices[idx_pos].column_position;
1862 let d0 = if matches!(metric, NswMetric::L2) {
1863 entry_d
1864 } else {
1865 cell_to_query_metric_distance(table, col_pos, entry_node, query, metric)
1866 };
1867 let row_count = table.rows.len();
1868 let mut visited: Vec<bool> = alloc::vec![false; row_count];
1869 if entry_node < row_count {
1870 visited[entry_node] = true;
1871 }
1872 let mut candidates: alloc::collections::BinaryHeap<NodeClosest> =
1875 alloc::collections::BinaryHeap::with_capacity(ef);
1876 let mut results: alloc::collections::BinaryHeap<NodeFurthest> =
1877 alloc::collections::BinaryHeap::with_capacity(ef);
1878 candidates.push(NodeClosest {
1879 dist: d0,
1880 node: entry_node,
1881 });
1882 results.push(NodeFurthest {
1883 dist: d0,
1884 node: entry_node,
1885 });
1886 while let Some(cur) = candidates.pop() {
1887 let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
1888 if cur.dist > worst && results.len() >= ef {
1889 break;
1890 }
1891 let neighbours: &[u32] = g
1892 .layers
1893 .get(layer as usize)
1894 .and_then(|layer_v| layer_v.get(cur.node))
1895 .map_or(&[][..], Vec::as_slice);
1896 for &n in neighbours {
1897 let n = n as usize;
1898 if n >= row_count || visited[n] {
1899 continue;
1900 }
1901 visited[n] = true;
1902 let dn = cell_to_query_metric_distance(table, col_pos, n, query, metric);
1906 if !dn.is_finite() {
1907 continue;
1908 }
1909 let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
1910 if results.len() < ef || dn < worst {
1911 results.push(NodeFurthest { dist: dn, node: n });
1912 if results.len() > ef {
1913 results.pop();
1914 }
1915 candidates.push(NodeClosest { dist: dn, node: n });
1916 }
1917 }
1918 }
1919 let mut out: Vec<(f32, usize)> = results.into_iter().map(|c| (c.dist, c.node)).collect();
1922 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
1923 out
1924}
1925
/// Heap entry ordered so that a max-heap (`BinaryHeap`) pops the entry with
/// the *smallest* `dist` first.
///
/// Ordering is a total order: distances are compared with `f32::total_cmp`
/// (so NaN is handled deterministically) and ties are broken by `node`, with
/// the lower node index popped first. Equality is defined through the same
/// comparison, which keeps `PartialEq`, `Eq`, `PartialOrd` and `Ord`
/// mutually consistent.
#[derive(Debug, Clone, Copy)]
struct NodeClosest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeClosest {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeClosest {}
impl PartialOrd for NodeClosest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeClosest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        // Operands are swapped on purpose: smaller distance == "greater"
        // entry, so it surfaces first in a max-heap.
        other
            .dist
            .total_cmp(&self.dist)
            .then_with(|| other.node.cmp(&self.node))
    }
}
1954
/// Heap entry ordered so that a max-heap (`BinaryHeap`) pops the entry with
/// the *largest* `dist` first.
///
/// Ordering is a total order: distances are compared with `f32::total_cmp`
/// (so NaN is handled deterministically) and ties are broken by `node`, with
/// the higher node index popped first. Equality is defined through the same
/// comparison, which keeps `PartialEq`, `Eq`, `PartialOrd` and `Ord`
/// mutually consistent.
#[derive(Debug, Clone, Copy)]
struct NodeFurthest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeFurthest {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeFurthest {}
impl PartialOrd for NodeFurthest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeFurthest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        self.dist
            .total_cmp(&other.dist)
            .then_with(|| self.node.cmp(&other.node))
    }
}
1980
1981fn select_neighbours_heuristic(
1990 candidates: &[(f32, usize)],
1991 m: usize,
1992 table: &Table,
1993 col_pos: usize,
1994) -> Vec<usize> {
1995 let mut chosen: Vec<usize> = Vec::with_capacity(m);
1996 for &(d_q, e) in candidates {
1997 if chosen.len() >= m {
1998 break;
1999 }
2000 if !matches!(
2005 table.rows.get(e).and_then(|r| r.values.get(col_pos)),
2006 Some(Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_))
2007 ) {
2008 continue;
2009 }
2010 let mut covered = false;
2011 for &r in &chosen {
2012 if cell_l2_sq(table, col_pos, e, r) < d_q {
2016 covered = true;
2017 break;
2018 }
2019 }
2020 if !covered {
2021 chosen.push(e);
2022 }
2023 }
2024 chosen
2025}
2026
2027fn connect_at_layer(
2031 table: &mut Table,
2032 idx_pos: usize,
2033 layer: u8,
2034 new_row_idx: usize,
2035 peers: &[usize],
2036) {
2037 let col_pos = table.indices[idx_pos].column_position;
2038 let cap = match &table.indices[idx_pos].kind {
2039 IndexKind::Nsw(g) => g.cap_for_layer(layer),
2040 IndexKind::BTree(_) | IndexKind::Brin { .. } => return,
2041 };
2042 let new_row_u32 = u32::try_from(new_row_idx).expect("row index fits in u32");
2047 if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
2048 let layer_v = &mut g.layers[layer as usize];
2049 if let Some(slot) = layer_v.get_mut(new_row_idx) {
2050 *slot = peers
2051 .iter()
2052 .map(|&p| u32::try_from(p).expect("row index fits in u32"))
2053 .collect();
2054 }
2055 }
2056 for &peer in peers {
2057 if !matches!(
2061 &table.rows[peer].values[col_pos],
2062 Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_)
2063 ) {
2064 continue;
2065 }
2066 if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
2068 let layer_v = &mut g.layers[layer as usize];
2069 if let Some(slot) = layer_v.get_mut(peer)
2070 && !slot.contains(&new_row_u32)
2071 {
2072 slot.push(new_row_u32);
2073 }
2074 }
2075 let needs_trim = match &table.indices[idx_pos].kind {
2079 IndexKind::Nsw(g) => g.layers[layer as usize][peer].len() > cap,
2080 IndexKind::BTree(_) | IndexKind::Brin { .. } => false,
2081 };
2082 if needs_trim {
2083 let current_peers: Vec<usize> = match &table.indices[idx_pos].kind {
2084 IndexKind::Nsw(g) => g.layers[layer as usize][peer]
2085 .iter()
2086 .map(|&n| n as usize)
2087 .collect(),
2088 IndexKind::BTree(_) | IndexKind::Brin { .. } => continue,
2089 };
2090 let mut tagged: Vec<(f32, usize)> = current_peers
2095 .iter()
2096 .map(|&p| (cell_l2_sq(table, col_pos, peer, p), p))
2097 .collect();
2098 tagged.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2099 let kept = select_neighbours_heuristic(&tagged, cap, table, col_pos);
2100 if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
2101 && let Some(slot) = g.layers[layer as usize].get_mut(peer)
2102 {
2103 *slot = kept
2104 .into_iter()
2105 .map(|p| u32::try_from(p).expect("row index fits in u32"))
2106 .collect();
2107 }
2108 }
2109 }
2110}
2111
2112fn vec_l2_sq(table: &Table, col_pos: usize, row: usize, query: &[f32]) -> f32 {
2119 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2120 Some(Value::Vector(v)) if v.len() == query.len() => l2_distance_sq(v, query),
2121 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => {
2122 quantize::sq8_l2_distance_sq_asymmetric(q, query)
2123 }
2124 Some(Value::HalfVector(h)) if h.dim() == query.len() => {
2128 halfvec::half_l2_distance_sq_asymmetric(h, query)
2129 }
2130 _ => f32::INFINITY,
2131 }
2132}
2133
2134fn cell_l2_sq(table: &Table, col_pos: usize, row_a: usize, row_b: usize) -> f32 {
2141 let Some(cell_a) = table.rows.get(row_a).and_then(|r| r.values.get(col_pos)) else {
2142 return f32::INFINITY;
2143 };
2144 let Some(cell_b) = table.rows.get(row_b).and_then(|r| r.values.get(col_pos)) else {
2145 return f32::INFINITY;
2146 };
2147 match (cell_a, cell_b) {
2148 (Value::Vector(a), Value::Vector(b)) if a.len() == b.len() => l2_distance_sq(a, b),
2149 (Value::Sq8Vector(a), Value::Sq8Vector(b)) if a.bytes.len() == b.bytes.len() => {
2150 quantize::sq8_l2_distance_sq(a, b)
2151 }
2152 (Value::HalfVector(a), Value::HalfVector(b)) if a.dim() == b.dim() => {
2157 halfvec::half_l2_distance_sq(a, b)
2158 }
2159 _ => f32::INFINITY,
2160 }
2161}
2162
2163fn cell_to_query_metric_distance(
2168 table: &Table,
2169 col_pos: usize,
2170 row: usize,
2171 query: &[f32],
2172 metric: NswMetric,
2173) -> f32 {
2174 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2175 Some(Value::Vector(v)) if v.len() == query.len() => metric_distance(metric, v, query),
2176 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => match metric {
2177 NswMetric::L2 => quantize::sq8_l2_distance_sq_asymmetric(q, query),
2178 NswMetric::InnerProduct => quantize::sq8_inner_product_asymmetric(q, query),
2179 NswMetric::Cosine => quantize::sq8_cosine_distance_asymmetric(q, query),
2180 },
2181 Some(Value::HalfVector(h)) if h.dim() == query.len() => match metric {
2184 NswMetric::L2 => halfvec::half_l2_distance_sq_asymmetric(h, query),
2185 NswMetric::InnerProduct => halfvec::half_inner_product_asymmetric(h, query),
2186 NswMetric::Cosine => halfvec::half_cosine_distance_asymmetric(h, query),
2187 },
2188 _ => f32::INFINITY,
2189 }
2190}
2191
/// Distance metric used when ranking NSW search results.
///
/// The per-variant notes describe how `metric_distance` scores plain `f32`
/// vectors; quantised encodings dispatch to their own kernels.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NswMetric {
    /// Squared Euclidean (L2) distance.
    L2,
    /// Negated inner product, so a larger dot product sorts as "closer".
    InnerProduct,
    /// Cosine distance `1 - cos(a, b)`; infinite when either vector has a
    /// zero squared norm.
    Cosine,
}
2209
2210fn nsw_search(
2216 table: &Table,
2217 idx_pos: usize,
2218 query: &[f32],
2219 k: usize,
2220 ef: usize,
2221 metric: NswMetric,
2222) -> Vec<(f32, usize)> {
2223 let (entry, entry_level) = match &table.indices[idx_pos].kind {
2224 IndexKind::Nsw(g) => (g.entry, g.entry_level),
2225 IndexKind::BTree(_) | IndexKind::Brin { .. } => return Vec::new(),
2226 };
2227 let Some(entry) = entry else {
2228 return Vec::new();
2229 };
2230 let col_pos = table.indices[idx_pos].column_position;
2231 let sq8 = matches!(
2238 table.schema.columns.get(col_pos).map(|c| c.ty),
2239 Some(DataType::Vector {
2240 encoding: VecEncoding::Sq8,
2241 ..
2242 })
2243 );
2244 let ef = if sq8 {
2245 ef.max(k).max(k * SQ8_RERANK_OVER_FETCH)
2246 } else {
2247 ef.max(k)
2248 };
2249 let entry_d = vec_l2_sq(table, col_pos, entry, query);
2251 let mut current = entry;
2252 let mut current_d = entry_d;
2253 for layer in (1..=entry_level).rev() {
2254 (current, current_d) = greedy_layer_walk(table, idx_pos, layer, current, current_d, query);
2255 }
2256 let mut results = layer_beam_search(table, idx_pos, 0, current, current_d, query, ef, metric);
2258 if sq8 {
2259 results = sq8_rerank(table, col_pos, &results, query, metric);
2260 }
2261 results.truncate(k);
2262 results
2263}
2264
2265fn sq8_rerank(
2272 table: &Table,
2273 col_pos: usize,
2274 candidates: &[(f32, usize)],
2275 query: &[f32],
2276 metric: NswMetric,
2277) -> Vec<(f32, usize)> {
2278 let mut out: Vec<(f32, usize)> = candidates
2279 .iter()
2280 .filter_map(|&(adc_d, row)| {
2281 let cell = table.rows.get(row).and_then(|r| r.values.get(col_pos))?;
2282 let Value::Sq8Vector(q) = cell else {
2283 return Some((adc_d, row));
2287 };
2288 let deq = quantize::dequantize(q);
2289 if deq.len() != query.len() {
2290 return None;
2291 }
2292 Some((metric_distance(metric, &deq, query), row))
2293 })
2294 .collect();
2295 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2296 out
2297}
2298
/// Over-fetch factor for SQ8 columns: `nsw_search` widens its beam to at
/// least `k * SQ8_RERANK_OVER_FETCH` candidates before re-ranking them.
const SQ8_RERANK_OVER_FETCH: usize = 3;
2303
2304fn metric_distance(metric: NswMetric, a: &[f32], b: &[f32]) -> f32 {
2305 match metric {
2306 NswMetric::L2 => l2_distance_sq(a, b),
2307 NswMetric::InnerProduct => -inner_product_f32(a, b),
2308 NswMetric::Cosine => {
2309 let (dot, na, nb) = cosine_dot_norms_f32(a, b);
2310 if na == 0.0 || nb == 0.0 {
2311 return f32::INFINITY;
2312 }
2313 let denom = sqrt_newton_f32(na) * sqrt_newton_f32(nb);
2316 1.0 - dot / denom
2317 }
2318 }
2319}
2320
2321#[doc(hidden)]
2330#[inline]
2331pub fn inner_product_f32(a: &[f32], b: &[f32]) -> f32 {
2332 #[cfg(target_arch = "aarch64")]
2333 {
2334 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
2335 return unsafe { inner_product_neon(a, b) };
2338 }
2339 }
2340 inner_product_scalar(a, b)
2341}
2342
/// Portable dot product: sums `a[i] * b[i]` left to right over the common
/// prefix of the two slices.
fn inner_product_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter()
        .zip(b.iter())
        .fold(0.0_f32, |acc, (x, y)| acc + x * y)
}
2350
/// NEON dot-product kernel: returns the sum of `a[i] * b[i]`.
///
/// Consumes eight elements per iteration into two independent accumulators,
/// then any remaining group of four into the first accumulator, and finally
/// adds the two accumulators and reduces across lanes. Elements past the
/// last full group of four are not read.
///
/// # Safety
///
/// Loop bounds come from `a.len()` only and `b` is read at the same offsets,
/// so the caller must guarantee `b.len() >= a.len()`. The `neon` target
/// feature must be available on the executing CPU.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names)] unsafe fn inner_product_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32,
    };
    // SAFETY: every load below is at an offset `i` (or `i + 4`) for which the
    // loop condition has established `i + 4 <= n` (resp. `i + 8 <= n`), with
    // `n == a.len()`; the caller guarantees `b` is at least as long.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        // Two accumulators so consecutive fused multiply-adds do not depend
        // on each other.
        let mut acc0 = zero;
        let mut acc1 = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 8 <= n {
            let av0 = vld1q_f32(a.as_ptr().add(i));
            let bv0 = vld1q_f32(b.as_ptr().add(i));
            acc0 = vfmaq_f32(acc0, av0, bv0);
            let av1 = vld1q_f32(a.as_ptr().add(i + 4));
            let bv1 = vld1q_f32(b.as_ptr().add(i + 4));
            acc1 = vfmaq_f32(acc1, av1, bv1);
            i += 8;
        }
        // At most one trailing group of four remains.
        while i + 4 <= n {
            let av = vld1q_f32(a.as_ptr().add(i));
            let bv = vld1q_f32(b.as_ptr().add(i));
            acc0 = vfmaq_f32(acc0, av, bv);
            i += 4;
        }
        // Combine the accumulators, then sum the four lanes.
        vaddvq_f32(vaddq_f32(acc0, acc1))
    }
}
2384
2385#[doc(hidden)]
2392#[inline]
2393pub fn cosine_dot_norms_f32(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
2394 #[cfg(target_arch = "aarch64")]
2395 {
2396 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
2397 return unsafe { cosine_dot_norms_neon(a, b) };
2399 }
2400 }
2401 cosine_dot_norms_scalar(a, b)
2402}
2403
/// Portable single-pass computation of `(a . b, |a|^2, |b|^2)` over the
/// common prefix of the two slices, accumulating left to right.
fn cosine_dot_norms_scalar(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    a.iter().zip(b.iter()).fold(
        (0.0_f32, 0.0_f32, 0.0_f32),
        |(dot, norm_sq_a, norm_sq_b), (x, y)| {
            (dot + x * y, norm_sq_a + x * x, norm_sq_b + y * y)
        },
    )
}
2415
/// NEON kernel returning `(a . b, |a|^2, |b|^2)`.
///
/// Consumes four elements per iteration into three accumulators (dot
/// product and the two squared norms) and reduces each across lanes at the
/// end. Elements past the last full group of four are not read.
///
/// # Safety
///
/// Loop bounds come from `a.len()` only and `b` is read at the same offsets,
/// so the caller must guarantee `b.len() >= a.len()`. The `neon` target
/// feature must be available on the executing CPU.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names, clippy::similar_names)]
unsafe fn cosine_dot_norms_neon(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    use core::arch::aarch64::{float32x4_t, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32};
    // SAFETY: each load is at an offset `i` with `i + 4 <= n == a.len()`;
    // the caller guarantees `b` is at least as long.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut acc_dot = zero;
        let mut acc_na = zero;
        let mut acc_nb = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 4 <= n {
            let av = vld1q_f32(a.as_ptr().add(i));
            let bv = vld1q_f32(b.as_ptr().add(i));
            acc_dot = vfmaq_f32(acc_dot, av, bv);
            acc_na = vfmaq_f32(acc_na, av, av);
            acc_nb = vfmaq_f32(acc_nb, bv, bv);
            i += 4;
        }
        // Horizontal sum of each accumulator.
        (vaddvq_f32(acc_dot), vaddvq_f32(acc_na), vaddvq_f32(acc_nb))
    }
}
2439
/// Square root of `x` without relying on `std` (the crate is `no_std`).
///
/// Uses Newton's iteration `g <- (g + x / g) / 2`, seeded with an estimate
/// obtained by halving the float's exponent, and runs until the value stops
/// changing (bounded by a fixed iteration cap).
///
/// Seeding with `g = x` and a fixed ten steps - as this function did
/// previously - only halves the estimate per step while it is far from the
/// root, so inputs such as `1e6` ended ~30 % off and distorted cosine
/// distances for vectors with large or tiny norms.
///
/// Returns `0.0` for `x <= 0.0`; NaN and `+inf` are returned unchanged.
fn sqrt_newton_f32(x: f32) -> f32 {
    if x <= 0.0 {
        return 0.0;
    }
    if !x.is_finite() {
        // NaN stays NaN; +inf stays +inf (the iteration would yield NaN).
        return x;
    }
    // Halving the biased exponent (and re-adding half the bias) lands within
    // a few percent of the true root for normal floats.
    let mut g = f32::from_bits((x.to_bits() >> 1) + 0x1fc0_0000);
    // Normal inputs converge in a handful of steps; subnormal inputs start
    // further away and need more, hence the generous cap.
    for _ in 0..32 {
        let next = 0.5 * (g + x / g);
        if next.to_bits() == g.to_bits() {
            break;
        }
        g = next;
    }
    g
}
2450
2451#[inline]
2459fn l2_distance_sq(a: &[f32], b: &[f32]) -> f32 {
2460 #[cfg(target_arch = "aarch64")]
2461 {
2462 if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
2463 return unsafe { l2_distance_sq_neon(a, b) };
2467 }
2468 }
2469 l2_distance_sq_scalar(a, b)
2470}
2471
/// Portable squared Euclidean distance: sums `(a[i] - b[i])^2` left to right
/// over the common prefix of the two slices.
fn l2_distance_sq_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b.iter()).fold(0.0_f32, |acc, (x, y)| {
        let delta = *x - *y;
        acc + delta * delta
    })
}
2480
/// NEON squared-L2 kernel: returns the sum of `(a[i] - b[i])^2`.
///
/// Consumes eight elements per iteration into two independent accumulators,
/// then any remaining group of four into the first accumulator, and finally
/// adds the two accumulators and reduces across lanes. Elements past the
/// last full group of four are not read.
///
/// # Safety
///
/// Loop bounds come from `a.len()` only and `b` is read at the same offsets,
/// so the caller must guarantee `b.len() >= a.len()`. The `neon` target
/// feature must be available on the executing CPU.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
#[allow(clippy::many_single_char_names)] unsafe fn l2_distance_sq_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32, vsubq_f32,
    };
    // SAFETY: every load below is at an offset `i` (or `i + 4`) for which the
    // loop condition has established `i + 4 <= n` (resp. `i + 8 <= n`), with
    // `n == a.len()`; the caller guarantees `b` is at least as long.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        // Two accumulators so consecutive fused multiply-adds do not depend
        // on each other.
        let mut acc0 = zero;
        let mut acc1 = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 8 <= n {
            let d0 = vsubq_f32(vld1q_f32(a.as_ptr().add(i)), vld1q_f32(b.as_ptr().add(i)));
            acc0 = vfmaq_f32(acc0, d0, d0);
            let d1 = vsubq_f32(
                vld1q_f32(a.as_ptr().add(i + 4)),
                vld1q_f32(b.as_ptr().add(i + 4)),
            );
            acc1 = vfmaq_f32(acc1, d1, d1);
            i += 8;
        }
        // At most one trailing group of four remains.
        while i + 4 <= n {
            let d = vsubq_f32(vld1q_f32(a.as_ptr().add(i)), vld1q_f32(b.as_ptr().add(i)));
            acc0 = vfmaq_f32(acc0, d, d);
            i += 4;
        }
        // Combine the accumulators, then sum the four lanes.
        vaddvq_f32(vaddq_f32(acc0, acc1))
    }
}
2518
2519pub fn nsw_query(
2522 table: &Table,
2523 idx_name: &str,
2524 query: &[f32],
2525 k: usize,
2526 metric: NswMetric,
2527) -> Vec<usize> {
2528 let Some(idx_pos) = table.indices.iter().position(|i| i.name == idx_name) else {
2529 return Vec::new();
2530 };
2531 let ef = (k * 2).max(NSW_DEFAULT_M);
2532 let mut hits = nsw_search(table, idx_pos, query, k, ef, metric);
2533 hits.truncate(k);
2534 hits.into_iter().map(|(_, idx)| idx).collect()
2535}
2536
2537pub fn nsw_index_on(table: &Table, column_position: usize) -> Option<&Index> {
2541 table
2542 .indices
2543 .iter()
2544 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_)))
2545}
2546
/// In-memory catalog: the set of tables plus the cold segments loaded
/// alongside them.
#[derive(Debug, Clone, Default)]
pub struct Catalog {
    /// Tables in creation order.
    tables: Vec<Table>,
    /// Table name -> position in `tables`.
    by_name: BTreeMap<String, usize>,
    /// Cold segments indexed by segment id; `None` marks a slot that is
    /// tombstoned or has not been filled.
    cold_segments: Vec<Option<Arc<OwnedSegment>>>,
}
2587
2588impl Catalog {
2589 pub const fn new() -> Self {
2590 Self {
2591 tables: Vec::new(),
2592 by_name: BTreeMap::new(),
2593 cold_segments: Vec::new(),
2594 }
2595 }
2596
2597 pub fn create_table(&mut self, schema: TableSchema) -> Result<(), StorageError> {
2598 if self.by_name.contains_key(&schema.name) {
2599 return Err(StorageError::DuplicateTable {
2600 name: schema.name.clone(),
2601 });
2602 }
2603 let idx = self.tables.len();
2604 let name = schema.name.clone();
2605 self.tables.push(Table::new(schema));
2606 self.by_name.insert(name, idx);
2607 Ok(())
2608 }
2609
2610 pub fn get(&self, name: &str) -> Option<&Table> {
2611 let idx = *self.by_name.get(name)?;
2612 self.tables.get(idx)
2613 }
2614
2615 pub fn get_mut(&mut self, name: &str) -> Option<&mut Table> {
2616 let idx = *self.by_name.get(name)?;
2617 self.tables.get_mut(idx)
2618 }
2619
2620 pub fn table_count(&self) -> usize {
2621 self.tables.len()
2622 }
2623
2624 pub fn table_names(&self) -> Vec<String> {
2627 self.tables.iter().map(|t| t.schema.name.clone()).collect()
2628 }
2629
2630 pub fn load_segment_bytes(&mut self, bytes: Vec<u8>) -> Result<u32, StorageError> {
2641 let id = u32::try_from(self.cold_segments.len()).map_err(|_| {
2642 StorageError::Corrupt("cold segment count would exceed u32::MAX".into())
2643 })?;
2644 let seg = OwnedSegment::from_bytes(bytes)
2645 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
2646 self.cold_segments.push(Some(Arc::new(seg)));
2647 Ok(id)
2648 }
2649
2650 pub fn load_segment_bytes_at(
2663 &mut self,
2664 target_id: u32,
2665 bytes: Vec<u8>,
2666 ) -> Result<(), StorageError> {
2667 let seg = OwnedSegment::from_bytes(bytes)
2668 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
2669 let idx = target_id as usize;
2670 while self.cold_segments.len() <= idx {
2671 self.cold_segments.push(None);
2672 }
2673 if self.cold_segments[idx].is_some() {
2674 return Err(StorageError::Corrupt(format!(
2675 "load_segment_bytes_at: segment_id {target_id} already occupied"
2676 )));
2677 }
2678 self.cold_segments[idx] = Some(Arc::new(seg));
2679 Ok(())
2680 }
2681
2682 pub fn tombstone_segment(&mut self, segment_id: u32) -> Result<(), StorageError> {
2692 let idx = segment_id as usize;
2693 if idx >= self.cold_segments.len() {
2694 return Err(StorageError::Corrupt(format!(
2695 "tombstone_segment: segment_id {segment_id} out of bounds (len={})",
2696 self.cold_segments.len()
2697 )));
2698 }
2699 self.cold_segments[idx] = None;
2700 Ok(())
2701 }
2702
2703 #[must_use]
2705 pub fn cold_segment_count(&self) -> usize {
2706 self.cold_segments.iter().filter(|s| s.is_some()).count()
2707 }
2708
2709 #[must_use]
2712 pub fn cold_segment_slot_count(&self) -> usize {
2713 self.cold_segments.len()
2714 }
2715
2716 #[must_use]
2721 pub fn cold_segment_ids_global(&self) -> Vec<u32> {
2722 self.cold_segments
2723 .iter()
2724 .enumerate()
2725 .filter_map(|(i, s)| s.as_ref().map(|_| i as u32))
2726 .collect()
2727 }
2728
2729 #[must_use]
2736 pub fn hot_tier_bytes(&self) -> u64 {
2737 self.tables
2738 .iter()
2739 .map(Table::hot_bytes)
2740 .fold(0u64, u64::saturating_add)
2741 }
2742
2743 pub fn freeze_oldest_to_cold(
2788 &mut self,
2789 table_name: &str,
2790 index_name: &str,
2791 max_rows: usize,
2792 ) -> Result<FreezeReport, StorageError> {
2793 if max_rows == 0 {
2795 return Err(StorageError::Corrupt(
2796 "freeze_oldest_to_cold: max_rows must be > 0".into(),
2797 ));
2798 }
2799 let table = self.get(table_name).ok_or_else(|| {
2800 StorageError::Corrupt(format!(
2801 "freeze_oldest_to_cold: table {table_name:?} not found"
2802 ))
2803 })?;
2804 if max_rows > table.rows.len() {
2805 return Err(StorageError::Corrupt(format!(
2806 "freeze_oldest_to_cold: max_rows {max_rows} > row_count {}",
2807 table.rows.len()
2808 )));
2809 }
2810 let idx = table
2811 .indices
2812 .iter()
2813 .find(|i| i.name == index_name)
2814 .ok_or_else(|| {
2815 StorageError::Corrupt(format!(
2816 "freeze_oldest_to_cold: index {index_name:?} not found on {table_name:?}"
2817 ))
2818 })?;
2819 if !matches!(idx.kind, IndexKind::BTree(_)) {
2820 return Err(StorageError::Corrupt(format!(
2821 "freeze_oldest_to_cold: index {index_name:?} is NSW; only BTree indices may freeze"
2822 )));
2823 }
2824 let column_position = idx.column_position;
2825
2826 let schema = table.schema.clone();
2828 let mut to_freeze: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(max_rows);
2829 for row_idx in 0..max_rows {
2830 let row = table.rows.get(row_idx).expect("bounds-checked above");
2831 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
2832 StorageError::Corrupt(format!(
2833 "freeze_oldest_to_cold: row {row_idx} has NULL / non-key value in index column"
2834 ))
2835 })?;
2836 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
2837 StorageError::Corrupt(format!(
2838 "freeze_oldest_to_cold: index {index_name:?} column type is non-integer; \
2839 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
2840 ))
2841 })?;
2842 to_freeze.push((pk_u64, encode_row_body_dense(row, &schema), key));
2843 }
2844 to_freeze.sort_by_key(|(k, _, _)| *k);
2849 for w in to_freeze.windows(2) {
2853 if w[0].0 == w[1].0 {
2854 return Err(StorageError::Corrupt(format!(
2855 "freeze_oldest_to_cold: duplicate PK {} in freeze batch",
2856 w[0].0
2857 )));
2858 }
2859 }
2860 let post_swap_keys: Vec<IndexKey> = to_freeze.iter().map(|(_, _, k)| k.clone()).collect();
2864 let seg_rows: Vec<(u64, Vec<u8>)> = to_freeze
2868 .into_iter()
2869 .map(|(k, body, _)| (k, body))
2870 .collect();
2871 let frozen_rows = seg_rows.len();
2872 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
2873 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: encode: {e}")))?;
2874
2875 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
2884 let positions: Vec<usize> = (0..max_rows).collect();
2885 let t_mut = self
2886 .get_mut(table_name)
2887 .expect("just validated; still present");
2888 let removed = t_mut.delete_rows(&positions);
2889 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
2890 let bytes_after = t_mut.hot_bytes();
2891 let bytes_freed = bytes_before.saturating_sub(bytes_after);
2892
2893 let segment_id = self
2894 .load_segment_bytes(seg_bytes.clone())
2895 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: load: {e}")))?;
2896 let new_cold = post_swap_keys.into_iter().map(|k| {
2897 (
2898 k,
2899 RowLocator::Cold {
2900 segment_id,
2901 page_offset: 0,
2902 },
2903 )
2904 });
2905 let t_mut = self.get_mut(table_name).expect("still present");
2906 t_mut.register_cold_locators(index_name, new_cold)?;
2907
2908 Ok(FreezeReport {
2909 segment_id,
2910 frozen_rows,
2911 bytes_freed,
2912 segment_bytes: seg_bytes,
2913 })
2914 }
2915
2916 #[must_use]
2922 pub fn cold_segment(&self, segment_id: u32) -> Option<&OwnedSegment> {
2923 self.cold_segments
2924 .get(segment_id as usize)
2925 .and_then(|s| s.as_deref())
2926 }
2927
2928 pub fn resolve_cold_locator(
2937 &self,
2938 table_name: &str,
2939 segment_id: u32,
2940 key: &IndexKey,
2941 ) -> Option<Row> {
2942 let t = self.get(table_name)?;
2943 let u64_key = index_key_as_u64(key)?;
2944 let seg = self.cold_segments.get(segment_id as usize)?.as_ref()?;
2945 let payload = seg.lookup(u64_key)?;
2946 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
2947 Some(row)
2948 }
2949
2950 pub fn lookup_by_pk(&self, table: &str, index_name: &str, key: &IndexKey) -> Option<Row> {
2968 let t = self.get(table)?;
2969 let idx = t.indices.iter().find(|i| i.name == index_name)?;
2970 let locators = idx.lookup_eq(key);
2971 let cold_u64_key = index_key_as_u64(key);
2972 for loc in locators {
2973 match *loc {
2974 RowLocator::Hot(i) => {
2975 if let Some(row) = t.rows.get(i) {
2976 return Some(row.clone());
2977 }
2978 }
2979 RowLocator::Cold {
2980 segment_id,
2981 page_offset: _,
2982 } => {
2983 let Some(u64_key) = cold_u64_key else {
2984 continue;
2987 };
2988 let Some(seg) = self
2989 .cold_segments
2990 .get(segment_id as usize)
2991 .and_then(|s| s.as_deref())
2992 else {
2993 continue;
3004 };
3005 let Some(payload) = seg.lookup(u64_key) else {
3006 continue;
3007 };
3008 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
3009 return Some(row);
3010 }
3011 }
3012 }
3013 None
3014 }
3015
3016 pub fn promote_cold_row(
3038 &mut self,
3039 table_name: &str,
3040 index_name: &str,
3041 key: &IndexKey,
3042 ) -> Result<Option<usize>, StorageError> {
3043 let cold_loc = self.find_cold_locator(table_name, index_name, key)?;
3044 let Some((segment_id, _page_offset)) = cold_loc else {
3045 return Ok(None);
3046 };
3047 let u64_key = index_key_as_u64(key).ok_or_else(|| {
3048 StorageError::Corrupt(
3049 "promote_cold_row: key type not coercible to u64 (cold tier requires integer PK)"
3050 .into(),
3051 )
3052 })?;
3053 let schema = self
3057 .get(table_name)
3058 .ok_or_else(|| {
3059 StorageError::Corrupt(format!("promote_cold_row: table {table_name:?} not found"))
3060 })?
3061 .schema
3062 .clone();
3063 let seg = self
3064 .cold_segments
3065 .get(segment_id as usize)
3066 .and_then(|s| s.as_ref())
3067 .ok_or_else(|| {
3068 StorageError::Corrupt(format!(
3069 "promote_cold_row: segment {segment_id} not registered on catalog"
3070 ))
3071 })?;
3072 let payload = seg.lookup(u64_key).ok_or_else(|| {
3073 StorageError::Corrupt(format!(
3074 "promote_cold_row: key {u64_key} resolves to segment {segment_id} \
3075 but the segment's bloom/page lookup didn't return a row"
3076 ))
3077 })?;
3078 let (row, _consumed) = decode_row_body_dense(&payload, &schema)?;
3079 let t = self
3084 .get_mut(table_name)
3085 .expect("table existed at lookup time");
3086 t.insert(row)?;
3087 let new_hot_idx =
3088 t.rows.len().checked_sub(1).ok_or_else(|| {
3089 StorageError::Corrupt("promote_cold_row: empty after insert".into())
3090 })?;
3091 t.remove_cold_locators_for_key(index_name, key)?;
3095 Ok(Some(new_hot_idx))
3096 }
3097
3098 pub fn shadow_cold_row(
3116 &mut self,
3117 table_name: &str,
3118 index_name: &str,
3119 key: &IndexKey,
3120 ) -> Result<usize, StorageError> {
3121 let t = self.get_mut(table_name).ok_or_else(|| {
3122 StorageError::Corrupt(format!("shadow_cold_row: table {table_name:?} not found"))
3123 })?;
3124 t.remove_cold_locators_for_key(index_name, key)
3125 }
3126
3127 pub fn prepare_freeze_slice(
3145 &self,
3146 table_name: &str,
3147 index_name: &str,
3148 row_range: core::ops::Range<usize>,
3149 ) -> Result<FreezeSlice, StorageError> {
3150 let table = self.get(table_name).ok_or_else(|| {
3151 StorageError::Corrupt(format!(
3152 "prepare_freeze_slice: table {table_name:?} not found"
3153 ))
3154 })?;
3155 let idx = table
3156 .indices
3157 .iter()
3158 .find(|i| i.name == index_name)
3159 .ok_or_else(|| {
3160 StorageError::Corrupt(format!(
3161 "prepare_freeze_slice: index {index_name:?} not found on {table_name:?}"
3162 ))
3163 })?;
3164 if !matches!(idx.kind, IndexKind::BTree(_)) {
3165 return Err(StorageError::Corrupt(format!(
3166 "prepare_freeze_slice: index {index_name:?} is NSW; only BTree indices may freeze"
3167 )));
3168 }
3169 if row_range.end > table.rows.len() {
3170 return Err(StorageError::Corrupt(format!(
3171 "prepare_freeze_slice: row_range end {} > row_count {}",
3172 row_range.end,
3173 table.rows.len()
3174 )));
3175 }
3176 let column_position = idx.column_position;
3177 let schema = table.schema.clone();
3178 let mut rows: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(row_range.len());
3179 for row_idx in row_range.clone() {
3180 let row = table.rows.get(row_idx).expect("bounds-checked above");
3181 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
3182 StorageError::Corrupt(format!(
3183 "prepare_freeze_slice: row {row_idx} has NULL / non-key value in index column"
3184 ))
3185 })?;
3186 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
3187 StorageError::Corrupt(format!(
3188 "prepare_freeze_slice: index {index_name:?} column type is non-integer; \
3189 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
3190 ))
3191 })?;
3192 rows.push((pk_u64, encode_row_body_dense(row, &schema), key));
3193 }
3194 rows.sort_by_key(|(k, _, _)| *k);
3195 Ok(FreezeSlice { row_range, rows })
3196 }
3197
3198 pub fn commit_freeze_slices(
3212 &mut self,
3213 table_name: &str,
3214 index_name: &str,
3215 slices: Vec<FreezeSlice>,
3216 ) -> Result<FreezeReport, StorageError> {
3217 let table = self.get(table_name).ok_or_else(|| {
3219 StorageError::Corrupt(format!(
3220 "commit_freeze_slices: table {table_name:?} not found"
3221 ))
3222 })?;
3223 let idx = table
3224 .indices
3225 .iter()
3226 .find(|i| i.name == index_name)
3227 .ok_or_else(|| {
3228 StorageError::Corrupt(format!(
3229 "commit_freeze_slices: index {index_name:?} not found on {table_name:?}"
3230 ))
3231 })?;
3232 if !matches!(idx.kind, IndexKind::BTree(_)) {
3233 return Err(StorageError::Corrupt(format!(
3234 "commit_freeze_slices: index {index_name:?} is NSW; only BTree indices may freeze"
3235 )));
3236 }
3237 let mut ordered = slices;
3241 ordered.sort_by_key(|s| s.row_range.start);
3242 let mut expected_start = 0usize;
3246 for s in &ordered {
3247 if s.row_range.start != expected_start {
3248 return Err(StorageError::Corrupt(format!(
3249 "commit_freeze_slices: gap/overlap at row {}; expected start {}",
3250 s.row_range.start, expected_start
3251 )));
3252 }
3253 expected_start = s.row_range.end;
3254 }
3255 let max_rows = expected_start;
3256 if max_rows > table.rows.len() {
3257 return Err(StorageError::Corrupt(format!(
3258 "commit_freeze_slices: total row range {} exceeds row_count {}",
3259 max_rows,
3260 table.rows.len()
3261 )));
3262 }
3263 if max_rows == 0 {
3264 return Ok(FreezeReport {
3265 segment_id: u32::MAX,
3266 frozen_rows: 0,
3267 bytes_freed: 0,
3268 segment_bytes: Vec::new(),
3269 });
3270 }
3271
3272 let total_rows: usize = ordered.iter().map(|s| s.rows.len()).sum();
3277 if total_rows != max_rows {
3278 return Err(StorageError::Corrupt(format!(
3279 "commit_freeze_slices: total slice rows {total_rows} ≠ row_range coverage {max_rows}"
3280 )));
3281 }
3282 let mut cursors: Vec<usize> = alloc::vec![0; ordered.len()];
3283 let mut merged: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(total_rows);
3284 loop {
3285 let mut pick: Option<usize> = None;
3288 for (i, c) in cursors.iter().enumerate() {
3289 let slice = &ordered[i];
3290 if *c >= slice.rows.len() {
3291 continue;
3292 }
3293 match pick {
3294 None => pick = Some(i),
3295 Some(j) => {
3296 if slice.rows[*c].0 < ordered[j].rows[cursors[j]].0 {
3297 pick = Some(i);
3298 }
3299 }
3300 }
3301 }
3302 let Some(i) = pick else { break };
3303 let row = ordered[i].rows[cursors[i]].clone();
3304 cursors[i] += 1;
3305 merged.push(row);
3306 }
3307 for w in merged.windows(2) {
3310 if w[0].0 == w[1].0 {
3311 return Err(StorageError::Corrupt(format!(
3312 "commit_freeze_slices: duplicate PK {} across slices",
3313 w[0].0
3314 )));
3315 }
3316 }
3317 let post_swap_keys: Vec<IndexKey> = merged.iter().map(|(_, _, k)| k.clone()).collect();
3318 let seg_rows: Vec<(u64, Vec<u8>)> = merged
3319 .into_iter()
3320 .map(|(k, body, _)| (k, body))
3321 .collect();
3322 let frozen_rows = seg_rows.len();
3323 let (seg_bytes, _meta) =
3324 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).map_err(|e| {
3325 StorageError::Corrupt(format!("commit_freeze_slices: encode: {e}"))
3326 })?;
3327
3328 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
3330 let positions: Vec<usize> = (0..max_rows).collect();
3331 let t_mut = self
3332 .get_mut(table_name)
3333 .expect("just validated; still present");
3334 let removed = t_mut.delete_rows(&positions);
3335 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
3336 let bytes_after = t_mut.hot_bytes();
3337 let bytes_freed = bytes_before.saturating_sub(bytes_after);
3338
3339 let segment_id = self
3340 .load_segment_bytes(seg_bytes.clone())
3341 .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: load: {e}")))?;
3342 let new_cold = post_swap_keys.into_iter().map(|k| {
3343 (
3344 k,
3345 RowLocator::Cold {
3346 segment_id,
3347 page_offset: 0,
3348 },
3349 )
3350 });
3351 let t_mut = self.get_mut(table_name).expect("still present");
3352 t_mut.register_cold_locators(index_name, new_cold)?;
3353
3354 Ok(FreezeReport {
3355 segment_id,
3356 frozen_rows,
3357 bytes_freed,
3358 segment_bytes: seg_bytes,
3359 })
3360 }
3361
3362 pub fn compact_cold_segments(
3405 &mut self,
3406 table_name: &str,
3407 index_name: &str,
3408 target_segment_bytes: u64,
3409 ) -> Result<CompactReport, StorageError> {
3410 let t = self.get(table_name).ok_or_else(|| {
3412 StorageError::Corrupt(format!(
3413 "compact_cold_segments: table {table_name:?} not found"
3414 ))
3415 })?;
3416 let idx = t
3417 .indices
3418 .iter()
3419 .find(|i| i.name == index_name)
3420 .ok_or_else(|| {
3421 StorageError::Corrupt(format!(
3422 "compact_cold_segments: index {index_name:?} not found on {table_name:?}"
3423 ))
3424 })?;
3425 let map = match &idx.kind {
3426 IndexKind::BTree(m) => m,
3427 IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
3428 return Err(StorageError::Corrupt(format!(
3429 "compact_cold_segments: index {index_name:?} is not BTree; \
3430 compaction applies only to BTree cold-tier indices"
3431 )));
3432 }
3433 };
3434
3435 let mut referenced_ids: BTreeSet<u32> = BTreeSet::new();
3438 for (_key, locators) in map.iter() {
3439 for loc in locators {
3440 if let RowLocator::Cold { segment_id, .. } = loc {
3441 referenced_ids.insert(*segment_id);
3442 }
3443 }
3444 }
3445 let candidate_set: BTreeSet<u32> = referenced_ids
3447 .into_iter()
3448 .filter(|id| {
3449 self.cold_segments
3450 .get(*id as usize)
3451 .and_then(|s| s.as_deref())
3452 .is_some_and(|s| (s.bytes().len() as u64) < target_segment_bytes)
3453 })
3454 .collect();
3455 if candidate_set.len() < 2 {
3456 return Ok(CompactReport {
3457 sources: Vec::new(),
3458 merged_segment_id: None,
3459 merged_segment_bytes: Vec::new(),
3460 merged_rows: 0,
3461 deleted_rows_pruned: 0,
3462 bytes_reclaimed_estimate: 0,
3463 });
3464 }
3465 let mut source_row_count: usize = 0;
3467 let mut source_byte_total: u64 = 0;
3468 for &id in &candidate_set {
3469 let seg = self.cold_segments[id as usize]
3470 .as_ref()
3471 .expect("candidate selected only when slot is Some");
3472 source_row_count = source_row_count.saturating_add(seg.meta().num_rows as usize);
3473 source_byte_total =
3474 source_byte_total.saturating_add(seg.bytes().len() as u64);
3475 }
3476 let mut collected: BTreeMap<u64, (Vec<u8>, IndexKey)> = BTreeMap::new();
3482 for (key, locators) in map.iter() {
3483 for loc in locators {
3484 let RowLocator::Cold { segment_id, .. } = loc else {
3485 continue;
3486 };
3487 if !candidate_set.contains(segment_id) {
3488 continue;
3489 }
3490 let u64_key = index_key_as_u64(key).ok_or_else(|| {
3491 StorageError::Corrupt(format!(
3492 "compact_cold_segments: index {index_name:?} has non-integer Cold key; \
3493 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
3494 ))
3495 })?;
3496 let seg = self.cold_segments[*segment_id as usize]
3497 .as_ref()
3498 .expect("candidate slot guaranteed Some above");
3499 let payload = seg.lookup(u64_key).ok_or_else(|| {
3500 StorageError::Corrupt(format!(
3501 "compact_cold_segments: BTree {index_name:?} points key={u64_key} \
3502 at segment {segment_id} but the segment lookup missed"
3503 ))
3504 })?;
3505 collected.insert(u64_key, (payload, key.clone()));
3506 break;
3507 }
3508 }
3509 let merged_rows = collected.len();
3510 let deleted_rows_pruned = source_row_count.saturating_sub(merged_rows);
3511
3512 let seg_rows: Vec<(u64, Vec<u8>)> = collected
3516 .iter()
3517 .map(|(k, (body, _))| (*k, body.clone()))
3518 .collect();
3519 let (seg_bytes, _meta) =
3520 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).map_err(|e| {
3521 StorageError::Corrupt(format!("compact_cold_segments: encode: {e}"))
3522 })?;
3523 let merged_bytes_len = seg_bytes.len() as u64;
3524
3525 let merged_segment_id = self
3527 .load_segment_bytes(seg_bytes.clone())
3528 .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: load: {e}")))?;
3529
3530 let entries: Vec<(IndexKey, Vec<RowLocator>)> = {
3536 let t = self
3537 .get(table_name)
3538 .expect("table existed at the start of this fn");
3539 let idx = t
3540 .indices
3541 .iter()
3542 .find(|i| i.name == index_name)
3543 .expect("index existed at the start of this fn");
3544 let IndexKind::BTree(map) = &idx.kind else {
3545 unreachable!("validated above");
3546 };
3547 map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
3548 };
3549 let t_mut = self
3550 .get_mut(table_name)
3551 .expect("table existed at the start of this fn");
3552 let idx_mut = t_mut
3553 .indices
3554 .iter_mut()
3555 .find(|i| i.name == index_name)
3556 .expect("index existed at the start of this fn");
3557 let IndexKind::BTree(map_mut) = &mut idx_mut.kind else {
3558 unreachable!("validated above");
3559 };
3560 for (key, locators) in entries {
3561 let mut new_locs: Vec<RowLocator> = Vec::with_capacity(locators.len());
3562 let mut changed = false;
3563 for loc in &locators {
3564 match *loc {
3565 RowLocator::Cold {
3566 segment_id,
3567 page_offset: _,
3568 } if candidate_set.contains(&segment_id) => {
3569 let replacement = RowLocator::Cold {
3570 segment_id: merged_segment_id,
3571 page_offset: 0,
3572 };
3573 if !new_locs.contains(&replacement) {
3574 new_locs.push(replacement);
3575 }
3576 changed = true;
3577 }
3578 other => new_locs.push(other),
3579 }
3580 }
3581 if changed {
3582 map_mut.insert_mut(key, new_locs);
3583 }
3584 }
3585
3586 for &id in &candidate_set {
3591 self.tombstone_segment(id)?;
3592 }
3593
3594 let bytes_reclaimed_estimate = source_byte_total.saturating_sub(merged_bytes_len);
3595 Ok(CompactReport {
3596 sources: candidate_set.into_iter().collect(),
3597 merged_segment_id: Some(merged_segment_id),
3598 merged_segment_bytes: seg_bytes,
3599 merged_rows,
3600 deleted_rows_pruned,
3601 bytes_reclaimed_estimate,
3602 })
3603 }
3604
3605 fn find_cold_locator(
3611 &self,
3612 table_name: &str,
3613 index_name: &str,
3614 key: &IndexKey,
3615 ) -> Result<Option<(u32, u32)>, StorageError> {
3616 let t = self.get(table_name).ok_or_else(|| {
3617 StorageError::Corrupt(format!("find_cold_locator: table {table_name:?} not found"))
3618 })?;
3619 let idx = t
3620 .indices
3621 .iter()
3622 .find(|i| i.name == index_name)
3623 .ok_or_else(|| {
3624 StorageError::Corrupt(format!(
3625 "find_cold_locator: index {index_name:?} not found on {table_name:?}"
3626 ))
3627 })?;
3628 if !matches!(idx.kind, IndexKind::BTree(_)) {
3629 return Err(StorageError::Corrupt(format!(
3630 "find_cold_locator: index {index_name:?} is NSW; promote-on-write only applies to BTree indices"
3631 )));
3632 }
3633 for loc in idx.lookup_eq(key) {
3634 if let RowLocator::Cold {
3635 segment_id,
3636 page_offset,
3637 } = *loc
3638 {
3639 return Ok(Some((segment_id, page_offset)));
3640 }
3641 }
3642 Ok(None)
3643 }
3644}
3645
3646fn index_key_as_u64(key: &IndexKey) -> Option<u64> {
3652 match key {
3653 IndexKey::Int(n) => Some(n.cast_unsigned()),
3659 IndexKey::Text(_) | IndexKey::Bool(_) => None,
3660 }
3661}
3662
/// Errors reported by the storage layer.
///
/// Marked `#[non_exhaustive]`, so matches outside this crate need a wildcard
/// arm.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum StorageError {
    /// A table with this name already exists.
    DuplicateTable {
        name: String,
    },
    /// No table with this name exists.
    TableNotFound {
        name: String,
    },
    /// A row carried `actual` values where `expected` columns were required.
    ArityMismatch {
        expected: usize,
        actual: usize,
    },
    /// The value at column `position` had type `actual` instead of `expected`.
    TypeMismatch {
        column: String,
        expected: DataType,
        actual: DataType,
        position: usize,
    },
    /// A NULL was supplied for a NOT NULL column.
    NullInNotNull {
        column: String,
    },
    /// An index with this name already exists.
    DuplicateIndex {
        name: String,
    },
    /// No column with this name exists.
    ColumnNotFound {
        column: String,
    },
    /// Serialized data or internal state was malformed; the payload is a
    /// human-readable detail string.
    Corrupt(String),
    /// No index with this name exists.
    IndexNotFound {
        name: String,
    },
    /// The requested operation is not supported; the payload says why.
    Unsupported(String),
}
3706
3707impl fmt::Display for StorageError {
3708 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3709 match self {
3710 Self::DuplicateTable { name } => write!(f, "table already exists: {name}"),
3711 Self::TableNotFound { name } => write!(f, "table not found: {name}"),
3712 Self::ArityMismatch { expected, actual } => write!(
3713 f,
3714 "row arity mismatch: expected {expected} columns, got {actual}"
3715 ),
3716 Self::TypeMismatch {
3717 column,
3718 expected,
3719 actual,
3720 position,
3721 } => write!(
3722 f,
3723 "type mismatch in column {column:?} (position {position}): expected {expected}, got {actual}"
3724 ),
3725 Self::NullInNotNull { column } => {
3726 write!(f, "NULL value in NOT NULL column {column:?}")
3727 }
3728 Self::DuplicateIndex { name } => write!(f, "index already exists: {name}"),
3729 Self::ColumnNotFound { column } => write!(f, "column not found: {column}"),
3730 Self::Corrupt(detail) => write!(f, "corrupt on-disk format: {detail}"),
3731 Self::IndexNotFound { name } => write!(f, "index not found: {name}"),
3732 Self::Unsupported(detail) => write!(f, "unsupported: {detail}"),
3733 }
3734 }
3735}
3736
3737impl ColumnSchema {
3738 pub fn new(name: impl Into<String>, ty: DataType, nullable: bool) -> Self {
3739 Self {
3740 name: name.into(),
3741 ty,
3742 nullable,
3743 default: None,
3744 auto_increment: false,
3745 }
3746 }
3747
3748 #[must_use]
3752 pub fn with_default(mut self, default: Value) -> Self {
3753 self.default = Some(default);
3754 self
3755 }
3756
3757 #[must_use]
3759 pub const fn with_auto_increment(mut self) -> Self {
3760 self.auto_increment = true;
3761 self
3762 }
3763}
3764
3765impl TableSchema {
3766 pub fn new(name: impl Into<String>, columns: Vec<ColumnSchema>) -> Self {
3767 Self {
3768 name: name.into(),
3769 columns,
3770 hot_tier_bytes: None,
3771 foreign_keys: Vec::new(),
3772 }
3773 }
3774}
3775
/// Eight-byte magic that opens every serialized catalog;
/// `Catalog::deserialize` rejects a buffer that does not start with it.
const FILE_MAGIC: &[u8; 8] = b"SPGDB001";
/// Format version stamped by `Catalog::serialize`, and the newest version
/// `Catalog::deserialize` accepts.
const FILE_VERSION: u8 = 14;
/// Oldest format version `Catalog::deserialize` still accepts.
const MIN_SUPPORTED_FILE_VERSION: u8 = 8;

// One-byte tags for the `IndexKey` variants. Presumably consumed by
// `write_index_key` / `Cursor::read_index_key` — TODO confirm, neither is
// visible from this part of the file.
const INDEX_KEY_TAG_INT: u8 = 0;
const INDEX_KEY_TAG_TEXT: u8 = 1;
const INDEX_KEY_TAG_BOOL: u8 = 2;
3861
3862impl Catalog {
3863 pub fn serialize(&self) -> Vec<u8> {
3866 let mut out = Vec::with_capacity(64);
3867 out.extend_from_slice(FILE_MAGIC);
3868 out.push(FILE_VERSION);
3869 write_u32(
3870 &mut out,
3871 u32::try_from(self.tables.len()).expect("≤ 4G tables"),
3872 );
3873 for t in &self.tables {
3874 write_str(&mut out, &t.schema.name);
3875 write_u16(
3876 &mut out,
3877 u16::try_from(t.schema.columns.len()).expect("≤ 65k columns/table"),
3878 );
3879 for c in &t.schema.columns {
3880 write_str(&mut out, &c.name);
3881 write_data_type(&mut out, c.ty);
3882 out.push(u8::from(c.nullable));
3883 match &c.default {
3884 None => out.push(0),
3885 Some(v) => {
3886 out.push(1);
3887 write_value(&mut out, v);
3888 }
3889 }
3890 out.push(u8::from(c.auto_increment));
3891 }
3892 write_u32(
3893 &mut out,
3894 u32::try_from(t.rows.len()).expect("≤ 4G rows/table"),
3895 );
3896 for row in &t.rows {
3901 out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
3902 }
3903 write_u16(
3910 &mut out,
3911 u16::try_from(t.indices.len()).expect("≤ 65k indices/table"),
3912 );
3913 for idx in &t.indices {
3914 write_str(&mut out, &idx.name);
3915 write_u16(
3916 &mut out,
3917 u16::try_from(idx.column_position).expect("≤ 65k columns/table"),
3918 );
3919 match &idx.kind {
3920 IndexKind::BTree(map) => {
3921 out.push(0);
3922 write_u32(
3930 &mut out,
3931 u32::try_from(map.len()).expect("≤ 4G index entries/index"),
3932 );
3933 for (key, locators) in map {
3934 write_index_key(&mut out, key);
3935 write_u32(
3936 &mut out,
3937 u32::try_from(locators.len()).expect("≤ 4G locators/key"),
3938 );
3939 for loc in locators {
3940 loc.write_le(&mut out);
3941 }
3942 }
3943 }
3944 IndexKind::Nsw(g) => {
3945 out.push(1);
3946 write_u16(&mut out, u16::try_from(g.m).expect("≤ 65k NSW neighbours"));
3947 write_nsw_graph(&mut out, g);
3948 }
3949 IndexKind::Brin { column_type } => {
3950 out.push(2);
3956 write_data_type(&mut out, *column_type);
3957 }
3958 }
3959 write_u16(
3965 &mut out,
3966 u16::try_from(idx.included_columns.len())
3967 .expect("≤ 65k INCLUDE columns/index"),
3968 );
3969 for col_pos in &idx.included_columns {
3970 write_u16(
3971 &mut out,
3972 u16::try_from(*col_pos).expect("≤ 65k columns/table"),
3973 );
3974 }
3975 match &idx.partial_predicate {
3979 None => out.push(0),
3980 Some(pred) => {
3981 out.push(1);
3982 write_str(&mut out, pred);
3983 }
3984 }
3985 match &idx.expression {
3988 None => out.push(0),
3989 Some(expr) => {
3990 out.push(1);
3991 write_str(&mut out, expr);
3992 }
3993 }
3994 }
3995 match t.schema.hot_tier_bytes {
4001 None => out.push(0),
4002 Some(n) => {
4003 out.push(1);
4004 out.extend_from_slice(&n.to_le_bytes());
4005 }
4006 }
4007 write_u16(
4018 &mut out,
4019 u16::try_from(t.schema.foreign_keys.len()).expect("≤ 65k FKs/table"),
4020 );
4021 for fk in &t.schema.foreign_keys {
4022 match &fk.name {
4023 None => out.push(0),
4024 Some(n) => {
4025 out.push(1);
4026 write_str(&mut out, n);
4027 }
4028 }
4029 write_u16(
4030 &mut out,
4031 u16::try_from(fk.local_columns.len()).expect("≤ 65k FK columns"),
4032 );
4033 for &p in &fk.local_columns {
4034 write_u16(
4035 &mut out,
4036 u16::try_from(p).expect("≤ 65k columns/table"),
4037 );
4038 }
4039 write_str(&mut out, &fk.parent_table);
4040 write_u16(
4041 &mut out,
4042 u16::try_from(fk.parent_columns.len()).expect("≤ 65k FK parent columns"),
4043 );
4044 for &p in &fk.parent_columns {
4045 write_u16(
4046 &mut out,
4047 u16::try_from(p).expect("≤ 65k columns/table"),
4048 );
4049 }
4050 out.push(fk.on_delete.tag());
4051 out.push(fk.on_update.tag());
4052 }
4053 }
4054 out
4055 }
4056
4057 pub fn deserialize(buf: &[u8]) -> Result<Self, StorageError> {
4060 let mut cur = Cursor::new(buf);
4061 let magic = cur.take(8)?;
4062 if magic != FILE_MAGIC {
4063 return Err(StorageError::Corrupt(format!(
4064 "bad magic: expected SPGDB001, got {magic:?}"
4065 )));
4066 }
4067 let version = cur.read_u8()?;
4068 if !(MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION).contains(&version) {
4069 return Err(StorageError::Corrupt(format!(
4070 "unsupported file version: {version} (supported: {MIN_SUPPORTED_FILE_VERSION}..={FILE_VERSION})"
4071 )));
4072 }
4073 let table_count = cur.read_u32()? as usize;
4074 let mut cat = Self::new();
4075 for _ in 0..table_count {
4076 deserialize_table(&mut cur, &mut cat, version)?;
4077 }
4078 if cur.pos < buf.len() {
4079 return Err(StorageError::Corrupt(format!(
4080 "trailing bytes: {} unread",
4081 buf.len() - cur.pos
4082 )));
4083 }
4084 Ok(cat)
4085 }
4086}
4087
4088fn deserialize_table(
4093 cur: &mut Cursor<'_>,
4094 cat: &mut Catalog,
4095 version: u8,
4096) -> Result<(), StorageError> {
4097 let table_name = cur.read_str()?;
4098 let name = table_name.clone();
4099 let col_count = cur.read_u16()? as usize;
4100 let mut cols = Vec::with_capacity(col_count);
4101 for _ in 0..col_count {
4102 let c_name = cur.read_str()?;
4103 let ty = cur.read_data_type()?;
4104 let nullable = cur.read_u8()? != 0;
4105 let default = match cur.read_u8()? {
4106 0 => None,
4107 1 => Some(cur.read_value()?),
4108 other => {
4109 return Err(StorageError::Corrupt(format!(
4110 "unknown default tag: {other}"
4111 )));
4112 }
4113 };
4114 let auto_increment = cur.read_u8()? != 0;
4115 cols.push(ColumnSchema {
4116 name: c_name,
4117 ty,
4118 nullable,
4119 default,
4120 auto_increment,
4121 });
4122 }
4123 let n_cols = cols.len();
4124 cat.create_table(TableSchema::new(name, cols))?;
4125 let t = cat.tables.last_mut().expect("create_table just pushed");
4129 deserialize_rows(cur, t, n_cols)?;
4130 deserialize_indices(cur, t, version)?;
4131 if version >= 11 {
4137 let has = cur.read_u8()?;
4138 let hot_tier_bytes = match has {
4139 0 => None,
4140 1 => Some(cur.read_u64()?),
4141 other => {
4142 return Err(StorageError::Corrupt(format!(
4143 "hot_tier_bytes appendix: unknown has-value byte {other}"
4144 )));
4145 }
4146 };
4147 t.schema_mut().hot_tier_bytes = hot_tier_bytes;
4148 }
4149 if version >= 13 {
4152 let fk_count = cur.read_u16()? as usize;
4153 let mut fks = Vec::with_capacity(fk_count);
4154 for _ in 0..fk_count {
4155 let name = match cur.read_u8()? {
4156 0 => None,
4157 1 => Some(cur.read_str()?),
4158 other => {
4159 return Err(StorageError::Corrupt(format!(
4160 "FK appendix: unknown has-name byte {other}"
4161 )));
4162 }
4163 };
4164 let local_arity = cur.read_u16()? as usize;
4165 let mut local_columns = Vec::with_capacity(local_arity);
4166 for _ in 0..local_arity {
4167 local_columns.push(cur.read_u16()? as usize);
4168 }
4169 let parent_table = cur.read_str()?;
4170 let parent_arity = cur.read_u16()? as usize;
4171 if parent_arity != local_arity {
4172 return Err(StorageError::Corrupt(format!(
4173 "FK arity mismatch in catalog: local {local_arity} vs parent {parent_arity}"
4174 )));
4175 }
4176 let mut parent_columns = Vec::with_capacity(parent_arity);
4177 for _ in 0..parent_arity {
4178 parent_columns.push(cur.read_u16()? as usize);
4179 }
4180 let on_delete = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
4181 StorageError::Corrupt("FK appendix: unknown on_delete tag".into())
4182 })?;
4183 let on_update = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
4184 StorageError::Corrupt("FK appendix: unknown on_update tag".into())
4185 })?;
4186 fks.push(ForeignKeyConstraint {
4187 name,
4188 local_columns,
4189 parent_table,
4190 parent_columns,
4191 on_delete,
4192 on_update,
4193 });
4194 }
4195 t.schema_mut().foreign_keys = fks;
4196 }
4197 let _ = table_name;
4198 Ok(())
4199}
4200
4201fn deserialize_rows(
4202 cur: &mut Cursor<'_>,
4203 t: &mut Table,
4204 _n_cols: usize,
4205) -> Result<(), StorageError> {
4206 let row_count = cur.read_u32()? as usize;
4207 let mut hot_bytes: u64 = 0;
4212 for _ in 0..row_count {
4213 let tail = &cur.buf[cur.pos..];
4214 let (row, consumed) = decode_row_body_dense(tail, &t.schema)?;
4215 cur.pos += consumed;
4216 hot_bytes = hot_bytes.saturating_add(row_body_encoded_len(&row, &t.schema) as u64);
4222 t.rows.push_mut(row);
4223 }
4224 t.hot_bytes = hot_bytes;
4225 Ok(())
4226}
4227
4228fn deserialize_indices(
4229 cur: &mut Cursor<'_>,
4230 t: &mut Table,
4231 version: u8,
4232) -> Result<(), StorageError> {
4233 let index_count = cur.read_u16()? as usize;
4234 for _ in 0..index_count {
4235 let idx_name = cur.read_str()?;
4236 let col_pos = cur.read_u16()? as usize;
4237 let column_name = t
4238 .schema
4239 .columns
4240 .get(col_pos)
4241 .ok_or_else(|| {
4242 StorageError::Corrupt(format!(
4243 "index {idx_name:?} points at non-existent column position {col_pos}"
4244 ))
4245 })?
4246 .name
4247 .clone();
4248 let kind_tag = cur.read_u8()?;
4249 match kind_tag {
4250 0 => {
4251 if version >= 9 {
4252 let map = read_btree_map(cur)?;
4257 t.restore_btree_index(idx_name, &column_name, map)?;
4258 } else {
4259 t.add_index(idx_name, &column_name)?;
4264 }
4265 }
4266 1 => {
4267 let m = cur.read_u16()? as usize;
4268 let graph = cur.read_nsw_graph(m)?;
4269 t.restore_nsw_index(idx_name, &column_name, graph)?;
4270 }
4271 2 => {
4272 let column_type = cur.read_data_type()?;
4276 t.restore_brin_index(idx_name, &column_name, column_type)?;
4277 }
4278 other => {
4279 return Err(StorageError::Corrupt(format!(
4280 "unknown index kind tag: {other}"
4281 )));
4282 }
4283 }
4284 if version >= 12 {
4287 let num_included = cur.read_u16()? as usize;
4288 if num_included > 0 {
4289 let mut included: Vec<usize> = Vec::with_capacity(num_included);
4290 for _ in 0..num_included {
4291 let cp = cur.read_u16()? as usize;
4292 if cp >= t.schema.columns.len() {
4293 return Err(StorageError::Corrupt(format!(
4294 "INCLUDE column position {cp} out of range \
4295 ({} schema columns)",
4296 t.schema.columns.len()
4297 )));
4298 }
4299 included.push(cp);
4300 }
4301 if let Some(last) = t.indices.last_mut() {
4302 last.included_columns = included;
4303 }
4304 }
4305 match cur.read_u8()? {
4307 0 => {}
4308 1 => {
4309 let pred = cur.read_str()?;
4310 if let Some(last) = t.indices.last_mut() {
4311 last.partial_predicate = Some(pred);
4312 }
4313 }
4314 other => {
4315 return Err(StorageError::Corrupt(format!(
4316 "partial_predicate tag: unknown byte {other}"
4317 )));
4318 }
4319 }
4320 match cur.read_u8()? {
4322 0 => {}
4323 1 => {
4324 let expr = cur.read_str()?;
4325 if let Some(last) = t.indices.last_mut() {
4326 last.expression = Some(expr);
4327 }
4328 }
4329 other => {
4330 return Err(StorageError::Corrupt(format!(
4331 "expression tag: unknown byte {other}"
4332 )));
4333 }
4334 }
4335 }
4336 }
4337 Ok(())
4338}
4339
4340fn read_btree_map(
4344 cur: &mut Cursor<'_>,
4345) -> Result<PersistentBTreeMap<IndexKey, Vec<RowLocator>>, StorageError> {
4346 let entry_count = cur.read_u32()? as usize;
4347 let mut map = PersistentBTreeMap::new();
4348 for _ in 0..entry_count {
4349 let key = cur.read_index_key()?;
4350 let locator_count = cur.read_u32()? as usize;
4351 let mut locators = Vec::with_capacity(locator_count);
4352 for _ in 0..locator_count {
4353 let tail = &cur.buf[cur.pos..];
4354 let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
4355 StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
4356 })?;
4357 cur.pos += consumed;
4358 locators.push(loc);
4359 }
4360 map.insert_mut(key, locators);
4361 }
4362 Ok(map)
4363}
4364
4365fn write_nsw_graph(out: &mut Vec<u8>, g: &NswGraph) {
4381 let entry = g.entry.map_or(u32::MAX, |e| {
4382 u32::try_from(e).expect("NSW entry fits in u32")
4383 });
4384 write_u16(
4385 out,
4386 u16::try_from(g.m_max_0).expect("HNSW m_max_0 fits in u16"),
4387 );
4388 out.extend_from_slice(&entry.to_le_bytes());
4389 out.push(g.entry_level);
4390 let node_count = g.levels.len();
4391 write_u32(
4392 out,
4393 u32::try_from(node_count).expect("HNSW node count fits in u32"),
4394 );
4395 for &lvl in &g.levels {
4396 out.push(lvl);
4397 }
4398 let layer_count = u8::try_from(g.layers.len()).expect("HNSW layer count ≤ 255");
4399 out.push(layer_count);
4400 for layer in &g.layers {
4401 write_u32(
4402 out,
4403 u32::try_from(layer.len()).expect("HNSW per-layer node count fits in u32"),
4404 );
4405 for neighbors in layer {
4406 write_u16(
4407 out,
4408 u16::try_from(neighbors.len()).expect("HNSW neighbour list fits in u16"),
4409 );
4410 for &peer in neighbors {
4414 write_u32(out, peer);
4415 }
4416 }
4417 }
4418}
4419
4420fn write_data_type(out: &mut Vec<u8>, t: DataType) {
4421 match t {
4422 DataType::Int => out.push(1),
4423 DataType::BigInt => out.push(2),
4424 DataType::Float => out.push(3),
4425 DataType::Text => out.push(4),
4426 DataType::Bool => out.push(5),
4427 DataType::Vector { dim, encoding } => match encoding {
4428 VecEncoding::F32 => {
4432 out.push(6);
4433 out.extend_from_slice(&dim.to_le_bytes());
4434 }
4435 VecEncoding::F16 => {
4438 out.push(15);
4439 out.extend_from_slice(&dim.to_le_bytes());
4440 }
4441 VecEncoding::Sq8 => {
4447 out.push(14);
4448 out.extend_from_slice(&dim.to_le_bytes());
4449 }
4450 },
4451 DataType::SmallInt => out.push(7),
4452 DataType::Varchar(max) => {
4453 out.push(8);
4454 out.extend_from_slice(&max.to_le_bytes());
4455 }
4456 DataType::Char(size) => {
4457 out.push(9);
4458 out.extend_from_slice(&size.to_le_bytes());
4459 }
4460 DataType::Numeric { precision, scale } => {
4461 out.push(10);
4462 out.push(precision);
4463 out.push(scale);
4464 }
4465 DataType::Date => out.push(11),
4466 DataType::Timestamp => out.push(12),
4467 DataType::Timestamptz => out.push(17),
4471 DataType::Interval => {
4476 unreachable!("DataType::Interval has no on-disk encoding in v2.11")
4477 }
4478 DataType::Json => out.push(13),
4479 DataType::Jsonb => out.push(16),
4482 }
4483}
4484
4485impl Cursor<'_> {
4486 fn read_data_type(&mut self) -> Result<DataType, StorageError> {
4487 let tag = self.read_u8()?;
4488 match tag {
4489 1 => Ok(DataType::Int),
4490 2 => Ok(DataType::BigInt),
4491 3 => Ok(DataType::Float),
4492 4 => Ok(DataType::Text),
4493 5 => Ok(DataType::Bool),
4494 6 => Ok(DataType::Vector {
4495 dim: self.read_u32()?,
4496 encoding: VecEncoding::F32,
4497 }),
4498 7 => Ok(DataType::SmallInt),
4499 8 => Ok(DataType::Varchar(self.read_u32()?)),
4500 9 => Ok(DataType::Char(self.read_u32()?)),
4501 10 => {
4502 let precision = self.read_u8()?;
4503 let scale = self.read_u8()?;
4504 Ok(DataType::Numeric { precision, scale })
4505 }
4506 11 => Ok(DataType::Date),
4507 12 => Ok(DataType::Timestamp),
4508 13 => Ok(DataType::Json),
4509 14 => Ok(DataType::Vector {
4510 dim: self.read_u32()?,
4511 encoding: VecEncoding::Sq8,
4512 }),
4513 15 => Ok(DataType::Vector {
4517 dim: self.read_u32()?,
4518 encoding: VecEncoding::F16,
4519 }),
4520 16 => Ok(DataType::Jsonb),
4524 17 => Ok(DataType::Timestamptz),
4528 other => Err(StorageError::Corrupt(format!(
4529 "unknown data type tag: {other}"
4530 ))),
4531 }
4532 }
4533}
4534
4535pub fn row_body_encoded_len(row: &Row, schema: &TableSchema) -> usize {
4541 debug_assert_eq!(
4542 row.values.len(),
4543 schema.columns.len(),
4544 "row_body_encoded_len: row arity must match schema"
4545 );
4546 let bitmap_bytes = schema.columns.len().div_ceil(8);
4547 let mut n = bitmap_bytes;
4548 for (col_idx, v) in row.values.iter().enumerate() {
4549 if matches!(v, Value::Null) {
4550 continue;
4551 }
4552 n += value_body_encoded_len(v, schema.columns[col_idx].ty);
4553 }
4554 n
4555}
4556
4557fn value_body_encoded_len(v: &Value, _ty: DataType) -> usize {
4563 match v {
4564 Value::SmallInt(_) => 2,
4565 Value::Int(_) | Value::Date(_) => 4,
4567 Value::BigInt(_) | Value::Float(_) | Value::Timestamp(_) => 8,
4569 Value::Bool(_) => 1,
4570 Value::Text(s) | Value::Json(s) => 2 + s.len(),
4572 Value::Vector(vec) => 4 + 4 * vec.len(),
4574 Value::Sq8Vector(q) => 4 + 4 + 4 + q.bytes.len(),
4581 Value::HalfVector(h) => 4 + h.bytes.len(),
4584 Value::Numeric { .. } => 16 + 1,
4586 Value::Null => 0,
4588 Value::Interval { .. } => {
4590 unreachable!("Value::Interval has no on-disk encoding")
4591 }
4592 }
4593}
4594
4595pub fn encode_row_body_dense(row: &Row, schema: &TableSchema) -> Vec<u8> {
4606 debug_assert_eq!(
4607 row.values.len(),
4608 schema.columns.len(),
4609 "dense encode: row arity must match schema"
4610 );
4611 let bitmap_bytes = schema.columns.len().div_ceil(8);
4612 let mut out = Vec::with_capacity(bitmap_bytes + schema.columns.len() * 8);
4615 let bitmap_offset = out.len();
4616 out.resize(bitmap_offset + bitmap_bytes, 0);
4617 for (i, v) in row.values.iter().enumerate() {
4618 if matches!(v, Value::Null) {
4619 out[bitmap_offset + i / 8] |= 1 << (i % 8);
4620 }
4621 }
4622 for (col_idx, v) in row.values.iter().enumerate() {
4623 if matches!(v, Value::Null) {
4624 continue;
4625 }
4626 write_value_body(&mut out, v, schema.columns[col_idx].ty);
4627 }
4628 out
4629}
4630
4631pub fn decode_row_body_dense(
4637 bytes: &[u8],
4638 schema: &TableSchema,
4639) -> Result<(Row, usize), StorageError> {
4640 let mut cur = Cursor::new(bytes);
4641 let bitmap_bytes = schema.columns.len().div_ceil(8);
4642 let mut bitmap_buf = [0u8; 32];
4643 if bitmap_bytes > bitmap_buf.len() {
4644 return Err(StorageError::Corrupt(format!(
4645 "row NULL bitmap {bitmap_bytes} B exceeds 32 B cap"
4646 )));
4647 }
4648 let slice = cur.take(bitmap_bytes)?;
4649 bitmap_buf[..bitmap_bytes].copy_from_slice(slice);
4650 let mut values = Vec::with_capacity(schema.columns.len());
4651 for (col_idx, col) in schema.columns.iter().enumerate() {
4652 if (bitmap_buf[col_idx / 8] >> (col_idx % 8)) & 1 == 1 {
4653 values.push(Value::Null);
4654 } else {
4655 values.push(cur.read_value_body(col.ty)?);
4656 }
4657 }
4658 Ok((Row { values }, cur.pos))
4659}
4660
4661fn write_value_body(out: &mut Vec<u8>, v: &Value, ty: DataType) {
4670 match (v, ty) {
4671 (Value::SmallInt(n), DataType::SmallInt) => out.extend_from_slice(&n.to_le_bytes()),
4672 (Value::Int(n), DataType::Int) => out.extend_from_slice(&n.to_le_bytes()),
4673 (Value::BigInt(n), DataType::BigInt) => out.extend_from_slice(&n.to_le_bytes()),
4674 (Value::Float(x), DataType::Float) => out.extend_from_slice(&x.to_le_bytes()),
4675 (Value::Bool(b), DataType::Bool) => out.push(u8::from(*b)),
4676 (Value::Text(s), DataType::Text | DataType::Varchar(_) | DataType::Char(_)) => {
4677 write_str(out, s);
4678 }
4679 (
4680 Value::Vector(v),
4681 DataType::Vector {
4682 encoding: VecEncoding::F32,
4683 ..
4684 },
4685 ) => {
4686 let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
4687 out.extend_from_slice(&dim.to_le_bytes());
4688 for x in v {
4689 out.extend_from_slice(&x.to_le_bytes());
4690 }
4691 }
4692 (
4698 Value::Sq8Vector(q),
4699 DataType::Vector {
4700 encoding: VecEncoding::Sq8,
4701 ..
4702 },
4703 ) => {
4704 let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
4705 out.extend_from_slice(&dim.to_le_bytes());
4706 out.extend_from_slice(&q.min.to_le_bytes());
4707 out.extend_from_slice(&q.max.to_le_bytes());
4708 out.extend_from_slice(&q.bytes);
4709 }
4710 (
4714 Value::HalfVector(h),
4715 DataType::Vector {
4716 encoding: VecEncoding::F16,
4717 ..
4718 },
4719 ) => {
4720 let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
4721 out.extend_from_slice(&dim.to_le_bytes());
4722 out.extend_from_slice(&h.bytes);
4723 }
4724 (Value::Numeric { scaled, .. }, DataType::Numeric { scale, .. }) => {
4725 out.extend_from_slice(&scaled.to_le_bytes());
4726 out.push(scale);
4727 }
4728 (Value::Date(d), DataType::Date) => out.extend_from_slice(&d.to_le_bytes()),
4729 (Value::Timestamp(t), DataType::Timestamp | DataType::Timestamptz) => {
4730 out.extend_from_slice(&t.to_le_bytes())
4731 }
4732 (Value::Json(s), DataType::Json | DataType::Jsonb) => write_str(out, s),
4736 (other, ty) => unreachable!(
4740 "schema-driven encode received mismatched value/type pair: \
4741 value tag={:?}, column type={:?}",
4742 other.data_type(),
4743 ty
4744 ),
4745 }
4746}
4747
4748fn write_value(out: &mut Vec<u8>, v: &Value) {
4749 match v {
4750 Value::Null => out.push(0),
4751 Value::SmallInt(n) => {
4752 out.push(7);
4753 out.extend_from_slice(&n.to_le_bytes());
4754 }
4755 Value::Int(n) => {
4756 out.push(1);
4757 out.extend_from_slice(&n.to_le_bytes());
4758 }
4759 Value::BigInt(n) => {
4760 out.push(2);
4761 out.extend_from_slice(&n.to_le_bytes());
4762 }
4763 Value::Float(x) => {
4764 out.push(3);
4765 out.extend_from_slice(&x.to_le_bytes());
4766 }
4767 Value::Text(s) | Value::Json(s) => {
4772 out.push(4);
4773 write_str(out, s);
4774 }
4775 Value::Bool(b) => {
4776 out.push(5);
4777 out.push(u8::from(*b));
4778 }
4779 Value::Vector(v) => {
4780 out.push(6);
4781 let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
4782 out.extend_from_slice(&dim.to_le_bytes());
4783 for x in v {
4784 out.extend_from_slice(&x.to_le_bytes());
4785 }
4786 }
4787 Value::Sq8Vector(q) => {
4792 out.push(11);
4793 let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
4794 out.extend_from_slice(&dim.to_le_bytes());
4795 out.extend_from_slice(&q.min.to_le_bytes());
4796 out.extend_from_slice(&q.max.to_le_bytes());
4797 out.extend_from_slice(&q.bytes);
4798 }
4799 Value::HalfVector(h) => {
4804 out.push(12);
4805 let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
4806 out.extend_from_slice(&dim.to_le_bytes());
4807 out.extend_from_slice(&h.bytes);
4808 }
4809 Value::Numeric { scaled, scale } => {
4810 out.push(8);
4811 out.extend_from_slice(&scaled.to_le_bytes());
4812 out.push(*scale);
4813 }
4814 Value::Date(d) => {
4815 out.push(9);
4816 out.extend_from_slice(&d.to_le_bytes());
4817 }
4818 Value::Timestamp(t) => {
4819 out.push(10);
4820 out.extend_from_slice(&t.to_le_bytes());
4821 }
4822 Value::Interval { .. } => {
4826 unreachable!(
4827 "Value::Interval has no on-disk encoding; engine must reject it before write"
4828 )
4829 }
4830 }
4831}
4832
/// Appends `n` to `out` as two little-endian bytes.
fn write_u16(out: &mut Vec<u8>, n: u16) {
    out.extend(n.to_le_bytes());
}
/// Appends `n` to `out` as four little-endian bytes.
fn write_u32(out: &mut Vec<u8>, n: u32) {
    out.extend(n.to_le_bytes());
}
/// Appends `s` to `out` as a little-endian `u16` byte length followed by its UTF-8 bytes.
///
/// # Panics
///
/// Panics if `s` is longer than `u16::MAX` bytes.
fn write_str(out: &mut Vec<u8>, s: &str) {
    let payload = s.as_bytes();
    let byte_len = u16::try_from(payload.len()).expect("identifier / text fits in u16");
    out.extend_from_slice(&byte_len.to_le_bytes());
    out.extend_from_slice(payload);
}
4844
4845fn write_index_key(out: &mut Vec<u8>, key: &IndexKey) {
4849 match key {
4850 IndexKey::Int(n) => {
4851 out.push(INDEX_KEY_TAG_INT);
4852 out.extend_from_slice(&n.to_le_bytes());
4853 }
4854 IndexKey::Text(s) => {
4855 out.push(INDEX_KEY_TAG_TEXT);
4856 write_str(out, s);
4857 }
4858 IndexKey::Bool(b) => {
4859 out.push(INDEX_KEY_TAG_BOOL);
4860 out.push(u8::from(*b));
4861 }
4862 }
4863}
4864
4865struct Cursor<'a> {
4866 buf: &'a [u8],
4867 pos: usize,
4868}
4869
4870impl<'a> Cursor<'a> {
4871 const fn new(buf: &'a [u8]) -> Self {
4872 Self { buf, pos: 0 }
4873 }
4874
4875 fn take(&mut self, n: usize) -> Result<&'a [u8], StorageError> {
4876 let end = self
4877 .pos
4878 .checked_add(n)
4879 .ok_or_else(|| StorageError::Corrupt(format!("length overflow taking {n} bytes")))?;
4880 if end > self.buf.len() {
4881 return Err(StorageError::Corrupt(format!(
4882 "unexpected EOF at offset {} (wanted {n} more bytes)",
4883 self.pos
4884 )));
4885 }
4886 let s = &self.buf[self.pos..end];
4887 self.pos = end;
4888 Ok(s)
4889 }
4890
4891 fn read_u8(&mut self) -> Result<u8, StorageError> {
4892 Ok(self.take(1)?[0])
4893 }
4894 fn read_u16(&mut self) -> Result<u16, StorageError> {
4895 let s = self.take(2)?;
4896 Ok(u16::from_le_bytes([s[0], s[1]]))
4897 }
4898 fn read_u32(&mut self) -> Result<u32, StorageError> {
4899 let s = self.take(4)?;
4900 Ok(u32::from_le_bytes([s[0], s[1], s[2], s[3]]))
4901 }
4902 fn read_i32(&mut self) -> Result<i32, StorageError> {
4903 let s = self.take(4)?;
4904 Ok(i32::from_le_bytes([s[0], s[1], s[2], s[3]]))
4905 }
4906 fn read_u64(&mut self) -> Result<u64, StorageError> {
4909 let s = self.take(8)?;
4910 Ok(u64::from_le_bytes([
4911 s[0], s[1], s[2], s[3], s[4], s[5], s[6], s[7],
4912 ]))
4913 }
4914 fn read_i64(&mut self) -> Result<i64, StorageError> {
4915 let s = self.take(8)?;
4916 let arr: [u8; 8] = s.try_into().expect("checked");
4917 Ok(i64::from_le_bytes(arr))
4918 }
4919 fn read_f64(&mut self) -> Result<f64, StorageError> {
4920 let s = self.take(8)?;
4921 let arr: [u8; 8] = s.try_into().expect("checked");
4922 Ok(f64::from_le_bytes(arr))
4923 }
4924 fn read_f32(&mut self) -> Result<f32, StorageError> {
4925 let s = self.take(4)?;
4926 Ok(f32::from_le_bytes([s[0], s[1], s[2], s[3]]))
4927 }
4928 fn read_str(&mut self) -> Result<String, StorageError> {
4929 let len = self.read_u16()? as usize;
4930 let bytes = self.take(len)?;
4931 core::str::from_utf8(bytes)
4932 .map(String::from)
4933 .map_err(|_| StorageError::Corrupt("invalid UTF-8 in identifier or text".into()))
4934 }
4935
4936 fn read_index_key(&mut self) -> Result<IndexKey, StorageError> {
4940 let tag = self.read_u8()?;
4941 match tag {
4942 INDEX_KEY_TAG_INT => Ok(IndexKey::Int(self.read_i64()?)),
4943 INDEX_KEY_TAG_TEXT => Ok(IndexKey::Text(self.read_str()?)),
4944 INDEX_KEY_TAG_BOOL => Ok(IndexKey::Bool(self.read_u8()? != 0)),
4945 other => Err(StorageError::Corrupt(format!(
4946 "unknown index key tag: {other}"
4947 ))),
4948 }
4949 }
4950 fn read_value_body(&mut self, ty: DataType) -> Result<Value, StorageError> {
4956 match ty {
4957 DataType::SmallInt => {
4958 let s = self.take(2)?;
4959 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
4960 }
4961 DataType::Int => Ok(Value::Int(self.read_i32()?)),
4962 DataType::BigInt => Ok(Value::BigInt(self.read_i64()?)),
4963 DataType::Float => Ok(Value::Float(self.read_f64()?)),
4964 DataType::Bool => Ok(Value::Bool(self.read_u8()? != 0)),
4965 DataType::Text | DataType::Varchar(_) | DataType::Char(_) => {
4966 Ok(Value::Text(self.read_str()?))
4967 }
4968 DataType::Vector {
4969 encoding: VecEncoding::F32,
4970 ..
4971 } => {
4972 let dim = self.read_u32()? as usize;
4973 let mut v = Vec::with_capacity(dim);
4974 for _ in 0..dim {
4975 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
4976 v.push(f32::from_le_bytes(bytes));
4977 }
4978 Ok(Value::Vector(v))
4979 }
4980 DataType::Vector {
4981 encoding: VecEncoding::Sq8,
4982 ..
4983 } => {
4984 let dim = self.read_u32()? as usize;
4985 let min = self.read_f32()?;
4986 let max = self.read_f32()?;
4987 let bytes = self.take(dim)?.to_vec();
4988 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
4989 }
4990 DataType::Vector {
4991 encoding: VecEncoding::F16,
4992 ..
4993 } => {
4994 let dim = self.read_u32()? as usize;
4995 let bytes = self.take(dim * 2)?.to_vec();
4996 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
4997 }
4998 DataType::Numeric { .. } => {
4999 let s = self.take(16)?;
5000 let arr: [u8; 16] = s.try_into().expect("checked");
5001 let scaled = i128::from_le_bytes(arr);
5002 let scale = self.read_u8()?;
5003 Ok(Value::Numeric { scaled, scale })
5004 }
5005 DataType::Date => Ok(Value::Date(self.read_i32()?)),
5006 DataType::Timestamp => Ok(Value::Timestamp(self.read_i64()?)),
5007 DataType::Timestamptz => Ok(Value::Timestamp(self.read_i64()?)),
5008 DataType::Jsonb => Ok(Value::Json(self.read_str()?)),
5009 DataType::Interval => {
5010 Err(StorageError::Corrupt(
5015 "INTERVAL column found on disk — runtime-only type, v3.0.2 rejects it".into(),
5016 ))
5017 }
5018 DataType::Json => Ok(Value::Json(self.read_str()?)),
5019 }
5020 }
5021
5022 fn read_value(&mut self) -> Result<Value, StorageError> {
5023 let tag = self.read_u8()?;
5024 match tag {
5025 0 => Ok(Value::Null),
5026 1 => Ok(Value::Int(self.read_i32()?)),
5027 2 => Ok(Value::BigInt(self.read_i64()?)),
5028 3 => Ok(Value::Float(self.read_f64()?)),
5029 4 => Ok(Value::Text(self.read_str()?)),
5030 5 => Ok(Value::Bool(self.read_u8()? != 0)),
5031 6 => {
5032 let dim = self.read_u32()? as usize;
5033 let mut v = Vec::with_capacity(dim);
5034 for _ in 0..dim {
5035 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
5036 v.push(f32::from_le_bytes(bytes));
5037 }
5038 Ok(Value::Vector(v))
5039 }
5040 7 => {
5041 let s = self.take(2)?;
5042 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
5043 }
5044 8 => {
5045 let s = self.take(16)?;
5046 let arr: [u8; 16] = s.try_into().expect("checked");
5047 let scaled = i128::from_le_bytes(arr);
5048 let scale = self.read_u8()?;
5049 Ok(Value::Numeric { scaled, scale })
5050 }
5051 9 => Ok(Value::Date(self.read_i32()?)),
5052 10 => Ok(Value::Timestamp(self.read_i64()?)),
5053 11 => {
5058 let dim = self.read_u32()? as usize;
5059 let min = self.read_f32()?;
5060 let max = self.read_f32()?;
5061 let bytes = self.take(dim)?.to_vec();
5062 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
5063 }
5064 12 => {
5067 let dim = self.read_u32()? as usize;
5068 let bytes = self.take(dim * 2)?.to_vec();
5069 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
5070 }
5071 other => Err(StorageError::Corrupt(format!("unknown value tag: {other}"))),
5072 }
5073 }
5074
5075 fn read_nsw_graph(&mut self, m: usize) -> Result<NswGraph, StorageError> {
5079 let m_max_0 = self.read_u16()? as usize;
5080 let entry_raw = self.read_u32()?;
5081 let entry = if entry_raw == u32::MAX {
5082 None
5083 } else {
5084 Some(entry_raw as usize)
5085 };
5086 let entry_level = self.read_u8()?;
5087 let node_count = self.read_u32()? as usize;
5088 let mut levels: PersistentVec<u8> = PersistentVec::new();
5093 for _ in 0..node_count {
5094 levels.push_mut(self.read_u8()?);
5095 }
5096 let layer_count = self.read_u8()? as usize;
5097 let mut layers: Vec<PersistentVec<Vec<u32>>> = Vec::with_capacity(layer_count);
5098 for _ in 0..layer_count {
5099 let n = self.read_u32()? as usize;
5100 let mut per_layer: PersistentVec<Vec<u32>> = PersistentVec::new();
5101 for _ in 0..n {
5102 let cnt = self.read_u16()? as usize;
5103 let mut row: Vec<u32> = Vec::with_capacity(cnt);
5104 for _ in 0..cnt {
5105 row.push(self.read_u32()?);
5106 }
5107 per_layer.push_mut(row);
5108 }
5109 layers.push(per_layer);
5110 }
5111 Ok(NswGraph {
5112 m,
5113 m_max_0,
5114 entry,
5115 entry_level,
5116 levels,
5117 layers,
5118 })
5119 }
5120}
5121
5122#[cfg(test)]
5123mod tests {
5124 use super::*;
5125 use alloc::string::ToString;
5126 use alloc::vec;
5127
/// Checks that the NEON squared-L2 kernel agrees with the scalar one, within
/// a relative tolerance, on pseudo-random vectors of several dimensions.
#[cfg(target_arch = "aarch64")]
#[test]
fn neon_l2_matches_scalar() {
    /// Advances the LCG `state` and maps 24 of its bits to an `f32` in `[-1.0, 1.0)`.
    #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
    fn next_unit(state: &mut u64) -> f32 {
        *state = state
            .wrapping_mul(6_364_136_223_846_793_005)
            .wrapping_add(1);
        (((*state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0
    }

    let dims = [4usize, 8, 12, 16, 64, 128, 256, 384, 512, 768, 1024, 1536];
    for d in dims.iter().copied() {
        // Seed depends only on the dimension, so every run sees the same data.
        let mut rng: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
        let mut a = Vec::with_capacity(d);
        let mut b = Vec::with_capacity(d);
        for _ in 0..d {
            // Draw order matters: the component of `a` first, then the one of `b`.
            let x = next_unit(&mut rng);
            let y = next_unit(&mut rng);
            a.push(x);
            b.push(y);
        }
        let scalar = l2_distance_sq_scalar(&a, &b);
        // SAFETY (assumed): this test is only compiled for aarch64 and `a` / `b`
        // have equal length; confirm against the kernel's own safety contract.
        let neon = unsafe { l2_distance_sq_neon(&a, &b) };
        let tol = (scalar.abs().max(1e-6)) * 1e-4;
        assert!(
            (scalar - neon).abs() <= tol,
            "dim={d}: scalar={scalar} neon={neon} diff={}",
            (scalar - neon).abs()
        );
    }
}
5164
/// Checks that the NEON inner-product kernel agrees with the scalar one,
/// within a relative-plus-per-dimension tolerance, on pseudo-random vectors.
#[cfg(target_arch = "aarch64")]
#[test]
fn neon_inner_product_matches_scalar() {
    /// Advances the LCG `state` and maps 24 of its bits to an `f32` in `[-1.0, 1.0)`.
    #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
    fn next_unit(state: &mut u64) -> f32 {
        *state = state
            .wrapping_mul(6_364_136_223_846_793_005)
            .wrapping_add(1);
        (((*state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0
    }

    let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
    for d in dims.iter().copied() {
        let mut rng: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
        let mut a = Vec::with_capacity(d);
        let mut b = Vec::with_capacity(d);
        for _ in 0..d {
            // Draw order matters: the component of `a` first, then the one of `b`.
            let x = next_unit(&mut rng);
            let y = next_unit(&mut rng);
            a.push(x);
            b.push(y);
        }
        let scalar = inner_product_scalar(&a, &b);
        // SAFETY (assumed): this test is only compiled for aarch64 and `a` / `b`
        // have equal length; confirm against the kernel's own safety contract.
        let neon = unsafe { inner_product_neon(&a, &b) };
        // The absolute term scales with `d` so a near-zero dot product does
        // not collapse the tolerance.
        #[allow(clippy::cast_precision_loss)]
        let tol = (scalar.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
        assert!(
            (scalar - neon).abs() <= tol,
            "IP dim={d}: scalar={scalar} neon={neon} diff={}",
            (scalar - neon).abs()
        );
    }
}
5201
/// Checks that the NEON cosine helper returns the same three values as the
/// scalar one, within tolerance, on pseudo-random vectors.
#[cfg(target_arch = "aarch64")]
#[allow(clippy::similar_names)]
#[test]
fn neon_cosine_dot_norms_matches_scalar() {
    /// Advances the LCG `state` and maps 24 of its bits to an `f32` in `[-1.0, 1.0)`.
    #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
    fn next_unit(state: &mut u64) -> f32 {
        *state = state
            .wrapping_mul(6_364_136_223_846_793_005)
            .wrapping_add(1);
        (((*state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0
    }

    let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
    for d in dims.iter().copied() {
        let mut rng: u64 = (d as u64).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        let mut a = Vec::with_capacity(d);
        let mut b = Vec::with_capacity(d);
        for _ in 0..d {
            // Draw order matters: the component of `a` first, then the one of `b`.
            let x = next_unit(&mut rng);
            let y = next_unit(&mut rng);
            a.push(x);
            b.push(y);
        }
        let (dot_s, na_s, nb_s) = cosine_dot_norms_scalar(&a, &b);
        // SAFETY (assumed): this test is only compiled for aarch64 and `a` / `b`
        // have equal length; confirm against the kernel's own safety contract.
        let (dot_n, na_n, nb_n) = unsafe { cosine_dot_norms_neon(&a, &b) };
        #[allow(clippy::cast_precision_loss)]
        let abs_slack = (d as f32) * 1e-6;
        let tol_d = (dot_s.abs().max(1e-6)) * 1e-4 + abs_slack;
        // NOTE(review): the same tolerance, derived from `na_s`, is also used
        // for the `nb` comparison below.
        let tol_n = (na_s.abs().max(1e-6)) * 1e-4 + abs_slack;
        assert!(
            (dot_s - dot_n).abs() <= tol_d,
            "cosine dot dim={d}: scalar={dot_s} neon={dot_n}"
        );
        assert!(
            (na_s - na_n).abs() <= tol_n,
            "cosine na dim={d}: scalar={na_s} neon={na_n}"
        );
        assert!(
            (nb_s - nb_n).abs() <= tol_n,
            "cosine nb dim={d}: scalar={nb_s} neon={nb_n}"
        );
    }
}
5245
5246 fn make_users_schema() -> TableSchema {
5247 TableSchema::new(
5248 "users",
5249 vec![
5250 ColumnSchema::new("id", DataType::Int, false),
5251 ColumnSchema::new("name", DataType::Text, false),
5252 ColumnSchema::new("score", DataType::Float, true),
5253 ],
5254 )
5255 }
5256
/// Each scalar `Value` variant reports the matching `DataType`; `Null` reports none.
#[test]
fn value_type_tag_matches_variant() {
    let tagged = [
        (Value::Int(1), DataType::Int),
        (Value::BigInt(1), DataType::BigInt),
        (Value::Float(1.0), DataType::Float),
        (Value::Text("x".into()), DataType::Text),
        (Value::Bool(true), DataType::Bool),
    ];
    for (value, expected) in &tagged {
        assert_eq!(value.data_type(), Some(*expected));
    }

    let null = Value::Null;
    assert_eq!(null.data_type(), None);
    assert!(null.is_null());
    assert!(!Value::Int(0).is_null());
}
5268
/// A quantized five-element vector reports a `Vector` type with `dim: 5` and SQ8 encoding.
#[test]
fn sq8_value_reports_sq8_data_type() {
    let cell = Value::Sq8Vector(crate::quantize::quantize(&[0.0, 0.25, 0.5, 0.75, 1.0]));
    let expected = DataType::Vector {
        dim: 5,
        encoding: VecEncoding::Sq8,
    };
    assert_eq!(cell.data_type(), Some(expected));
}
5285
/// `Display` for the basic data types renders the expected upper-case keyword.
#[test]
fn datatype_display_matches_pg_keyword() {
    let cases = [
        (DataType::Int, "INT"),
        (DataType::BigInt, "BIGINT"),
        (DataType::Float, "FLOAT"),
        (DataType::Text, "TEXT"),
        (DataType::Bool, "BOOL"),
    ];
    for (ty, keyword) in &cases {
        assert_eq!(ty.to_string(), *keyword);
    }
}
5294
/// `Row::len` counts the values and `Row::is_empty` is true only for a row with none.
#[test]
fn row_len_and_emptiness() {
    let two_cells = Row::new(vec![Value::Int(1), Value::Null]);
    assert_eq!(two_cells.len(), 2);
    assert!(!two_cells.is_empty());

    let no_cells = Row::new(Vec::new());
    assert!(no_cells.is_empty());
}
5302
/// `column_position` returns the zero-based index of a known column and `None` otherwise.
#[test]
fn table_schema_column_position() {
    let schema = make_users_schema();
    let lookups = [("id", Some(0)), ("score", Some(2)), ("missing", None)];
    for (name, expected) in lookups.iter().copied() {
        assert_eq!(schema.column_position(name), expected);
    }
}
5310
/// A created table is counted and can be looked up by name; unknown names yield `None`.
#[test]
fn catalog_create_table_then_lookup() {
    let mut catalog = Catalog::new();
    let created = catalog.create_table(make_users_schema());
    created.unwrap();

    assert_eq!(catalog.table_count(), 1);
    assert!(catalog.get("users").is_some());
    assert!(catalog.get("nope").is_none());
}
5319
/// Creating the same table twice fails with `DuplicateTable` naming the table.
#[test]
fn catalog_duplicate_table_is_rejected() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();

    let second_attempt = catalog.create_table(make_users_schema());
    let err = second_attempt.unwrap_err();
    let is_duplicate_users =
        matches!(err, StorageError::DuplicateTable { ref name } if name == "users");
    assert!(is_duplicate_users);
}
5327
/// Inserting a well-typed row stores it and bumps the row count.
#[test]
fn table_insert_happy_path_appends_row() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    let alice = Row::new(vec![
        Value::Int(1),
        Value::Text("alice".into()),
        Value::Float(99.5),
    ]);
    users.insert(alice).unwrap();

    assert_eq!(users.row_count(), 1);
    let stored_name = &users.rows()[0].values[1];
    assert_eq!(*stored_name, Value::Text("alice".into()));
}
5342
/// A row with too few values is rejected with `ArityMismatch` and nothing is stored.
#[test]
fn table_insert_arity_mismatch() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    let short_row = Row::new(vec![Value::Int(1)]);
    let err = users.insert(short_row).unwrap_err();
    let is_expected_arity_error = matches!(
        err,
        StorageError::ArityMismatch {
            expected: 3,
            actual: 1
        }
    );
    assert!(is_expected_arity_error);
    assert_eq!(users.row_count(), 0);
}
5358
/// A wrongly-typed cell is rejected with `TypeMismatch` carrying the column
/// name, both types and the position; nothing is stored.
#[test]
fn table_insert_type_mismatch_reports_column() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    // The second cell is an `Int` where the `name` column expects `Text`.
    let bad_row = Row::new(vec![Value::Int(1), Value::Int(42), Value::Float(0.0)]);
    let err = users.insert(bad_row).unwrap_err();
    match &err {
        StorageError::TypeMismatch {
            column,
            expected,
            actual,
            position,
        } => {
            assert_eq!(column, "name");
            assert_eq!(*expected, DataType::Text);
            assert_eq!(*actual, DataType::Int);
            assert_eq!(*position, 1);
        }
        other => panic!("unexpected: {other:?}"),
    }
    assert_eq!(users.row_count(), 0);
}
5387
/// A NULL in the `name` column is rejected with `NullInNotNull` naming that column.
#[test]
fn table_insert_null_into_not_null_rejected() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    let row_with_null_name = Row::new(vec![Value::Int(1), Value::Null, Value::Float(1.0)]);
    let err = users.insert(row_with_null_name).unwrap_err();
    let names_the_column =
        matches!(err, StorageError::NullInNotNull { ref column } if column == "name");
    assert!(names_the_column);
}
5402
/// A NULL in the `score` column is accepted and the row is stored.
#[test]
fn table_insert_null_into_nullable_ok() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    let bob = Row::new(vec![
        Value::Int(1),
        Value::Text("bob".into()),
        Value::Null,
    ]);
    users.insert(bob).unwrap();
    assert_eq!(users.row_count(), 1);
}
5416
/// Inserting into table `a` leaves table `b` untouched.
#[test]
fn catalog_get_mut_independent_per_table() {
    let mut catalog = Catalog::new();
    // Two tables with the same single-column shape, created in order `a`, `b`.
    for table_name in ["a", "b"].iter().copied() {
        let schema = TableSchema::new(
            table_name,
            vec![ColumnSchema::new("v", DataType::Int, false)],
        );
        catalog.create_table(schema).unwrap();
    }

    let table_a = catalog.get_mut("a").unwrap();
    table_a.insert(Row::new(vec![Value::Int(1)])).unwrap();

    assert_eq!(catalog.get("a").unwrap().row_count(), 1);
    assert_eq!(catalog.get("b").unwrap().row_count(), 0);
}
5437
5438 fn assert_round_trip(cat: &Catalog) {
5441 let bytes = cat.serialize();
5442 let restored = Catalog::deserialize(&bytes).expect("deserialize");
5443 assert_eq!(restored.table_count(), cat.table_count());
5446 for (a, b) in cat.tables.iter().zip(restored.tables.iter()) {
5447 assert_eq!(a.schema, b.schema);
5448 assert_eq!(a.rows, b.rows);
5449 }
5450 }
5451
/// A catalog with no tables survives a serialize / deserialize round trip.
#[test]
fn serialize_empty_catalog_round_trips() {
    let empty = Catalog::new();
    assert_round_trip(&empty);
}
5456
/// A catalog holding one table with no rows survives a round trip.
#[test]
fn serialize_single_empty_table_round_trips() {
    let mut catalog = Catalog::new();
    let created = catalog.create_table(make_users_schema());
    created.unwrap();
    assert_round_trip(&catalog);
}
5463
/// Cloning an NSW graph must share the persistent-vector storage of
/// `levels` and of every layer with the original instead of copying elements.
#[test]
fn nsw_clone_is_o1() {
    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let schema = TableSchema::new(
        "docs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vector_ty, true),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let docs = cat.get_mut("docs").unwrap();
    for i in 0..1500_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.01;
        let embedding = alloc::vec![base, base + 0.05, base + 0.1];
        let row = Row::new(alloc::vec![Value::Int(i), Value::Vector(embedding)]);
        docs.insert(row).unwrap();
    }
    docs.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let first_index = &cat.get("docs").unwrap().indices()[0];
    let g = match &first_index.kind {
        IndexKind::Nsw(graph) => graph,
        IndexKind::BTree(_) | IndexKind::Brin { .. } => panic!("expected NSW"),
    };
    assert_eq!(g.levels.len(), 1500, "one level slot per inserted row");
    assert!(
        g.layers.len() >= 2,
        "1500 nodes should populate at least two HNSW layers, got {}",
        g.layers.len()
    );

    let cloned = g.clone();

    let levels_shared = g.levels.shares_storage_with(&cloned.levels);
    assert!(
        levels_shared,
        "levels PV not shared after clone — clone copied elements (O(N))"
    );
    assert_eq!(g.layers.len(), cloned.layers.len());
    let layer_pairs = g.layers.iter().zip(cloned.layers.iter());
    for (l, (original_layer, cloned_layer)) in layer_pairs.enumerate() {
        assert!(
            original_layer.shares_storage_with(cloned_layer),
            "layer {l} PV not shared after clone — clone copied elements (O(N))"
        );
    }
}
5529
/// An SQ8 vector column with an NSW index must come back from a
/// serialize / deserialize round trip with the same column type, the same
/// stored cell, and the same query results.
#[test]
fn sq8_catalog_serialise_roundtrip_preserves_cells_and_index() {
    let vector_ty = DataType::Vector {
        dim: 8,
        encoding: VecEncoding::Sq8,
    };
    let schema = TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vector_ty, false),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let vecs = cat.get_mut("vecs").unwrap();
    for i in 0..32_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.03;
        let mut components: Vec<f32> = Vec::with_capacity(8);
        for j in 0..8_i32 {
            #[allow(clippy::cast_precision_loss)]
            let off = (j as f32) * 0.01;
            components.push(base + off);
        }
        let row = Row::new(alloc::vec![
            Value::Int(i),
            Value::Sq8Vector(quantize::quantize(&components)),
        ]);
        vecs.insert(row).unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];

    // Snapshot the state that must survive the round trip.
    let live = cat.get("vecs").unwrap();
    let before_cell = live.rows()[5].values[1].clone();
    let before_ty = live.schema().columns[1].ty;
    let before_hits = nsw_query(live, "v_idx", &query, 5, NswMetric::L2);

    let bytes = cat.serialize();
    let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
    let rt = restored.get("vecs").unwrap();

    assert_eq!(rt.schema().columns[1].ty, before_ty);
    assert_eq!(rt.rows()[5].values[1], before_cell);
    let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
    assert_eq!(before_hits, after_hits);
}
5592
/// A half-precision vector column with an NSW index must come back from a
/// serialize / deserialize round trip with the same column type, the same
/// stored cell, and the same query results.
#[test]
fn half_catalog_serialise_roundtrip_preserves_cells_and_index() {
    use crate::halfvec;

    let vector_ty = DataType::Vector {
        dim: 8,
        encoding: VecEncoding::F16,
    };
    let schema = TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vector_ty, false),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let vecs = cat.get_mut("vecs").unwrap();
    for i in 0..32_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.03;
        let mut components: Vec<f32> = Vec::with_capacity(8);
        for j in 0..8_i32 {
            #[allow(clippy::cast_precision_loss)]
            let off = (j as f32) * 0.01;
            components.push(base + off);
        }
        let row = Row::new(alloc::vec![
            Value::Int(i),
            Value::HalfVector(halfvec::HalfVector::from_f32_slice(&components)),
        ]);
        vecs.insert(row).unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];

    // Snapshot the state that must survive the round trip.
    let live = cat.get("vecs").unwrap();
    let before_cell = live.rows()[5].values[1].clone();
    let before_ty = live.schema().columns[1].ty;
    let before_hits = nsw_query(live, "v_idx", &query, 5, NswMetric::L2);

    let bytes = cat.serialize();
    let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
    let rt = restored.get("vecs").unwrap();

    assert_eq!(rt.schema().columns[1].ty, before_ty);
    assert_eq!(rt.rows()[5].values[1], before_cell);
    let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
    assert_eq!(before_hits, after_hits);
}
5653
/// Index queries over a half-precision column must recover at least 95% of
/// the exact top-10 neighbours computed by brute force on the `f32` corpus.
#[test]
#[allow(clippy::similar_names)]
fn hnsw_half_recall_at_10_matches_f32_groundtruth() {
    use crate::halfvec;

    /// Steps the mixing state and maps its upper 32 bits onto `[-1.0, 1.0]`.
    fn next(state: &mut u64) -> f32 {
        *state = state
            .wrapping_add(0x9E37_79B9_7F4A_7C15)
            .wrapping_mul(0xBF58_476D_1CE4_E5B9);
        #[allow(clippy::cast_precision_loss)]
        let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
        2.0 * u - 1.0
    }

    let dim: u32 = 32;
    let n: usize = 512;
    let dim_us = dim as usize;
    let mut seed: u64 = 0xF16_F16_F16_F16_u64;
    // One shared generator: the corpus is drawn first, then the queries.
    let mut random_vectors = |count: usize| -> Vec<Vec<f32>> {
        (0..count)
            .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
            .collect()
    };
    let corpus = random_vectors(n);
    let queries = random_vectors(32);

    // Brute-force ground truth: the ten closest corpus rows per query.
    let mut exact_top10: Vec<Vec<usize>> = Vec::with_capacity(queries.len());
    for q in &queries {
        let mut scored: Vec<(f32, usize)> = Vec::with_capacity(corpus.len());
        for (i, v) in corpus.iter().enumerate() {
            scored.push((l2_distance_sq(v, q), i));
        }
        scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
        scored.truncate(10);
        exact_top10.push(scored.into_iter().map(|(_, i)| i).collect());
    }

    let vector_ty = DataType::Vector {
        dim,
        encoding: VecEncoding::F16,
    };
    let schema = TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vector_ty, false),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();
    let vecs = cat.get_mut("vecs").unwrap();
    for (i, v) in corpus.iter().enumerate() {
        let row = Row::new(alloc::vec![
            Value::Int(i32::try_from(i).unwrap()),
            Value::HalfVector(halfvec::HalfVector::from_f32_slice(v)),
        ]);
        vecs.insert(row).unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    let table = cat.get("vecs").unwrap();
    let mut total_overlap = 0_usize;
    for (q, exact) in queries.iter().zip(exact_top10.iter()) {
        let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
        total_overlap += hits.iter().filter(|h| exact.contains(*h)).count();
    }
    #[allow(clippy::cast_precision_loss)]
    let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
    assert!(
        recall >= 0.95,
        "HALF HNSW recall@10 = {recall:.3}, below floor 0.95 — \
         check halfvec dispatch in `cell_to_query_metric_distance`"
    );
}
5737
/// Index queries over an SQ8 column must recover at least 95% of the exact
/// top-10 neighbours computed by brute force on the `f32` corpus.
#[test]
fn hnsw_sq8_recall_at_10_above_0_95_vs_f32_groundtruth() {
    use crate::quantize;

    /// Steps the mixing state and maps its upper 32 bits onto `[-1.0, 1.0]`.
    fn next(state: &mut u64) -> f32 {
        *state = state
            .wrapping_add(0x9E37_79B9_7F4A_7C15)
            .wrapping_mul(0xBF58_476D_1CE4_E5B9);
        #[allow(clippy::cast_precision_loss)]
        let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
        2.0 * u - 1.0
    }

    let dim: u32 = 32;
    let n: usize = 512;
    let dim_us = dim as usize;
    let mut seed: u64 = 0xCAFE_BABE_DEAD_BEEFu64;
    // One shared generator: the corpus is drawn first, then the queries.
    let mut random_vectors = |count: usize| -> Vec<Vec<f32>> {
        (0..count)
            .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
            .collect()
    };
    let corpus = random_vectors(n);
    let queries = random_vectors(32);

    // Brute-force ground truth: the ten closest corpus rows per query.
    let mut exact_top10: Vec<Vec<usize>> = Vec::with_capacity(queries.len());
    for q in &queries {
        let mut scored: Vec<(f32, usize)> = Vec::with_capacity(corpus.len());
        for (i, v) in corpus.iter().enumerate() {
            scored.push((l2_distance_sq(v, q), i));
        }
        scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
        scored.truncate(10);
        exact_top10.push(scored.into_iter().map(|(_, i)| i).collect());
    }

    let vector_ty = DataType::Vector {
        dim,
        encoding: VecEncoding::Sq8,
    };
    let schema = TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vector_ty, false),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();
    let vecs = cat.get_mut("vecs").unwrap();
    for (i, v) in corpus.iter().enumerate() {
        let row = Row::new(alloc::vec![
            Value::Int(i32::try_from(i).unwrap()),
            Value::Sq8Vector(quantize::quantize(v)),
        ]);
        vecs.insert(row).unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    let table = cat.get("vecs").unwrap();
    let mut total_overlap = 0_usize;
    for (q, exact) in queries.iter().zip(exact_top10.iter()) {
        let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
        total_overlap += hits.iter().filter(|h| exact.contains(*h)).count();
    }
    #[allow(clippy::cast_precision_loss)]
    let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
    assert!(
        recall >= 0.95,
        "SQ8 HNSW recall@10 = {recall:.3}, below floor 0.95 — \
         check `sq8_rerank` is wired in `nsw_search` for SQ8 columns"
    );
}
5826
/// Every field of an NSW graph must be identical after the catalog is
/// serialized and deserialized.
#[test]
fn nsw_index_topology_persists_through_round_trip() {
    /// Clones the graph of the first index on the `docs` table, which must be an NSW index.
    fn first_nsw_graph(cat: &Catalog) -> NswGraph {
        match &cat.get("docs").unwrap().indices()[0].kind {
            IndexKind::Nsw(g) => g.clone(),
            IndexKind::BTree(_) | IndexKind::Brin { .. } => panic!("expected NSW"),
        }
    }

    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let schema = TableSchema::new(
        "docs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vector_ty, true),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let docs = cat.get_mut("docs").unwrap();
    for i in 0..6_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.1;
        let embedding = alloc::vec![base, base + 0.05, base + 0.1];
        docs.insert(Row::new(alloc::vec![Value::Int(i), Value::Vector(embedding)]))
            .unwrap();
    }
    docs.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let original = first_nsw_graph(&cat);
    let bytes = cat.serialize();
    let restored = Catalog::deserialize(&bytes).expect("deserialize");
    let restored_graph = first_nsw_graph(&restored);

    assert_eq!(restored_graph.m, original.m);
    assert_eq!(restored_graph.m_max_0, original.m_max_0);
    assert_eq!(restored_graph.entry, original.entry);
    assert_eq!(restored_graph.entry_level, original.entry_level);
    assert_eq!(restored_graph.levels, original.levels);
    assert_eq!(restored_graph.layers, original.layers);
}
5879
5880 #[test]
5881 fn hnsw_level_assignment_is_deterministic() {
5882 for i in 0..32usize {
5885 assert_eq!(nsw_assign_level(i), nsw_assign_level(i));
5886 }
5887 }
5888
5889 #[test]
5890 fn hnsw_layer_0_dominates_population() {
5891 let on_zero = (0..200usize).filter(|&i| nsw_assign_level(i) == 0).count();
5896 assert!(on_zero > 150, "level-0 nodes too few: {on_zero}");
5897 }
5898
5899 #[test]
5900 fn hnsw_search_matches_brute_force_for_l2_top1() {
5901 let mut cat = Catalog::new();
5905 cat.create_table(TableSchema::new(
5906 "vecs",
5907 alloc::vec![
5908 ColumnSchema::new("id", DataType::Int, false),
5909 ColumnSchema::new(
5910 "v",
5911 DataType::Vector {
5912 dim: 3,
5913 encoding: VecEncoding::F32
5914 },
5915 true
5916 ),
5917 ],
5918 ))
5919 .unwrap();
5920 let t = cat.get_mut("vecs").unwrap();
5921 let dataset: alloc::vec::Vec<(i32, [f32; 3])> = alloc::vec![
5922 (1, [0.0, 0.0, 0.0]),
5923 (2, [1.0, 0.0, 0.0]),
5924 (3, [0.0, 1.0, 0.0]),
5925 (4, [0.0, 0.0, 1.0]),
5926 (5, [1.0, 1.0, 0.0]),
5927 (6, [1.0, 0.0, 1.0]),
5928 (7, [0.0, 1.0, 1.0]),
5929 (8, [1.0, 1.0, 1.0]),
5930 (9, [0.5, 0.5, 0.5]),
5931 (10, [0.2, 0.8, 0.5]),
5932 ];
5933 for &(id, v) in &dataset {
5934 t.insert(Row::new(alloc::vec![
5935 Value::Int(id),
5936 Value::Vector(alloc::vec![v[0], v[1], v[2]]),
5937 ]))
5938 .unwrap();
5939 }
5940 t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
5941 let idx_pos = cat
5942 .get("vecs")
5943 .unwrap()
5944 .indices()
5945 .iter()
5946 .position(|i| i.name == "v_idx")
5947 .unwrap();
5948 for query in [[0.4, 0.4, 0.4], [0.9, 0.1, 0.0], [0.0, 0.9, 0.9]] {
5949 let table = cat.get("vecs").unwrap();
5950 let hnsw_top = nsw_search(table, idx_pos, &query, 1, 16, NswMetric::L2);
5951 let mut brute: alloc::vec::Vec<(f32, usize)> = (0..table.rows.len())
5952 .map(|i| {
5953 let Value::Vector(v) = &table.rows[i].values[1] else {
5954 return (f32::INFINITY, i);
5955 };
5956 (l2_distance_sq(v, &query), i)
5957 })
5958 .collect();
5959 brute.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
5960 assert!(!hnsw_top.is_empty(), "HNSW returned no results");
5961 assert_eq!(
5962 hnsw_top[0].1, brute[0].1,
5963 "HNSW top-1 != brute-force top-1 for {query:?}"
5964 );
5965 }
5966 }
5967
5968 #[test]
5969 fn serialize_table_with_rows_round_trips() {
5970 let mut cat = Catalog::new();
5971 cat.create_table(make_users_schema()).unwrap();
5972 let t = cat.get_mut("users").unwrap();
5973 t.insert(Row::new(vec![
5974 Value::Int(1),
5975 Value::Text("alice".into()),
5976 Value::Float(95.5),
5977 ]))
5978 .unwrap();
5979 t.insert(Row::new(vec![
5980 Value::Int(2),
5981 Value::Text("bob".into()),
5982 Value::Null,
5983 ]))
5984 .unwrap();
5985 assert_round_trip(&cat);
5986 }
5987
5988 #[test]
5989 fn serialize_multiple_tables_round_trips() {
5990 let mut cat = Catalog::new();
5991 cat.create_table(make_users_schema()).unwrap();
5992 cat.create_table(TableSchema::new(
5993 "flags",
5994 vec![
5995 ColumnSchema::new("id", DataType::BigInt, false),
5996 ColumnSchema::new("active", DataType::Bool, false),
5997 ],
5998 ))
5999 .unwrap();
6000 cat.get_mut("flags")
6001 .unwrap()
6002 .insert(Row::new(vec![Value::BigInt(7), Value::Bool(true)]))
6003 .unwrap();
6004 assert_round_trip(&cat);
6005 }
6006
6007 #[test]
6008 fn deserialize_rejects_bad_magic() {
6009 let mut buf = b"BADMAGIC".to_vec();
6010 buf.push(FILE_VERSION);
6011 buf.extend_from_slice(&0u32.to_le_bytes());
6012 let err = Catalog::deserialize(&buf).unwrap_err();
6013 assert!(matches!(err, StorageError::Corrupt(_)));
6014 }
6015
6016 #[test]
6017 fn deserialize_rejects_unsupported_version() {
6018 let mut buf = FILE_MAGIC.to_vec();
6019 buf.push(99); buf.extend_from_slice(&0u32.to_le_bytes());
6021 let err = Catalog::deserialize(&buf).unwrap_err();
6022 assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("version")));
6023 }
6024
6025 #[test]
6026 fn deserialize_rejects_truncated_file() {
6027 let mut cat = Catalog::new();
6028 cat.create_table(make_users_schema()).unwrap();
6029 let bytes = cat.serialize();
6030 let truncated = &bytes[..bytes.len() - 1];
6032 assert!(matches!(
6033 Catalog::deserialize(truncated),
6034 Err(StorageError::Corrupt(_))
6035 ));
6036 }
6037
6038 #[test]
6039 fn deserialize_rejects_trailing_garbage() {
6040 let cat = Catalog::new();
6041 let mut bytes = cat.serialize();
6042 bytes.push(0xFF);
6043 assert!(matches!(
6044 Catalog::deserialize(&bytes),
6045 Err(StorageError::Corrupt(ref s)) if s.contains("trailing")
6046 ));
6047 }
6048
6049 fn populated_users() -> Catalog {
6052 let mut cat = Catalog::new();
6053 cat.create_table(make_users_schema()).unwrap();
6054 let t = cat.get_mut("users").unwrap();
6055 for (id, name, score) in [
6056 (1, "alice", Some(90.0)),
6057 (2, "bob", None),
6058 (3, "alice", Some(70.0)), ] {
6060 t.insert(Row::new(vec![
6061 Value::Int(id),
6062 Value::Text(name.into()),
6063 score.map_or(Value::Null, Value::Float),
6064 ]))
6065 .unwrap();
6066 }
6067 cat
6068 }
6069
6070 #[test]
6071 fn add_index_builds_from_existing_rows() {
6072 let mut cat = populated_users();
6073 cat.get_mut("users")
6074 .unwrap()
6075 .add_index("by_id".into(), "id")
6076 .unwrap();
6077 let t = cat.get("users").unwrap();
6078 let idx = t.index_on(0).expect("index_on(0)");
6079 assert_eq!(idx.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
6080 assert_eq!(idx.lookup_eq(&IndexKey::Int(99)), &[] as &[RowLocator]);
6081 }
6082
6083 #[test]
6084 fn add_index_dup_name_rejected() {
6085 let mut cat = populated_users();
6086 let t = cat.get_mut("users").unwrap();
6087 t.add_index("ix".into(), "id").unwrap();
6088 let err = t.add_index("ix".into(), "name").unwrap_err();
6089 assert!(matches!(err, StorageError::DuplicateIndex { ref name } if name == "ix"));
6090 }
6091
6092 #[test]
6093 fn add_index_unknown_column_rejected() {
6094 let mut cat = populated_users();
6095 let err = cat
6096 .get_mut("users")
6097 .unwrap()
6098 .add_index("ix".into(), "ghost")
6099 .unwrap_err();
6100 assert!(matches!(err, StorageError::ColumnNotFound { ref column } if column == "ghost"));
6101 }
6102
6103 #[test]
6104 fn insert_after_create_index_updates_it() {
6105 let mut cat = populated_users();
6106 let t = cat.get_mut("users").unwrap();
6107 t.add_index("by_name".into(), "name").unwrap();
6108 t.insert(Row::new(vec![
6109 Value::Int(4),
6110 Value::Text("dave".into()),
6111 Value::Null,
6112 ]))
6113 .unwrap();
6114 let idx = t.index_on(1).unwrap();
6115 assert_eq!(
6116 idx.lookup_eq(&IndexKey::Text("dave".into())),
6117 &[RowLocator::Hot(3)]
6118 );
6119 assert_eq!(
6121 idx.lookup_eq(&IndexKey::Text("alice".into())),
6122 &[RowLocator::Hot(0), RowLocator::Hot(2)]
6123 );
6124 }
6125
6126 #[test]
6127 fn null_or_float_values_are_not_indexed() {
6128 let mut cat = populated_users();
6129 let t = cat.get_mut("users").unwrap();
6130 t.add_index("by_score".into(), "score").unwrap();
6131 let idx = t.index_on(2).unwrap();
6132 assert_eq!(idx.lookup_eq(&IndexKey::Int(90)), &[] as &[RowLocator]);
6137 }
6138
6139 #[test]
6142 fn vector_value_data_type_carries_dim() {
6143 let v = Value::Vector(vec![1.0, 2.0, 3.0]);
6144 assert_eq!(
6145 v.data_type(),
6146 Some(DataType::Vector {
6147 dim: 3,
6148 encoding: VecEncoding::F32
6149 })
6150 );
6151 }
6152
6153 #[test]
6154 fn vector_column_insert_matching_dim_ok() {
6155 let mut cat = Catalog::new();
6156 cat.create_table(TableSchema::new(
6157 "emb",
6158 vec![ColumnSchema::new(
6159 "v",
6160 DataType::Vector {
6161 dim: 3,
6162 encoding: VecEncoding::F32,
6163 },
6164 false,
6165 )],
6166 ))
6167 .unwrap();
6168 cat.get_mut("emb")
6169 .unwrap()
6170 .insert(Row::new(vec![Value::Vector(vec![1.0, 2.0, 3.0])]))
6171 .unwrap();
6172 }
6173
6174 #[test]
6175 fn vector_column_insert_dim_mismatch_rejected() {
6176 let mut cat = Catalog::new();
6177 cat.create_table(TableSchema::new(
6178 "emb",
6179 vec![ColumnSchema::new(
6180 "v",
6181 DataType::Vector {
6182 dim: 3,
6183 encoding: VecEncoding::F32,
6184 },
6185 false,
6186 )],
6187 ))
6188 .unwrap();
6189 let err = cat
6190 .get_mut("emb")
6191 .unwrap()
6192 .insert(Row::new(vec![Value::Vector(vec![1.0, 2.0])]))
6193 .unwrap_err();
6194 assert!(matches!(err, StorageError::TypeMismatch { .. }));
6195 }
6196
6197 #[test]
6198 fn vector_value_survives_catalog_round_trip() {
6199 let mut cat = Catalog::new();
6200 cat.create_table(TableSchema::new(
6201 "emb",
6202 vec![
6203 ColumnSchema::new("id", DataType::Int, false),
6204 ColumnSchema::new(
6205 "v",
6206 DataType::Vector {
6207 dim: 4,
6208 encoding: VecEncoding::F32,
6209 },
6210 false,
6211 ),
6212 ],
6213 ))
6214 .unwrap();
6215 cat.get_mut("emb")
6216 .unwrap()
6217 .insert(Row::new(vec![
6218 Value::Int(1),
6219 Value::Vector(vec![0.5, -1.25, 3.0, 7.0]),
6220 ]))
6221 .unwrap();
6222 let restored = Catalog::deserialize(&cat.serialize()).expect("round-trip");
6223 let table = restored.get("emb").unwrap();
6224 assert_eq!(
6225 table.schema().columns[1].ty,
6226 DataType::Vector {
6227 dim: 4,
6228 encoding: VecEncoding::F32
6229 }
6230 );
6231 assert_eq!(
6232 table.rows()[0].values[1],
6233 Value::Vector(vec![0.5, -1.25, 3.0, 7.0])
6234 );
6235 }
6236
6237 #[test]
6238 fn index_survives_serialize_deserialize_round_trip() {
6239 let mut cat = populated_users();
6240 cat.get_mut("users")
6241 .unwrap()
6242 .add_index("by_name".into(), "name")
6243 .unwrap();
6244 let restored = Catalog::deserialize(&cat.serialize()).unwrap();
6245 let idx = restored
6246 .get("users")
6247 .unwrap()
6248 .index_on(1)
6249 .expect("index_on(1) after restore");
6250 assert_eq!(idx.name, "by_name");
6251 assert_eq!(
6253 idx.lookup_eq(&IndexKey::Text("alice".into())),
6254 &[RowLocator::Hot(0), RowLocator::Hot(2)]
6255 );
6256 }
6257
6258 fn bigint_pk_users_schema() -> TableSchema {
6263 TableSchema::new(
6264 "users",
6265 vec![
6266 ColumnSchema::new("id", DataType::BigInt, false),
6267 ColumnSchema::new("name", DataType::Text, false),
6268 ],
6269 )
6270 }
6271
6272 fn make_user_row(id: i64, name: &str) -> Row {
6273 Row::new(vec![Value::BigInt(id), Value::Text(name.into())])
6274 }
6275
6276 #[test]
6277 fn lookup_by_pk_finds_row_via_hot_index() {
6278 let mut cat = Catalog::new();
6279 cat.create_table(bigint_pk_users_schema()).unwrap();
6280 let t = cat.get_mut("users").unwrap();
6281 for (id, name) in [(1i64, "alice"), (2, "bob"), (3, "carol")] {
6282 t.insert(make_user_row(id, name)).unwrap();
6283 }
6284 t.add_index("by_id".into(), "id").unwrap();
6285 let got = cat
6287 .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
6288 .unwrap();
6289 assert_eq!(got, make_user_row(2, "bob"));
6290 assert_eq!(cat.cold_segment_count(), 0);
6291 }
6292
6293 #[test]
6294 fn lookup_by_pk_returns_none_when_key_missing() {
6295 let mut cat = Catalog::new();
6296 cat.create_table(bigint_pk_users_schema()).unwrap();
6297 let t = cat.get_mut("users").unwrap();
6298 t.insert(make_user_row(1, "alice")).unwrap();
6299 t.add_index("by_id".into(), "id").unwrap();
6300 assert!(
6301 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(999))
6302 .is_none()
6303 );
6304 assert!(
6306 cat.lookup_by_pk("other_table", "by_id", &IndexKey::Int(1))
6307 .is_none()
6308 );
6309 assert!(
6310 cat.lookup_by_pk("users", "no_such_index", &IndexKey::Int(1))
6311 .is_none()
6312 );
6313 }
6314
6315 #[test]
6316 fn lookup_by_pk_resolves_cold_locator_via_loaded_segment() {
6317 let mut cat = Catalog::new();
6321 cat.create_table(bigint_pk_users_schema()).unwrap();
6322 let t = cat.get_mut("users").unwrap();
6323 t.add_index("by_id".into(), "id").unwrap();
6324 let schema = t.schema.clone();
6325
6326 let cold_rows: Vec<(i64, &str)> =
6327 vec![(100, "ivy"), (200, "joe"), (300, "kim"), (400, "lin")];
6328 let seg_rows: Vec<(u64, Vec<u8>)> = cold_rows
6329 .iter()
6330 .map(|(id, name)| {
6331 let row = make_user_row(*id, name);
6332 ((*id).cast_unsigned(), encode_row_body_dense(&row, &schema))
6333 })
6334 .collect();
6335 let (seg_bytes, _meta) =
6336 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
6337 let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();
6338 assert_eq!(seg_id, 0);
6339 assert_eq!(cat.cold_segment_count(), 1);
6340
6341 let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
6342 .iter()
6343 .map(|(id, _)| {
6344 (
6345 IndexKey::Int(*id),
6346 RowLocator::Cold {
6347 segment_id: seg_id,
6348 page_offset: 0,
6349 },
6350 )
6351 })
6352 .collect();
6353 let registered = cat
6354 .get_mut("users")
6355 .unwrap()
6356 .register_cold_locators("by_id", pairs)
6357 .unwrap();
6358 assert_eq!(registered, 4);
6359
6360 for (id, name) in &cold_rows {
6361 let got = cat
6362 .lookup_by_pk("users", "by_id", &IndexKey::Int(*id))
6363 .unwrap_or_else(|| panic!("cold key {id} not found"));
6364 assert_eq!(got, make_user_row(*id, name));
6365 }
6366 assert!(
6368 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(999))
6369 .is_none()
6370 );
6371 }
6372
6373 #[test]
6374 fn lookup_by_pk_mixes_hot_and_cold_tiers() {
6375 let mut cat = Catalog::new();
6379 cat.create_table(bigint_pk_users_schema()).unwrap();
6380 let t = cat.get_mut("users").unwrap();
6381 for (id, name) in [(1i64, "alice"), (2, "bob")] {
6382 t.insert(make_user_row(id, name)).unwrap();
6383 }
6384 t.add_index("by_id".into(), "id").unwrap();
6385 let schema = t.schema.clone();
6386
6387 let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe")];
6388 let seg_rows: Vec<(u64, Vec<u8>)> = cold_rows
6389 .iter()
6390 .map(|(id, name)| {
6391 let row = make_user_row(*id, name);
6392 ((*id).cast_unsigned(), encode_row_body_dense(&row, &schema))
6393 })
6394 .collect();
6395 let (seg_bytes, _) =
6396 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
6397 let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();
6398 let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
6399 .iter()
6400 .map(|(id, _)| {
6401 (
6402 IndexKey::Int(*id),
6403 RowLocator::Cold {
6404 segment_id: seg_id,
6405 page_offset: 0,
6406 },
6407 )
6408 })
6409 .collect();
6410 cat.get_mut("users")
6411 .unwrap()
6412 .register_cold_locators("by_id", pairs)
6413 .unwrap();
6414
6415 assert_eq!(
6417 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(1))
6418 .unwrap(),
6419 make_user_row(1, "alice")
6420 );
6421 assert_eq!(
6422 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(2))
6423 .unwrap(),
6424 make_user_row(2, "bob")
6425 );
6426 assert_eq!(
6428 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(100))
6429 .unwrap(),
6430 make_user_row(100, "ivy")
6431 );
6432 assert_eq!(
6433 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(200))
6434 .unwrap(),
6435 make_user_row(200, "joe")
6436 );
6437 assert!(
6439 cat.lookup_by_pk("users", "by_id", &IndexKey::Int(50))
6440 .is_none()
6441 );
6442 }
6443
6444 #[test]
6445 fn register_cold_locators_rejects_nsw_index() {
6446 let mut cat = Catalog::new();
6447 cat.create_table(TableSchema::new(
6448 "vecs",
6449 vec![
6450 ColumnSchema::new("id", DataType::Int, false),
6451 ColumnSchema::new(
6452 "v",
6453 DataType::Vector {
6454 dim: 4,
6455 encoding: VecEncoding::F32,
6456 },
6457 false,
6458 ),
6459 ],
6460 ))
6461 .unwrap();
6462 let t = cat.get_mut("vecs").unwrap();
6463 t.insert(Row::new(vec![
6464 Value::Int(1),
6465 Value::Vector(vec![1.0, 0.0, 0.0, 0.0]),
6466 ]))
6467 .unwrap();
6468 t.add_nsw_index("by_v".into(), "v", NSW_DEFAULT_M).unwrap();
6469 let err = t
6470 .register_cold_locators(
6471 "by_v",
6472 vec![(
6473 IndexKey::Int(1),
6474 RowLocator::Cold {
6475 segment_id: 0,
6476 page_offset: 0,
6477 },
6478 )],
6479 )
6480 .unwrap_err();
6481 assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("not BTree")));
6484 }
6485
6486 #[test]
6487 fn load_segment_bytes_rejects_garbage() {
6488 let mut cat = Catalog::new();
6489 let err = cat.load_segment_bytes(vec![0u8; 10]).unwrap_err();
6490 assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("segment")));
6491 assert_eq!(cat.cold_segment_count(), 0);
6493 }
6494
6495 #[test]
6496 fn load_segment_bytes_returns_sequential_ids() {
6497 let mut cat = Catalog::new();
6498 cat.create_table(bigint_pk_users_schema()).unwrap();
6499 let schema = cat.get("users").unwrap().schema.clone();
6500 for batch in 0u32..3 {
6501 let rows: Vec<(u64, Vec<u8>)> = (0u64..4)
6502 .map(|i| {
6503 let id = u64::from(batch) * 100 + i;
6504 let row = make_user_row(id.cast_signed(), "x");
6505 (id, encode_row_body_dense(&row, &schema))
6506 })
6507 .collect();
6508 let (bytes, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
6509 assert_eq!(cat.load_segment_bytes(bytes).unwrap(), batch);
6510 }
6511 assert_eq!(cat.cold_segment_count(), 3);
6512 }
6513
6514 #[test]
6521 fn v8_catalog_decodes_as_all_hot_under_v9_reader() {
6522 let mut cat = populated_users();
6529 cat.get_mut("users")
6530 .unwrap()
6531 .add_index("by_name".into(), "name")
6532 .unwrap();
6533
6534 let v8_bytes = encode_as_v8(&cat);
6539 assert_eq!(v8_bytes[FILE_MAGIC.len()], 8, "version byte must be 8");
6540
6541 let restored = Catalog::deserialize(&v8_bytes).expect("v9 reader accepts v8 stream");
6542 let idx = restored
6543 .get("users")
6544 .unwrap()
6545 .index_on(1)
6546 .expect("index_on(1) after restore");
6547 assert_eq!(
6550 idx.lookup_eq(&IndexKey::Text("alice".into())),
6551 &[RowLocator::Hot(0), RowLocator::Hot(2)]
6552 );
6553 for entry in idx.lookup_eq(&IndexKey::Text("alice".into())) {
6555 assert!(entry.is_hot(), "v8 → v9 read must yield Hot only");
6556 }
6557 }
6558
6559 fn encode_as_v8(cat: &Catalog) -> Vec<u8> {
6564 let mut out = Vec::with_capacity(64);
6565 out.extend_from_slice(FILE_MAGIC);
6566 out.push(8u8);
6567 write_u32(&mut out, u32::try_from(cat.tables.len()).unwrap());
6568 for t in &cat.tables {
6569 write_str(&mut out, &t.schema.name);
6570 write_u16(&mut out, u16::try_from(t.schema.columns.len()).unwrap());
6571 for c in &t.schema.columns {
6572 write_str(&mut out, &c.name);
6573 write_data_type(&mut out, c.ty);
6574 out.push(u8::from(c.nullable));
6575 match &c.default {
6576 None => out.push(0),
6577 Some(v) => {
6578 out.push(1);
6579 write_value(&mut out, v);
6580 }
6581 }
6582 out.push(u8::from(c.auto_increment));
6583 }
6584 write_u32(&mut out, u32::try_from(t.rows.len()).unwrap());
6585 for row in &t.rows {
6586 out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
6587 }
6588 write_u16(&mut out, u16::try_from(t.indices.len()).unwrap());
6589 for idx in &t.indices {
6590 write_str(&mut out, &idx.name);
6591 write_u16(&mut out, u16::try_from(idx.column_position).unwrap());
6592 match &idx.kind {
6593 IndexKind::BTree(_) => out.push(0),
6596 IndexKind::Nsw(g) => {
6597 out.push(1);
6598 write_u16(&mut out, u16::try_from(g.m).unwrap());
6599 write_nsw_graph(&mut out, g);
6600 }
6601 IndexKind::Brin { .. } => panic!(
6604 "v8 catalog writer cannot serialise BRIN — \
6605 tests with BRIN indices must use the current writer"
6606 ),
6607 }
6608 }
6609 }
6610 out
6611 }
6612
6613 #[test]
6619 fn v9_catalog_round_trip_preserves_cold_locators() {
6620 let mut cat = Catalog::new();
6621 cat.create_table(bigint_pk_users_schema()).unwrap();
6622 let t = cat.get_mut("users").unwrap();
6623 for (id, name) in [(1i64, "alice"), (2, "bob")] {
6625 t.insert(make_user_row(id, name)).unwrap();
6626 }
6627 t.add_index("by_id".into(), "id").unwrap();
6628 let schema = t.schema.clone();
6629
6630 let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe"), (300, "kim")];
6632 let seg_rows: Vec<(u64, Vec<u8>)> = cold_rows
6633 .iter()
6634 .map(|(id, name)| {
6635 let row = make_user_row(*id, name);
6636 ((*id).cast_unsigned(), encode_row_body_dense(&row, &schema))
6637 })
6638 .collect();
6639 let (seg_bytes, _) =
6640 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
6641 let seg_id = cat.load_segment_bytes(seg_bytes.clone()).unwrap();
6642 let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
6643 .iter()
6644 .map(|(id, _)| {
6645 (
6646 IndexKey::Int(*id),
6647 RowLocator::Cold {
6648 segment_id: seg_id,
6649 page_offset: 0,
6650 },
6651 )
6652 })
6653 .collect();
6654 cat.get_mut("users")
6655 .unwrap()
6656 .register_cold_locators("by_id", pairs)
6657 .unwrap();
6658
6659 let bytes = cat.serialize();
6661 assert_eq!(bytes[FILE_MAGIC.len()], FILE_VERSION);
6662 let mut restored = Catalog::deserialize(&bytes).expect("v9 round-trip parses");
6663
6664 let restored_seg_id = restored.load_segment_bytes(seg_bytes).unwrap();
6671 assert_eq!(restored_seg_id, seg_id);
6672
6673 let idx = restored.get("users").unwrap().index_on(0).unwrap();
6674 assert_eq!(idx.lookup_eq(&IndexKey::Int(1)), &[RowLocator::Hot(0)]);
6676 assert_eq!(idx.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
6677 for (id, _) in &cold_rows {
6679 assert_eq!(
6680 idx.lookup_eq(&IndexKey::Int(*id)),
6681 &[RowLocator::Cold {
6682 segment_id: seg_id,
6683 page_offset: 0,
6684 }]
6685 );
6686 }
6687 assert_eq!(
6689 restored
6690 .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
6691 .unwrap(),
6692 make_user_row(2, "bob")
6693 );
6694 for (id, name) in &cold_rows {
6695 assert_eq!(
6696 restored
6697 .lookup_by_pk("users", "by_id", &IndexKey::Int(*id))
6698 .unwrap(),
6699 make_user_row(*id, name)
6700 );
6701 }
6702 }
6703
6704 #[test]
6711 fn row_body_encoded_len_matches_actual_encode_for_all_types() {
6712 let schema = TableSchema::new(
6713 "wide",
6714 vec![
6715 ColumnSchema::new("a", DataType::SmallInt, true),
6716 ColumnSchema::new("b", DataType::Int, false),
6717 ColumnSchema::new("c", DataType::BigInt, false),
6718 ColumnSchema::new("d", DataType::Float, false),
6719 ColumnSchema::new("e", DataType::Bool, false),
6720 ColumnSchema::new("f", DataType::Text, false),
6721 ColumnSchema::new(
6722 "g",
6723 DataType::Vector {
6724 dim: 3,
6725 encoding: VecEncoding::F32,
6726 },
6727 false,
6728 ),
6729 ColumnSchema::new(
6730 "h",
6731 DataType::Numeric {
6732 precision: 18,
6733 scale: 2,
6734 },
6735 false,
6736 ),
6737 ColumnSchema::new("i", DataType::Date, false),
6738 ColumnSchema::new("j", DataType::Timestamp, false),
6739 ],
6740 );
6741 let cases: &[Row] = &[
6742 Row::new(vec![
6743 Value::SmallInt(7),
6744 Value::Int(42),
6745 Value::BigInt(1_000_000),
6746 Value::Float(1.5),
6747 Value::Bool(true),
6748 Value::Text("hello".into()),
6749 Value::Vector(vec![1.0, 2.0, 3.0]),
6750 Value::Numeric {
6751 scaled: 12345,
6752 scale: 2,
6753 },
6754 Value::Date(20_000),
6755 Value::Timestamp(1_700_000_000_000_000),
6756 ]),
6757 Row::new(vec![
6759 Value::Null,
6760 Value::Int(0),
6761 Value::BigInt(0),
6762 Value::Float(0.0),
6763 Value::Bool(false),
6764 Value::Text(String::new()),
6765 Value::Vector(vec![]),
6766 Value::Numeric {
6767 scaled: 0,
6768 scale: 2,
6769 },
6770 Value::Date(0),
6771 Value::Timestamp(0),
6772 ]),
6773 Row::new(vec![
6774 Value::SmallInt(-1),
6775 Value::Int(-1),
6776 Value::BigInt(-1),
6777 Value::Float(-0.5),
6778 Value::Bool(true),
6779 Value::Text("a much longer payload here".into()),
6780 Value::Vector(vec![0.1, 0.2, 0.3]),
6781 Value::Numeric {
6782 scaled: -999_999_999,
6783 scale: 2,
6784 },
6785 Value::Date(-1),
6786 Value::Timestamp(-1),
6787 ]),
6788 ];
6789 for row in cases {
6790 let actual = encode_row_body_dense(row, &schema).len();
6791 let fast = row_body_encoded_len(row, &schema);
6792 assert_eq!(actual, fast, "row {row:?}");
6793 }
6794 }
6795
6796 #[test]
6797 fn hot_bytes_grows_on_insert_and_matches_encoded_sum() {
6798 let mut cat = Catalog::new();
6799 cat.create_table(bigint_pk_users_schema()).unwrap();
6800 let t = cat.get_mut("users").unwrap();
6801 assert_eq!(t.hot_bytes(), 0);
6802 let mut expected: u64 = 0;
6803 for (id, name) in [(1i64, "alice"), (2, "bob"), (3, "carol")] {
6804 let row = make_user_row(id, name);
6805 expected += encode_row_body_dense(&row, &t.schema).len() as u64;
6806 t.insert(row).unwrap();
6807 }
6808 assert_eq!(t.hot_bytes(), expected);
6809 assert_eq!(cat.hot_tier_bytes(), expected);
6810 }
6811
6812 #[test]
6813 fn hot_bytes_shrinks_on_delete() {
6814 let mut cat = Catalog::new();
6815 cat.create_table(bigint_pk_users_schema()).unwrap();
6816 let t = cat.get_mut("users").unwrap();
6817 for (id, name) in [(1i64, "alice"), (2, "bob"), (3, "carol")] {
6818 t.insert(make_user_row(id, name)).unwrap();
6819 }
6820 let before = t.hot_bytes();
6821 let bob_row = make_user_row(2, "bob");
6823 let bob_bytes = encode_row_body_dense(&bob_row, &t.schema).len() as u64;
6824 let removed = t.delete_rows(&[1]);
6825 assert_eq!(removed, 1);
6826 assert_eq!(t.hot_bytes(), before - bob_bytes);
6827 }
6828
6829 #[test]
6830 fn hot_bytes_diffs_on_update_for_variable_width_columns() {
6831 let mut cat = Catalog::new();
6832 cat.create_table(bigint_pk_users_schema()).unwrap();
6833 let t = cat.get_mut("users").unwrap();
6834 t.insert(make_user_row(1, "alice")).unwrap();
6835 let after_insert = t.hot_bytes();
6836 let new_row = make_user_row(1, "alice-the-longer-name");
6839 let old_len = encode_row_body_dense(&make_user_row(1, "alice"), &t.schema).len() as u64;
6840 let new_len = encode_row_body_dense(&new_row, &t.schema).len() as u64;
6841 t.update_row(0, new_row.values).unwrap();
6842 assert_eq!(t.hot_bytes(), after_insert - old_len + new_len);
6843 assert!(t.hot_bytes() > after_insert, "longer text grew the counter");
6844 }
6845
6846 #[test]
6847 fn hot_bytes_round_trips_through_serialize_deserialize() {
6848 let mut cat = Catalog::new();
6849 cat.create_table(bigint_pk_users_schema()).unwrap();
6850 let t = cat.get_mut("users").unwrap();
6851 for i in 0..10 {
6852 t.insert(make_user_row(i, &alloc::format!("name-{i}")))
6853 .unwrap();
6854 }
6855 let pre = cat.hot_tier_bytes();
6856 let restored = Catalog::deserialize(&cat.serialize()).unwrap();
6857 assert_eq!(restored.hot_tier_bytes(), pre);
6858 assert_eq!(restored.get("users").unwrap().hot_bytes(), pre);
6859 }
6860
6861 #[test]
6868 fn freeze_oldest_to_cold_moves_rows_and_keeps_lookups_working() {
6869 let mut cat = Catalog::new();
6870 cat.create_table(bigint_pk_users_schema()).unwrap();
6871 let t = cat.get_mut("users").unwrap();
6872 for id in 0..10i64 {
6873 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
6874 .unwrap();
6875 }
6876 t.add_index("by_id".into(), "id").unwrap();
6877 let total_bytes_before = t.hot_bytes();
6878
6879 let report = cat
6880 .freeze_oldest_to_cold("users", "by_id", 6)
6881 .expect("freeze succeeds");
6882 assert_eq!(report.frozen_rows, 6);
6883 assert_eq!(report.segment_id, 0);
6884 assert!(report.bytes_freed > 0);
6885 assert!(!report.segment_bytes.is_empty());
6886
6887 let t = cat.get("users").unwrap();
6888 assert_eq!(t.row_count(), 4, "4 hot rows remain (10 - 6 frozen)");
6889 assert_eq!(cat.cold_segment_count(), 1);
6890 assert_eq!(
6892 t.hot_bytes(),
6893 total_bytes_before - report.bytes_freed,
6894 "hot_bytes accounting matches FreezeReport"
6895 );
6896
6897 for id in 0..10i64 {
6900 let got = cat
6901 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
6902 .unwrap_or_else(|| panic!("PK {id} disappeared after freeze"));
6903 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
6904 }
6905 }
6906
6907 #[test]
6912 fn freeze_twice_preserves_prior_cold_locators() {
6913 let mut cat = Catalog::new();
6914 cat.create_table(bigint_pk_users_schema()).unwrap();
6915 let t = cat.get_mut("users").unwrap();
6916 for id in 0..12i64 {
6917 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
6918 .unwrap();
6919 }
6920 t.add_index("by_id".into(), "id").unwrap();
6921
6922 cat.freeze_oldest_to_cold("users", "by_id", 4)
6923 .expect("first freeze ok");
6924 cat.freeze_oldest_to_cold("users", "by_id", 4)
6925 .expect("second freeze ok");
6926
6927 assert_eq!(cat.get("users").unwrap().row_count(), 4);
6928 assert_eq!(cat.cold_segment_count(), 2);
6929 for id in 0..12i64 {
6932 let got = cat
6933 .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
6934 .unwrap_or_else(|| panic!("PK {id} not resolvable after two freezes"));
6935 assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
6936 }
6937 }
6938
6939 #[test]
6942 fn freeze_oldest_to_cold_rejects_invalid_input() {
6943 let mut cat = Catalog::new();
6944 cat.create_table(bigint_pk_users_schema()).unwrap();
6945 let t = cat.get_mut("users").unwrap();
6946 for id in 0..3i64 {
6947 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
6948 .unwrap();
6949 }
6950 t.add_index("by_id".into(), "id").unwrap();
6951
6952 assert!(matches!(
6954 cat.freeze_oldest_to_cold("users", "by_id", 0),
6955 Err(StorageError::Corrupt(_))
6956 ));
6957 assert!(matches!(
6959 cat.freeze_oldest_to_cold("missing", "by_id", 1),
6960 Err(StorageError::Corrupt(_))
6961 ));
6962 assert!(matches!(
6964 cat.freeze_oldest_to_cold("users", "no_such_index", 1),
6965 Err(StorageError::Corrupt(_))
6966 ));
6967 assert!(matches!(
6969 cat.freeze_oldest_to_cold("users", "by_id", 999),
6970 Err(StorageError::Corrupt(_))
6971 ));
6972 assert_eq!(cat.get("users").unwrap().row_count(), 3);
6974 assert_eq!(cat.cold_segment_count(), 0);
6975 }
6976
6977 #[test]
6980 fn freeze_oldest_to_cold_rejects_non_integer_pk() {
6981 let mut cat = Catalog::new();
6982 cat.create_table(TableSchema::new(
6983 "by_name",
6984 vec![
6985 ColumnSchema::new("name", DataType::Text, false),
6986 ColumnSchema::new("payload", DataType::BigInt, false),
6987 ],
6988 ))
6989 .unwrap();
6990 let t = cat.get_mut("by_name").unwrap();
6991 t.insert(Row::new(vec![Value::Text("a".into()), Value::BigInt(1)]))
6992 .unwrap();
6993 t.add_index("by_n".into(), "name").unwrap();
6994 let err = cat
6995 .freeze_oldest_to_cold("by_name", "by_n", 1)
6996 .expect_err("non-integer PK rejected");
6997 match err {
6998 StorageError::Corrupt(s) => assert!(
6999 s.contains("non-integer"),
7000 "error message names the constraint: {s}"
7001 ),
7002 other => panic!("expected Corrupt, got {other:?}"),
7003 }
7004 assert_eq!(cat.get("by_name").unwrap().row_count(), 1);
7006 assert_eq!(cat.cold_segment_count(), 0);
7007 }
7008
7009 #[test]
7014 fn freeze_keeps_remaining_hot_rows_addressable_via_secondary_index() {
7015 let mut cat = Catalog::new();
7016 cat.create_table(bigint_pk_users_schema()).unwrap();
7017 let t = cat.get_mut("users").unwrap();
7018 for id in 0..6i64 {
7019 t.insert(make_user_row(id, &alloc::format!("u-{id}")))
7020 .unwrap();
7021 }
7022 t.add_index("by_id".into(), "id").unwrap();
7023 t.add_index("by_name".into(), "name").unwrap();
7024
7025 cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
7026
7027 let idx = cat.get("users").unwrap().index_on(1).unwrap();
7031 let got = idx.lookup_eq(&IndexKey::Text("u-4".into()));
7032 assert_eq!(got.len(), 1);
7033 assert!(got[0].is_hot(), "kept-hot rows still surface as Hot");
7034 match got[0] {
7035 RowLocator::Hot(i) => {
7036 assert_eq!(i, 1);
7039 }
7040 RowLocator::Cold { .. } => unreachable!(),
7041 }
7042 }
7043
#[test]
/// Promoting a frozen key brings its row back into the hot tier.
///
/// Six rows are inserted and four are frozen. Promoting PK 2 must:
/// - return `Some(2)` as the new hot position,
/// - raise the table's `row_count` to 3,
/// - raise `hot_bytes` by exactly the dense-encoded length of the row,
/// - leave a single locator for the key, and that locator must be hot.
///
/// Both the promoted key and a key that was not promoted (PK 0) must keep
/// resolving through `lookup_by_pk`.
fn promote_cold_row_pulls_frozen_row_back_to_hot_tier() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    catalog.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
    // Sampled after the freeze so the delta checked below isolates the promote.
    let baseline_hot_bytes = catalog.get("users").unwrap().hot_bytes();

    let promoted_key = IndexKey::Int(2);
    let hot_slot = catalog
        .promote_cold_row("users", "by_id", &promoted_key)
        .expect("promote ok")
        .expect("PK 2 was cold");
    assert_eq!(
        hot_slot, 2,
        "promoted row appended after the 2 surviving hot rows"
    );

    let users = catalog.get("users").unwrap();
    assert_eq!(users.row_count(), 3, "hot tier grew from 2 to 3");
    let expected_row = make_user_row(2, "u-2");
    let encoded_len = encode_row_body_dense(&expected_row, &users.schema).len() as u64;
    assert_eq!(users.hot_bytes(), baseline_hot_bytes + encoded_len);

    let locators = users.index_on(0).unwrap().lookup_eq(&promoted_key);
    assert_eq!(locators.len(), 1, "exactly one locator per key");
    assert!(locators[0].is_hot(), "promote retired the Cold locator");
    assert_eq!(
        catalog
            .lookup_by_pk("users", "by_id", &promoted_key)
            .unwrap(),
        expected_row
    );
    // A key that was not promoted is unaffected.
    assert_eq!(
        catalog
            .lookup_by_pk("users", "by_id", &IndexKey::Int(0))
            .unwrap(),
        make_user_row(0, "u-0")
    );
}
7102
#[test]
/// `promote_cold_row` answers `Ok(None)` when there is nothing to promote.
///
/// Checked for a key that exists in the table (7) without any freeze having
/// happened, and for a key that was never inserted (99). Afterwards the
/// table still has its single row and no cold segment exists.
fn promote_cold_row_returns_none_when_key_is_not_cold() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(7, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    // First the present-but-never-frozen key, then the absent one.
    for key in [IndexKey::Int(7), IndexKey::Int(99)] {
        let outcome = catalog.promote_cold_row("users", "by_id", &key).unwrap();
        assert!(outcome.is_none());
    }

    assert_eq!(catalog.get("users").unwrap().row_count(), 1);
    assert_eq!(catalog.cold_segment_count(), 0);
}
7130
#[test]
/// Shadowing a frozen key retires its cold locator so the key stops resolving.
///
/// Five rows are inserted and three are frozen. PK 1 resolves before the
/// shadow call, `shadow_cold_row` reports one locator removed, and PK 1 no
/// longer resolves afterwards. PKs 0 and 2 must still return their rows.
fn shadow_cold_row_removes_cold_locators_and_drops_lookup() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    (0..5i64).for_each(|id| {
        users
            .insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    });
    users.add_index("by_id".into(), "id").unwrap();
    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    let shadowed_key = IndexKey::Int(1);
    assert!(
        catalog
            .lookup_by_pk("users", "by_id", &shadowed_key)
            .is_some(),
        "frozen PK resolves before shadow"
    );
    let retired = catalog
        .shadow_cold_row("users", "by_id", &shadowed_key)
        .unwrap();
    assert_eq!(retired, 1, "exactly one cold locator retired");

    assert!(
        catalog
            .lookup_by_pk("users", "by_id", &shadowed_key)
            .is_none(),
        "shadowed key no longer resolves"
    );
    // The shadow must be scoped to the one key: its neighbours still resolve.
    for (id, name) in [(0i64, "u-0"), (2i64, "u-2")] {
        assert_eq!(
            catalog
                .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
                .unwrap(),
            make_user_row(id, name)
        );
    }
}
7177
#[test]
/// `shadow_cold_row` reports zero removed locators when the key has no cold
/// entry.
///
/// Covers a key that exists without any freeze having happened (1) and a
/// key that was never inserted (999). The table keeps its single row.
fn shadow_cold_row_returns_zero_when_key_is_not_cold() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    let cases = [
        (1, "hot-only key drops no cold locators"),
        (999, "absent key drops no cold locators"),
    ];
    for (pk, why) in cases {
        assert_eq!(
            catalog
                .shadow_cold_row("users", "by_id", &IndexKey::Int(pk))
                .unwrap(),
            0,
            "{why}"
        );
    }

    assert_eq!(catalog.get("users").unwrap().row_count(), 1);
}
7203
#[test]
/// Both `promote_cold_row` and `shadow_cold_row` fail with
/// `StorageError::Corrupt` when given an unknown table name or an unknown
/// index name.
fn promote_and_shadow_reject_invalid_inputs() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    let key = IndexKey::Int(1);
    // First an unknown table, then a known table paired with an unknown index.
    for (table, index) in [("missing", "by_id"), ("users", "no_such_index")] {
        assert!(matches!(
            catalog.promote_cold_row(table, index, &key),
            Err(StorageError::Corrupt(_))
        ));
        assert!(matches!(
            catalog.shadow_cold_row(table, index, &key),
            Err(StorageError::Corrupt(_))
        ));
    }
}
7232
#[test]
/// A single prepared slice covering `0..6` must produce the same result as
/// `freeze_oldest_to_cold(.., 6)`.
///
/// Two identically seeded catalogs take one path each. The reports must
/// agree on `segment_id`, `frozen_rows`, `bytes_freed` and `segment_bytes`,
/// and every PK from 0 to 9 must resolve identically in both catalogs.
fn commit_freeze_slices_single_slice_matches_freeze_oldest() {
    let mut direct = Catalog::new();
    let mut sliced = Catalog::new();
    for catalog in [&mut direct, &mut sliced] {
        catalog.create_table(bigint_pk_users_schema()).unwrap();
        let users = catalog.get_mut("users").unwrap();
        (0..10i64).for_each(|id| {
            users
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        });
        users.add_index("by_id".into(), "id").unwrap();
    }

    let direct_report = direct.freeze_oldest_to_cold("users", "by_id", 6).unwrap();
    let prepared = sliced
        .prepare_freeze_slice("users", "by_id", 0..6)
        .expect("prepare");
    let sliced_report = sliced
        .commit_freeze_slices("users", "by_id", alloc::vec![prepared])
        .expect("commit");

    assert_eq!(direct_report.segment_id, sliced_report.segment_id);
    assert_eq!(direct_report.frozen_rows, sliced_report.frozen_rows);
    assert_eq!(direct_report.bytes_freed, sliced_report.bytes_freed);
    assert_eq!(direct_report.segment_bytes, sliced_report.segment_bytes);

    // Covers both the frozen range and the rows outside it.
    for id in 0..10i64 {
        let key = IndexKey::Int(id);
        assert_eq!(
            direct.lookup_by_pk("users", "by_id", &key),
            sliced.lookup_by_pk("users", "by_id", &key),
            "PK {id} differs after single vs slice freeze"
        );
    }
}
7272
#[test]
/// Committing `0..4` and `4..8` as two slices must match committing `0..8`
/// as one slice.
///
/// Rows are inserted with PKs out of numeric order. The two reports must
/// agree on `segment_bytes` and `frozen_rows`, and every PK from 0 to 9 must
/// resolve identically in both catalogs.
fn commit_freeze_slices_two_slices_match_single_slice() {
    // Insertion order of the PKs; intentionally not sorted.
    const INSERT_ORDER: [i64; 10] = [3, 7, 1, 9, 5, 0, 8, 4, 2, 6];

    let mut whole = Catalog::new();
    let mut split = Catalog::new();
    for catalog in [&mut whole, &mut split] {
        catalog.create_table(bigint_pk_users_schema()).unwrap();
        let users = catalog.get_mut("users").unwrap();
        for id in INSERT_ORDER {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }

    let full_range = whole
        .prepare_freeze_slice("users", "by_id", 0..8)
        .expect("prepare");
    let one = whole
        .commit_freeze_slices("users", "by_id", alloc::vec![full_range])
        .expect("commit one");

    let first_half = split
        .prepare_freeze_slice("users", "by_id", 0..4)
        .expect("prepare s1");
    let second_half = split
        .prepare_freeze_slice("users", "by_id", 4..8)
        .expect("prepare s2");
    let two = split
        .commit_freeze_slices("users", "by_id", alloc::vec![first_half, second_half])
        .expect("commit two");

    assert_eq!(one.segment_bytes, two.segment_bytes);
    assert_eq!(one.frozen_rows, two.frozen_rows);
    for id in 0..10i64 {
        let key = IndexKey::Int(id);
        assert_eq!(
            whole.lookup_by_pk("users", "by_id", &key),
            split.lookup_by_pk("users", "by_id", &key),
            "PK {id} differs after one-slice vs two-slice freeze"
        );
    }
}
7319
#[test]
/// Slices that leave a hole between them are refused at commit time.
///
/// The prepared ranges `0..2` and `3..5` skip position 2. The commit must
/// fail with `StorageError::Corrupt`, and afterwards there must be no cold
/// segment and the table must still hold all six rows.
fn commit_freeze_slices_rejects_gap() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let head = catalog
        .prepare_freeze_slice("users", "by_id", 0..2)
        .unwrap();
    let tail = catalog
        .prepare_freeze_slice("users", "by_id", 3..5)
        .unwrap();
    assert!(matches!(
        catalog.commit_freeze_slices("users", "by_id", alloc::vec![head, tail]),
        Err(StorageError::Corrupt(_))
    ));

    // The failed commit must not have had any visible effect.
    assert_eq!(catalog.cold_segment_count(), 0);
    assert_eq!(catalog.get("users").unwrap().row_count(), 6);
}
7341
#[test]
/// Committing an empty list of slices succeeds and changes nothing.
///
/// The returned report has `frozen_rows == 0`, no cold segment is created,
/// and the table keeps its three rows.
fn commit_freeze_slices_empty_is_noop() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    (0..3i64).for_each(|id| {
        users
            .insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    });
    users.add_index("by_id".into(), "id").unwrap();

    let no_slices = Vec::new();
    let report = catalog
        .commit_freeze_slices("users", "by_id", no_slices)
        .unwrap();

    assert_eq!(report.frozen_rows, 0);
    assert_eq!(catalog.cold_segment_count(), 0);
    assert_eq!(catalog.get("users").unwrap().row_count(), 3);
}
7360
#[test]
/// Compaction folds two small cold segments into one merged segment.
///
/// Eight rows are inserted and frozen in two batches of three, giving two
/// segments in two slots. Compaction is called with a byte target one above
/// the largest segment (presumably so both segments count as "small" —
/// confirm against `compact_cold_segments`).
///
/// Expected outcome: the report lists two sources, six merged rows, zero
/// pruned rows and non-empty merged bytes. The live segment count drops to
/// one while the slot count grows to three, the merged id is the only
/// global id, and all eight PKs still return their original rows.
fn compact_merges_small_segments_storage_unit() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..8i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    // Two separate freezes, each expected to produce its own segment.
    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    assert_eq!(catalog.cold_segment_count(), 2);
    assert_eq!(catalog.cold_segment_slot_count(), 2);

    let largest_segment = catalog
        .cold_segment_ids_global()
        .iter()
        .map(|seg_id| catalog.cold_segment(*seg_id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let target = largest_segment + 1;

    let report = catalog
        .compact_cold_segments("users", "by_id", target)
        .expect("compact succeeds");
    assert_eq!(report.sources.len(), 2);
    let merged_id = report.merged_segment_id.expect("merge happened");
    assert_eq!(report.merged_rows, 6);
    assert_eq!(report.deleted_rows_pruned, 0);
    assert!(!report.merged_segment_bytes.is_empty());

    // One live segment remains, yet the slot count goes up rather than down.
    assert_eq!(catalog.cold_segment_count(), 1);
    assert_eq!(catalog.cold_segment_slot_count(), 3);
    assert_eq!(catalog.cold_segment_ids_global(), alloc::vec![merged_id]);

    for id in 0..8i64 {
        let found = catalog
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("PK {id} lost after compaction"));
        assert_eq!(found, make_user_row(id, &alloc::format!("u-{id}")));
    }
}
7417
#[test]
/// Compaction prunes shadowed cold rows and keeps the live ones intact.
///
/// Six rows are inserted and frozen in two batches of three. PKs 1 and 4
/// are then shadowed, each reporting one locator removed. Compacting with a
/// byte target one above the largest segment must report two sources, four
/// merged rows and two pruned rows.
///
/// Afterwards the shadowed PKs must stay unresolvable, and every live PK
/// must resolve to the exact row that was inserted.
fn compact_drops_shadowed_cold_rows() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let t = cat.get_mut("users").unwrap();
    for id in 0..6i64 {
        t.insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    t.add_index("by_id".into(), "id").unwrap();
    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    assert_eq!(
        cat.shadow_cold_row("users", "by_id", &IndexKey::Int(1))
            .unwrap(),
        1
    );
    assert_eq!(
        cat.shadow_cold_row("users", "by_id", &IndexKey::Int(4))
            .unwrap(),
        1
    );

    let max_seg_bytes = cat
        .cold_segment_ids_global()
        .iter()
        .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let report = cat
        .compact_cold_segments("users", "by_id", max_seg_bytes + 1)
        .expect("compact succeeds");
    assert_eq!(report.sources.len(), 2);
    assert_eq!(report.merged_rows, 4, "6 frozen − 2 shadowed = 4 live");
    assert_eq!(report.deleted_rows_pruned, 2);

    for shadowed in [1i64, 4i64] {
        assert!(
            cat.lookup_by_pk("users", "by_id", &IndexKey::Int(shadowed))
                .is_none(),
            "shadowed PK {shadowed} must remain invisible after compact"
        );
    }
    for live in [0i64, 2, 3, 5] {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(live))
            .unwrap_or_else(|| panic!("live PK {live} lost after compact"));
        // Compare full row content, not just presence: pruning rows during
        // the merge must not leave a surviving key pointing at another
        // row's payload.
        assert_eq!(got, make_user_row(live, &alloc::format!("u-{live}")));
    }
}
7472
#[test]
/// Compaction succeeds without merging when there is nothing to merge.
///
/// Three situations are checked, each expecting `merged_segment_id` to be
/// `None`:
/// 1. no cold segment exists yet (the report's `sources` is also empty);
/// 2. one cold segment exists and the byte target is very large;
/// 3. one cold segment exists and the byte target is 1.
///
/// In cases 2 and 3 the catalog must still report one cold segment.
fn compact_is_noop_below_two_candidates() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    (0..6i64).for_each(|id| {
        users
            .insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    });
    users.add_index("by_id".into(), "id").unwrap();

    // Case 1: nothing has been frozen yet.
    let nothing_frozen = catalog
        .compact_cold_segments("users", "by_id", 1 << 30)
        .expect("noop ok");
    assert!(nothing_frozen.merged_segment_id.is_none());
    assert!(nothing_frozen.sources.is_empty());

    // Case 2: a single segment under a generous target.
    catalog.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
    let lone_segment = catalog
        .compact_cold_segments("users", "by_id", 1 << 30)
        .expect("noop ok");
    assert!(lone_segment.merged_segment_id.is_none());
    assert_eq!(catalog.cold_segment_count(), 1);

    // Case 3: the same single segment under a one-byte target.
    let tiny_target = catalog
        .compact_cold_segments("users", "by_id", 1)
        .expect("noop ok");
    assert!(tiny_target.merged_segment_id.is_none());
    assert_eq!(catalog.cold_segment_count(), 1);
}
7508
#[test]
/// A compacted catalog survives serialize → deserialize once the merged
/// segment bytes are reloaded at the merged segment's id.
///
/// After two freezes and a compaction, the catalog is serialized and a new
/// catalog is rebuilt from those bytes. The merged segment is then attached
/// with `load_segment_bytes_at(merged_id, ..)`. Every PK from 0 to 5 must
/// resolve to its original row in the restored catalog, which must report
/// exactly one cold segment.
///
/// NOTE(review): the explicit reload suggests the serialized catalog does
/// not carry segment payloads itself — confirm against `Catalog::serialize`.
fn compact_swap_survives_catalog_roundtrip_via_load_at() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    let largest_segment = catalog
        .cold_segment_ids_global()
        .iter()
        .map(|seg_id| catalog.cold_segment(*seg_id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let report = catalog
        .compact_cold_segments("users", "by_id", largest_segment + 1)
        .expect("compact ok");
    let merged_id = report.merged_segment_id.unwrap();

    // Snapshot taken after compaction, so it reflects the post-merge state.
    let snapshot = catalog.serialize();
    let merged_payload = report.merged_segment_bytes.clone();

    let mut restored = Catalog::deserialize(&snapshot).expect("deserialize ok");
    restored
        .load_segment_bytes_at(merged_id, merged_payload)
        .expect("reload merged ok");

    for id in 0..6i64 {
        let found = restored
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("PK {id} lost across roundtrip"));
        assert_eq!(found, make_user_row(id, &alloc::format!("u-{id}")));
    }
    assert_eq!(restored.cold_segment_count(), 1);
}
7562
#[test]
/// `load_segment_bytes_at` can target an id beyond the current slots, and
/// refuses an id that is already occupied.
///
/// After one freeze, the same segment bytes are loaded at id 5. The slot
/// count must become 6 and the live segment count 2. Loading again at id 5,
/// or at id 0, must fail with `StorageError::Corrupt`.
fn load_segment_bytes_at_pads_and_rejects_collision() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    (0..4i64).for_each(|id| {
        users
            .insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    });
    users.add_index("by_id".into(), "id").unwrap();
    let frozen = catalog.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
    let segment_payload = frozen.segment_bytes.clone();

    // Id 5 lies past the single slot the freeze created.
    catalog
        .load_segment_bytes_at(5, segment_payload.clone())
        .expect("pad + load ok");
    assert_eq!(catalog.cold_segment_slot_count(), 6);
    assert_eq!(catalog.cold_segment_count(), 2);

    // Collision with the slot that was just filled.
    assert!(matches!(
        catalog.load_segment_bytes_at(5, segment_payload.clone()),
        Err(StorageError::Corrupt(_))
    ));
    // Collision at id 0 (presumably the slot taken by the freeze above).
    assert!(matches!(
        catalog.load_segment_bytes_at(0, segment_payload),
        Err(StorageError::Corrupt(_))
    ));
}
7597
#[test]
/// After freezing two rows and promoting PK 0, the PK index holds exactly
/// one locator for that key and it is hot. PKs 2 and 3 must still resolve
/// to their original rows.
///
/// NOTE(review): despite the name, this body never issues a second freeze
/// after the promote — confirm whether a refreeze step is intentionally
/// omitted.
fn promote_then_refreeze_does_not_leave_orphan_locators() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..4i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    catalog.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
    let promoted = catalog
        .promote_cold_row("users", "by_id", &IndexKey::Int(0))
        .unwrap();
    assert!(promoted.is_some());

    // Copy the locators out so the index borrow ends before further lookups.
    let pk_index = catalog.get("users").unwrap().index_on(0).unwrap();
    let entries_after_promote = pk_index.lookup_eq(&IndexKey::Int(0)).to_vec();
    assert_eq!(entries_after_promote.len(), 1);
    assert!(entries_after_promote[0].is_hot());

    for id in [2i64, 3] {
        let expected = make_user_row(id, &alloc::format!("u-{id}"));
        assert_eq!(
            catalog
                .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
                .unwrap(),
            expected
        );
    }
}
7642}