1#![no_std]
7#![cfg_attr(target_arch = "aarch64", allow(unsafe_code))]
11
12extern crate alloc;
13
14pub mod bloom;
15pub mod halfvec;
16pub mod persistent;
17pub mod persistent_btree;
18pub mod quantize;
19pub mod row_locator;
20pub mod segment;
21
22pub use self::bloom::{BloomError, BloomFilter};
23pub use self::row_locator::{RowLocator, RowLocatorError};
24pub use self::segment::{
25 BRIN_SIDECAR_MAGIC, BrinSummary, OwnedSegment, SEGMENT_COMPRESS_ALGO_LZSS,
26 SEGMENT_COMPRESS_ALGO_NONE, SEGMENT_MAGIC, SEGMENT_MAGIC_V2, SEGMENT_PAGE_BYTES, SegmentError,
27 SegmentMeta, SegmentReader, derive_brin_summaries, encode_segment, wrap_v2_envelope,
28 wrap_v2_envelope_with_brin,
29};
30
31use alloc::collections::{BTreeMap, BTreeSet};
32use alloc::format;
33use alloc::string::String;
34use alloc::sync::Arc;
35use alloc::vec::Vec;
36use core::fmt;
37
38use self::persistent::PersistentVec;
39use self::persistent_btree::PersistentBTreeMap;
40
/// Physical storage encoding for the elements of a `VECTOR` column.
///
/// `F32` is the default encoding.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum VecEncoding {
    /// Full-precision `f32` elements, carried as `Value::Vector`.
    #[default]
    F32,
    /// One byte per element, carried as `Value::Sq8Vector`.
    Sq8,
    /// Half-precision elements, carried as `Value::HalfVector`.
    F16,
}

impl fmt::Display for VecEncoding {
    /// Renders the SQL-facing encoding name: `F32`, `SQ8` or `HALF`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::F32 => "F32",
            Self::Sq8 => "SQ8",
            Self::F16 => "HALF",
        };
        f.write_str(label)
    }
}

/// Declared SQL type of a column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataType {
    SmallInt,
    Int,
    BigInt,
    Float,
    Text,
    /// `VARCHAR(n)`; presumably `n` is a maximum length enforced elsewhere — TODO confirm.
    Varchar(u32),
    /// `CHAR(n)`; presumably `n` is a fixed length enforced elsewhere — TODO confirm.
    Char(u32),
    Bool,
    /// Fixed-dimension vector with its element encoding.
    Vector {
        dim: u32,
        encoding: VecEncoding,
    },
    /// Fixed-point decimal; `Display` omits the scale when it is 0.
    Numeric {
        precision: u8,
        scale: u8,
    },
    Date,
    Timestamp,
    Timestamptz,
    Interval,
    Json,
    Jsonb,
    /// Raw bytes; rendered as `BYTEA`.
    Bytes,
}

impl fmt::Display for DataType {
    /// Renders the type as SQL DDL, e.g. `VARCHAR(32)`, `NUMERIC(10, 2)` or
    /// `VECTOR(768) USING SQ8`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Parameterless types map straight to a keyword; parameterised ones format and
        // return early.
        let keyword = match *self {
            Self::SmallInt => "SMALLINT",
            Self::Int => "INT",
            Self::BigInt => "BIGINT",
            Self::Float => "FLOAT",
            Self::Text => "TEXT",
            Self::Bool => "BOOL",
            Self::Date => "DATE",
            Self::Timestamp => "TIMESTAMP",
            Self::Timestamptz => "TIMESTAMPTZ",
            Self::Interval => "INTERVAL",
            Self::Json => "JSON",
            Self::Jsonb => "JSONB",
            Self::Bytes => "BYTEA",
            Self::Varchar(n) => return write!(f, "VARCHAR({n})"),
            Self::Char(n) => return write!(f, "CHAR({n})"),
            Self::Numeric { precision, scale: 0 } => return write!(f, "NUMERIC({precision})"),
            Self::Numeric { precision, scale } => {
                return write!(f, "NUMERIC({precision}, {scale})");
            }
            Self::Vector { dim, encoding } => {
                // F32 is the implicit default, so it carries no USING clause.
                let suffix = match encoding {
                    VecEncoding::F32 => "",
                    VecEncoding::Sq8 => " USING SQ8",
                    VecEncoding::F16 => " USING HALF",
                };
                return write!(f, "VECTOR({dim}){suffix}");
            }
        };
        f.write_str(keyword)
    }
}
181
182#[derive(Debug, Clone, PartialEq)]
186#[non_exhaustive]
187pub enum Value {
188 SmallInt(i16),
189 Int(i32),
190 BigInt(i64),
191 Float(f64),
192 Text(String),
193 Bool(bool),
194 Vector(Vec<f32>),
195 Sq8Vector(crate::quantize::Sq8Vector),
202 HalfVector(crate::halfvec::HalfVector),
208 Numeric {
212 scaled: i128,
213 scale: u8,
214 },
215 Date(i32),
217 Timestamp(i64),
219 Interval {
222 months: i32,
223 micros: i64,
224 },
225 Json(String),
229 Bytes(Vec<u8>),
235 Null,
236}
237
238impl Value {
239 pub fn data_type(&self) -> Option<DataType> {
241 match self {
242 Self::SmallInt(_) => Some(DataType::SmallInt),
243 Self::Int(_) => Some(DataType::Int),
244 Self::BigInt(_) => Some(DataType::BigInt),
245 Self::Float(_) => Some(DataType::Float),
246 Self::Text(_) => Some(DataType::Text),
249 Self::Bool(_) => Some(DataType::Bool),
250 Self::Vector(v) => Some(DataType::Vector {
251 dim: u32::try_from(v.len()).expect("vector dim ≤ u32"),
252 encoding: VecEncoding::F32,
253 }),
254 Self::Sq8Vector(q) => Some(DataType::Vector {
255 dim: u32::try_from(q.bytes.len()).expect("vector dim ≤ u32"),
256 encoding: VecEncoding::Sq8,
257 }),
258 Self::HalfVector(h) => Some(DataType::Vector {
259 dim: u32::try_from(h.dim()).expect("vector dim ≤ u32"),
260 encoding: VecEncoding::F16,
261 }),
262 Self::Numeric { scale, .. } => Some(DataType::Numeric {
267 precision: 0,
268 scale: *scale,
269 }),
270 Self::Date(_) => Some(DataType::Date),
271 Self::Timestamp(_) => Some(DataType::Timestamp),
272 Self::Interval { .. } => Some(DataType::Interval),
273 Self::Json(_) => Some(DataType::Json),
274 Self::Bytes(_) => Some(DataType::Bytes),
275 Self::Null => None,
276 }
277 }
278
279 pub const fn is_null(&self) -> bool {
280 matches!(self, Self::Null)
281 }
282}
283
284#[derive(Debug, Clone, PartialEq)]
287pub struct Row {
288 pub values: Vec<Value>,
289}
290
291impl Row {
292 pub const fn new(values: Vec<Value>) -> Self {
293 Self { values }
294 }
295
296 pub fn len(&self) -> usize {
297 self.values.len()
298 }
299
300 pub fn is_empty(&self) -> bool {
301 self.values.is_empty()
302 }
303}
304
/// Declaration of one table column.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnSchema {
    /// Column name; `TableSchema::column_position` matches it exactly.
    pub name: String,
    /// Declared type that `Table::insert` / `Table::update_row` validate values against.
    pub ty: DataType,
    /// When `false`, `Table::insert` and `Table::update_row` reject `Value::Null`.
    pub nullable: bool,
    /// Presumably a constant default applied when a value is omitted — TODO confirm.
    pub default: Option<Value>,
    /// Presumably a default expression evaluated at insert time — TODO confirm.
    pub runtime_default: Option<String>,
    /// Auto-increment flag; `Table::next_auto_value` computes a candidate next value.
    pub auto_increment: bool,
}
328
/// Declaration of a table: its name, ordered columns and constraints.
#[derive(Debug, Clone, PartialEq)]
pub struct TableSchema {
    pub name: String,
    /// Columns in row order: `Row::values[i]` is validated against `columns[i]`.
    pub columns: Vec<ColumnSchema>,
    /// Optional hot-tier budget in bytes; presumably compared with `Table::hot_bytes`
    /// by the freeze logic — TODO confirm.
    pub hot_tier_bytes: Option<u64>,
    /// FOREIGN KEY constraints declared on this table.
    pub foreign_keys: Vec<ForeignKeyConstraint>,
    /// UNIQUE / PRIMARY KEY constraints declared on this table.
    pub uniqueness_constraints: Vec<UniquenessConstraint>,
}
354
/// A UNIQUE or PRIMARY KEY constraint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UniquenessConstraint {
    /// `true` for PRIMARY KEY, `false` for a plain UNIQUE constraint.
    pub is_primary_key: bool,
    /// Presumably positions into `TableSchema::columns` — TODO confirm.
    pub columns: Vec<usize>,
}
371
/// A FOREIGN KEY constraint from this table to `parent_table`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForeignKeyConstraint {
    /// Optional constraint name.
    pub name: Option<String>,
    /// Presumably positions of the referencing columns in this table — TODO confirm.
    pub local_columns: Vec<usize>,
    /// Name of the referenced table.
    pub parent_table: String,
    /// Presumably positions of the referenced columns in `parent_table` — TODO confirm.
    pub parent_columns: Vec<usize>,
    /// Action when a referenced parent row is deleted.
    pub on_delete: FkAction,
    /// Action when a referenced parent key is updated.
    pub on_update: FkAction,
}
398
/// Referential action applied to child rows when the referenced parent row changes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FkAction {
    Restrict,
    Cascade,
    SetNull,
    SetDefault,
    NoAction,
}

impl FkAction {
    /// One-byte tag for this action (`Restrict` = 0 … `NoAction` = 4).
    ///
    /// The mapping is written out explicitly rather than relying on variant order,
    /// presumably because the tag is persisted — keep the values stable.
    pub const fn tag(self) -> u8 {
        match self {
            Self::Restrict => 0,
            Self::Cascade => 1,
            Self::SetNull => 2,
            Self::SetDefault => 3,
            Self::NoAction => 4,
        }
    }

    /// Inverse of [`FkAction::tag`]; `None` for any byte outside `0..=4`.
    pub const fn from_tag(b: u8) -> Option<Self> {
        match b {
            0 => Some(Self::Restrict),
            1 => Some(Self::Cascade),
            2 => Some(Self::SetNull),
            3 => Some(Self::SetDefault),
            4 => Some(Self::NoAction),
            _ => None,
        }
    }
}
431
432impl TableSchema {
433 pub fn column_position(&self, name: &str) -> Option<usize> {
434 self.columns.iter().position(|c| c.name == name)
435 }
436}
437
438#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
443pub enum IndexKey {
444 Int(i64),
445 Text(String),
446 Bool(bool),
447}
448
449impl IndexKey {
450 pub fn from_value(v: &Value) -> Option<Self> {
451 match v {
452 Value::SmallInt(n) => Some(Self::Int(i64::from(*n))),
453 Value::Int(n) => Some(Self::Int(i64::from(*n))),
454 Value::BigInt(n) => Some(Self::Int(*n)),
455 Value::Text(s) => Some(Self::Text(s.clone())),
456 Value::Bool(b) => Some(Self::Bool(*b)),
457 Value::Date(d) => Some(Self::Int(i64::from(*d))),
460 Value::Timestamp(t) => Some(Self::Int(*t)),
461 Value::Null
466 | Value::Float(_)
467 | Value::Vector(_)
468 | Value::Sq8Vector(_)
469 | Value::HalfVector(_)
470 | Value::Numeric { .. }
471 | Value::Interval { .. }
472 | Value::Json(_)
473 | Value::Bytes(_) => None,
474 }
475 }
476}
477
/// A secondary index over one table column.
///
/// In this module, key extraction for B-tree maintenance (`Table::insert`,
/// `Table::add_index`) reads only `row.values[column_position]`. The constructors in
/// `impl Index` initialise the remaining descriptive fields to empty / `false`.
#[derive(Debug, Clone)]
pub struct Index {
    /// Index name; `Table::add_*` / `Table::restore_*` reject duplicates within a table.
    pub name: String,
    /// Position (in `TableSchema::columns`) of the indexed column.
    pub column_position: usize,
    /// The concrete index structure.
    pub kind: IndexKind,
    /// Presumably covering (`INCLUDE`) column positions — TODO confirm.
    pub included_columns: Vec<usize>,
    /// Presumably the source text of a partial-index predicate. `Table::insert` does not
    /// evaluate it and indexes every row.
    pub partial_predicate: Option<String>,
    /// Presumably the source text of an expression-index key — TODO confirm.
    pub expression: Option<String>,
    /// Whether the index was declared UNIQUE; `Table::insert` does not enforce it.
    pub is_unique: bool,
    /// Presumably additional key columns of a composite index — TODO confirm.
    pub extra_column_positions: Vec<usize>,
}
526
/// Default `M` (neighbour cap on layers above 0) for NSW indices; `NswGraph::new`
/// derives the layer-0 cap as `2 * M`. Presumably used when a caller does not specify
/// `m` — TODO confirm.
pub const NSW_DEFAULT_M: usize = 16;
530
/// Outcome of freezing hot rows into a cold segment (produced outside this chunk).
#[derive(Debug, Clone)]
pub struct FreezeReport {
    /// Presumably the identifier of the segment that was written — TODO confirm.
    pub segment_id: u32,
    /// Presumably the number of rows moved into the segment — TODO confirm.
    pub frozen_rows: usize,
    /// Presumably the reduction in hot-tier bytes (cf. `Table::hot_bytes`) — TODO confirm.
    pub bytes_freed: u64,
    /// Presumably the encoded segment image (cf. `encode_segment`) — TODO confirm.
    pub segment_bytes: Vec<u8>,
}
556
/// A contiguous range of hot rows selected for freezing (consumed outside this chunk).
#[derive(Debug, Clone)]
pub struct FreezeSlice {
    /// Hot-row positions covered by this slice.
    pub row_range: core::ops::Range<usize>,
    /// Presumably `(row id, encoded row body, index key)` per row — TODO confirm.
    pub rows: Vec<(u64, Vec<u8>, IndexKey)>,
}
579
/// Outcome of compacting cold segments (produced outside this chunk).
#[derive(Debug, Clone)]
pub struct CompactReport {
    /// Presumably the ids of the segments that were merged — TODO confirm.
    pub sources: Vec<u32>,
    /// Id of the merged segment, if one was produced.
    pub merged_segment_id: Option<u32>,
    /// Presumably the encoded merged segment image — TODO confirm.
    pub merged_segment_bytes: Vec<u8>,
    /// Presumably the number of rows written to the merged segment — TODO confirm.
    pub merged_rows: usize,
    /// Presumably the number of deleted rows dropped during the merge — TODO confirm.
    pub deleted_rows_pruned: usize,
    /// Estimated bytes reclaimed (an estimate, per the field name).
    pub bytes_reclaimed_estimate: u64,
}
615
/// Concrete structure behind an [`Index`].
#[derive(Debug, Clone)]
pub enum IndexKind {
    /// Ordered map from key to every locator holding it. Entries include
    /// `RowLocator::Hot(pos)` for rows in `Table::rows`, plus cold locators added through
    /// `Table::register_cold_locators`.
    BTree(PersistentBTreeMap<IndexKey, Vec<RowLocator>>),
    /// Layered proximity graph over a `VECTOR` column.
    Nsw(NswGraph),
    /// BRIN descriptor: no in-memory structure is kept here, only the column type.
    /// `Table::insert` does not touch it.
    Brin {
        column_type: DataType,
    },
}
650
651#[derive(Debug, Clone)]
660pub struct NswGraph {
661 pub m: usize,
663 pub m_max_0: usize,
666 pub entry: Option<usize>,
669 pub entry_level: u8,
671 pub levels: PersistentVec<u8>,
678 pub layers: Vec<PersistentVec<Vec<u32>>>,
694}
695
696impl NswGraph {
697 fn new(m: usize) -> Self {
698 Self {
699 m,
700 m_max_0: m.saturating_mul(2),
701 entry: None,
702 entry_level: 0,
703 levels: PersistentVec::new(),
704 layers: alloc::vec![PersistentVec::new()],
705 }
706 }
707
708 pub const fn cap_for_layer(&self, layer: u8) -> usize {
710 if layer == 0 { self.m_max_0 } else { self.m }
711 }
712}
713
/// Deterministically picks the top layer (`0..=7`) for row `row_idx` in an NSW graph.
///
/// The row index is scrambled with a SplitMix64-style finaliser. The level is the number of
/// trailing all-zero nibbles (4-bit groups) of the result, capped at 7. For a well-mixed
/// hash, each level is therefore about 16× rarer than the one below.
///
/// The mixer maps 0 to 0, so `row_idx == 0` always receives the maximum level.
pub fn nsw_assign_level(row_idx: usize) -> u8 {
    const MAX_LEVEL: u8 = 7;
    let mut h = (row_idx as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
    h = (h ^ (h >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    h = (h ^ (h >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    h ^= h >> 31;
    // Every four trailing zero bits is one more level; `trailing_zeros(0) == 64`.
    let zero_nibbles = h.trailing_zeros() / 4;
    u8::try_from(zero_nibbles).map_or(MAX_LEVEL, |lvl| lvl.min(MAX_LEVEL))
}
740
741impl Index {
742 fn new_btree(name: String, column_position: usize) -> Self {
743 Self {
744 name,
745 column_position,
746 kind: IndexKind::BTree(PersistentBTreeMap::new()),
747 included_columns: Vec::new(),
748 partial_predicate: None,
749 expression: None,
750 is_unique: false,
751 extra_column_positions: Vec::new(),
752 }
753 }
754
755 fn new_nsw(name: String, column_position: usize, m: usize) -> Self {
756 Self {
757 name,
758 column_position,
759 kind: IndexKind::Nsw(NswGraph::new(m)),
760 included_columns: Vec::new(),
761 partial_predicate: None,
762 expression: None,
763 is_unique: false,
764 extra_column_positions: Vec::new(),
765 }
766 }
767
768 fn new_brin(name: String, column_position: usize, column_type: DataType) -> Self {
772 Self {
773 name,
774 column_position,
775 kind: IndexKind::Brin { column_type },
776 included_columns: Vec::new(),
777 partial_predicate: None,
778 expression: None,
779 is_unique: false,
780 extra_column_positions: Vec::new(),
781 }
782 }
783
784 pub fn lookup_eq(&self, key: &IndexKey) -> &[RowLocator] {
793 match &self.kind {
794 IndexKind::BTree(m) => m.get(key).map_or(&[][..], Vec::as_slice),
795 IndexKind::Nsw(_) | IndexKind::Brin { .. } => &[][..],
797 }
798 }
799
800 pub const fn nsw(&self) -> Option<&NswGraph> {
803 match &self.kind {
804 IndexKind::Nsw(g) => Some(g),
805 IndexKind::BTree(_) | IndexKind::Brin { .. } => None,
806 }
807 }
808
809 pub const fn is_brin(&self) -> bool {
814 matches!(self.kind, IndexKind::Brin { .. })
815 }
816}
817
/// An in-memory table: schema, hot rows, secondary indices and bookkeeping for
/// cold (frozen) data.
#[derive(Debug, Clone)]
pub struct Table {
    /// Table declaration that rows are validated against.
    schema: TableSchema,
    /// Hot rows; a row's position here is what `RowLocator::Hot` refers to.
    rows: PersistentVec<Row>,
    /// Secondary indices in creation order.
    indices: Vec<Index>,
    /// Running (saturating) sum of `row_body_encoded_len` over hot rows, maintained by
    /// `insert`, `update_row` and `delete_rows`.
    hot_bytes: u64,
    /// Cold-row count last supplied via `set_cold_row_count`.
    cold_row_count: u64,
    /// Set by `mark_cold_row_count_stale`, cleared by `set_cold_row_count`.
    cold_row_count_stale: bool,
}
859
860impl Table {
861 pub fn new(schema: TableSchema) -> Self {
862 Self {
863 schema,
864 rows: PersistentVec::new(),
865 indices: Vec::new(),
866 hot_bytes: 0,
867 cold_row_count: 0,
868 cold_row_count_stale: false,
869 }
870 }
871
872 #[must_use]
876 pub const fn hot_bytes(&self) -> u64 {
877 self.hot_bytes
878 }
879
880 #[must_use]
883 pub const fn cold_row_count(&self) -> u64 {
884 self.cold_row_count
885 }
886
887 pub fn set_cold_row_count(&mut self, n: u64) {
890 self.cold_row_count = n;
891 self.cold_row_count_stale = false;
892 }
893
894 pub fn mark_cold_row_count_stale(&mut self) {
899 self.cold_row_count_stale = true;
900 }
901
902 #[must_use]
906 pub const fn cold_row_count_stale(&self) -> bool {
907 self.cold_row_count_stale
908 }
909
910 #[must_use]
921 pub fn count_cold_locators(&self) -> u64 {
922 let mut best: u64 = 0;
923 for idx in &self.indices {
924 if let IndexKind::BTree(map) = &idx.kind {
925 let n: u64 = map
926 .iter()
927 .map(|(_, locs)| locs.iter().filter(|l| l.is_cold()).count() as u64)
928 .sum();
929 if n > best {
930 best = n;
931 }
932 }
933 }
934 best
935 }
936
937 pub const fn schema(&self) -> &TableSchema {
938 &self.schema
939 }
940
941 pub const fn schema_mut(&mut self) -> &mut TableSchema {
945 &mut self.schema
946 }
947
948 pub const fn rows(&self) -> &PersistentVec<Row> {
952 &self.rows
953 }
954
955 pub const fn row_count(&self) -> usize {
956 self.rows.len()
957 }
958
959 pub fn indices_mut(&mut self) -> &mut [Index] {
964 &mut self.indices
965 }
966
967 pub fn indices(&self) -> &[Index] {
968 &self.indices
969 }
970
971 pub fn next_auto_value(&self, col_pos: usize) -> Option<i64> {
977 let ty = self.schema.columns.get(col_pos)?.ty;
978 if !matches!(ty, DataType::SmallInt | DataType::Int | DataType::BigInt) {
979 return None;
980 }
981 let mut max: Option<i64> = None;
982 for row in &self.rows {
983 match row.values.get(col_pos) {
984 Some(Value::SmallInt(n)) => {
985 let v = i64::from(*n);
986 max = Some(max.map_or(v, |m| m.max(v)));
987 }
988 Some(Value::Int(n)) => {
989 let v = i64::from(*n);
990 max = Some(max.map_or(v, |m| m.max(v)));
991 }
992 Some(Value::BigInt(n)) => {
993 max = Some(max.map_or(*n, |m| m.max(*n)));
994 }
995 _ => {}
996 }
997 }
998 Some(max.map_or(1, |m| m + 1))
999 }
1000
1001 pub fn index_on(&self, column_position: usize) -> Option<&Index> {
1005 self.indices
1012 .iter()
1013 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::BTree(_)))
1014 .or_else(|| {
1015 self.indices
1016 .iter()
1017 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_)))
1018 })
1019 }
1020
1021 pub fn insert(&mut self, row: Row) -> Result<(), StorageError> {
1025 if row.len() != self.schema.columns.len() {
1026 return Err(StorageError::ArityMismatch {
1027 expected: self.schema.columns.len(),
1028 actual: row.len(),
1029 });
1030 }
1031 for (i, (val, col)) in row.values.iter().zip(&self.schema.columns).enumerate() {
1032 if val.is_null() {
1033 if !col.nullable {
1034 return Err(StorageError::NullInNotNull {
1035 column: col.name.clone(),
1036 });
1037 }
1038 continue;
1039 }
1040 let actual = val.data_type().expect("non-null");
1041 let compatible = actual == col.ty
1055 || matches!(
1056 (actual, col.ty),
1057 (
1058 DataType::Text,
1059 DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
1060 ) | (DataType::Json | DataType::Jsonb, DataType::Text)
1061 | (DataType::Json, DataType::Jsonb) | (DataType::Jsonb, DataType::Json)
1062 | (DataType::Timestamp, DataType::Timestamptz)
1063 | (DataType::Timestamptz, DataType::Timestamp)
1064 )
1065 || matches!(
1066 (actual, col.ty),
1067 (
1068 DataType::Numeric { scale: a, .. },
1069 DataType::Numeric { scale: b, .. },
1070 ) if a == b
1071 );
1072 if !compatible {
1073 return Err(StorageError::TypeMismatch {
1074 column: col.name.clone(),
1075 expected: col.ty,
1076 actual,
1077 position: i,
1078 });
1079 }
1080 }
1081 let new_row_idx = self.rows.len();
1082 for idx in &mut self.indices {
1086 if let IndexKind::BTree(map) = &mut idx.kind
1087 && let Some(key) = IndexKey::from_value(&row.values[idx.column_position])
1088 {
1089 let mut entries = map.get(&key).cloned().unwrap_or_default();
1095 entries.push(RowLocator::Hot(new_row_idx));
1096 map.insert_mut(key, entries);
1097 }
1098 }
1099 self.hot_bytes = self
1102 .hot_bytes
1103 .saturating_add(row_body_encoded_len(&row, &self.schema) as u64);
1104 self.rows.push_mut(row);
1109 let new_row_idx = self.rows.len() - 1;
1112 let nsw_targets: Vec<usize> = self
1113 .indices
1114 .iter()
1115 .enumerate()
1116 .filter_map(|(i, idx)| {
1117 if matches!(idx.kind, IndexKind::Nsw(_)) {
1118 Some(i)
1119 } else {
1120 None
1121 }
1122 })
1123 .collect();
1124 for idx_pos in nsw_targets {
1125 nsw_insert_at(self, idx_pos, new_row_idx);
1126 }
1127 Ok(())
1128 }
1129
1130 pub fn add_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1134 if self.indices.iter().any(|i| i.name == name) {
1135 return Err(StorageError::DuplicateIndex { name });
1136 }
1137 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1138 StorageError::ColumnNotFound {
1139 column: column_name.into(),
1140 }
1141 })?;
1142 let mut idx = Index::new_btree(name, column_position);
1143 if let IndexKind::BTree(map) = &mut idx.kind {
1144 for (i, row) in self.rows.iter().enumerate() {
1145 if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
1146 let mut entries = map.get(&key).cloned().unwrap_or_default();
1147 entries.push(RowLocator::Hot(i));
1148 map.insert_mut(key, entries);
1149 }
1150 }
1151 }
1152 self.indices.push(idx);
1153 Ok(())
1154 }
1155
1156 pub fn add_nsw_index(
1161 &mut self,
1162 name: String,
1163 column_name: &str,
1164 m: usize,
1165 ) -> Result<(), StorageError> {
1166 self.add_nsw_index_inner(name, column_name, m, None)
1167 }
1168
1169 pub fn rebuild_nsw_index(
1181 &mut self,
1182 name: &str,
1183 new_encoding: Option<VecEncoding>,
1184 ) -> Result<(), StorageError> {
1185 let idx_pos = self
1186 .indices
1187 .iter()
1188 .position(|i| i.name == name)
1189 .ok_or_else(|| StorageError::IndexNotFound {
1190 name: String::from(name),
1191 })?;
1192 let col_pos = self.indices[idx_pos].column_position;
1193 let m = match &self.indices[idx_pos].kind {
1194 IndexKind::Nsw(g) => g.m,
1195 IndexKind::BTree(_) | IndexKind::Brin { .. } => {
1196 return Err(StorageError::Unsupported(format!(
1197 "ALTER INDEX REBUILD on non-NSW index {name:?} — only NSW indexes can rebuild"
1198 )));
1199 }
1200 };
1201 let col_name = self.schema.columns[col_pos].name.clone();
1202 if let Some(target) = new_encoding {
1205 let current = match self.schema.columns[col_pos].ty {
1206 DataType::Vector { encoding, .. } => encoding,
1207 ref other => {
1208 return Err(StorageError::Unsupported(format!(
1209 "ALTER INDEX REBUILD WITH (encoding=…) on non-vector column type {other:?}"
1210 )));
1211 }
1212 };
1213 if target != current {
1214 let DataType::Vector { dim, .. } = self.schema.columns[col_pos].ty else {
1215 unreachable!("checked above")
1216 };
1217 let n = self.rows.len();
1218 for i in 0..n {
1219 let row = self
1220 .rows
1221 .get_mut(i)
1222 .expect("row index in bounds (we iterated up to len())");
1223 let cell = core::mem::replace(&mut row.values[col_pos], Value::Null);
1224 let recoded = recode_vector_cell(cell, target)?;
1225 row.values[col_pos] = recoded;
1226 }
1227 self.schema.columns[col_pos].ty = DataType::Vector {
1228 dim,
1229 encoding: target,
1230 };
1231 }
1232 }
1233 self.indices.remove(idx_pos);
1235 self.add_nsw_index_inner(String::from(name), &col_name, m, None)?;
1236 Ok(())
1237 }
1238
1239 pub fn restore_nsw_index(
1244 &mut self,
1245 name: String,
1246 column_name: &str,
1247 graph: NswGraph,
1248 ) -> Result<(), StorageError> {
1249 self.add_nsw_index_inner(name, column_name, graph.m, Some(graph))
1250 }
1251
1252 pub fn restore_btree_index(
1259 &mut self,
1260 name: String,
1261 column_name: &str,
1262 map: PersistentBTreeMap<IndexKey, Vec<RowLocator>>,
1263 ) -> Result<(), StorageError> {
1264 if self.indices.iter().any(|i| i.name == name) {
1265 return Err(StorageError::DuplicateIndex { name });
1266 }
1267 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1268 StorageError::ColumnNotFound {
1269 column: column_name.into(),
1270 }
1271 })?;
1272 self.indices.push(Index {
1273 name,
1274 column_position,
1275 kind: IndexKind::BTree(map),
1276 included_columns: Vec::new(),
1277 partial_predicate: None,
1278 expression: None,
1279 is_unique: false,
1280 extra_column_positions: Vec::new(),
1281 });
1282 Ok(())
1283 }
1284
1285 pub fn restore_brin_index(
1290 &mut self,
1291 name: String,
1292 column_name: &str,
1293 column_type: DataType,
1294 ) -> Result<(), StorageError> {
1295 if self.indices.iter().any(|i| i.name == name) {
1296 return Err(StorageError::DuplicateIndex { name });
1297 }
1298 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1299 StorageError::ColumnNotFound {
1300 column: column_name.into(),
1301 }
1302 })?;
1303 self.indices.push(Index::new_brin(name, column_position, column_type));
1304 Ok(())
1305 }
1306
1307 pub fn add_brin_index(
1311 &mut self,
1312 name: String,
1313 column_name: &str,
1314 ) -> Result<(), StorageError> {
1315 if self.indices.iter().any(|i| i.name == name) {
1316 return Err(StorageError::DuplicateIndex { name });
1317 }
1318 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1319 StorageError::ColumnNotFound {
1320 column: column_name.into(),
1321 }
1322 })?;
1323 let column_type = self.schema.columns[column_position].ty;
1324 self.indices.push(Index::new_brin(name, column_position, column_type));
1325 Ok(())
1326 }
1327
1328 pub fn register_cold_locators<I>(
1345 &mut self,
1346 index_name: &str,
1347 locators: I,
1348 ) -> Result<usize, StorageError>
1349 where
1350 I: IntoIterator<Item = (IndexKey, RowLocator)>,
1351 {
1352 let idx = self
1353 .indices
1354 .iter_mut()
1355 .find(|i| i.name == index_name)
1356 .ok_or_else(|| StorageError::Corrupt(format!("index {index_name:?} not found")))?;
1357 let map = match &mut idx.kind {
1358 IndexKind::BTree(map) => map,
1359 IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
1360 return Err(StorageError::Corrupt(format!(
1361 "index {index_name:?} is not BTree; cold locators apply only to BTree indices"
1362 )));
1363 }
1364 };
1365 let mut count = 0usize;
1366 for (key, locator) in locators {
1367 let mut entries = map.get(&key).cloned().unwrap_or_default();
1368 entries.push(locator);
1369 map.insert_mut(key, entries);
1370 count += 1;
1371 }
1372 Ok(count)
1373 }
1374
1375 pub fn remove_cold_locators_for_key(
1385 &mut self,
1386 index_name: &str,
1387 key: &IndexKey,
1388 ) -> Result<usize, StorageError> {
1389 let idx = self
1390 .indices
1391 .iter_mut()
1392 .find(|i| i.name == index_name)
1393 .ok_or_else(|| {
1394 StorageError::Corrupt(format!(
1395 "remove_cold_locators_for_key: index {index_name:?} not found"
1396 ))
1397 })?;
1398 let map = match &mut idx.kind {
1399 IndexKind::BTree(map) => map,
1400 IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
1401 return Err(StorageError::Corrupt(format!(
1402 "remove_cold_locators_for_key: index {index_name:?} is not BTree; \
1403 cold locators apply only to BTree indices"
1404 )));
1405 }
1406 };
1407 let Some(entries) = map.get(key) else {
1408 return Ok(0);
1409 };
1410 let mut kept: Vec<RowLocator> =
1411 entries.iter().copied().filter(RowLocator::is_hot).collect();
1412 let removed = entries.len() - kept.len();
1413 if removed == 0 {
1414 return Ok(0);
1415 }
1416 kept.shrink_to_fit();
1417 map.insert_mut(key.clone(), kept);
1425 Ok(removed)
1426 }
1427
1428 pub fn delete_rows(&mut self, positions: &[usize]) -> usize {
1434 if positions.is_empty() {
1435 return 0;
1436 }
1437 let mut to_remove = alloc::vec![false; self.rows.len()];
1441 let mut removed = 0;
1442 for &p in positions {
1443 if p < to_remove.len() && !to_remove[p] {
1444 to_remove[p] = true;
1445 removed += 1;
1446 }
1447 }
1448 let mut new_rows: PersistentVec<Row> = PersistentVec::new();
1449 let mut removed_bytes: u64 = 0;
1450 for (i, row) in self.rows.iter().enumerate() {
1451 if to_remove[i] {
1452 removed_bytes =
1453 removed_bytes.saturating_add(row_body_encoded_len(row, &self.schema) as u64);
1454 } else {
1455 new_rows.push_mut(row.clone());
1456 }
1457 }
1458 self.rows = new_rows;
1459 self.hot_bytes = self.hot_bytes.saturating_sub(removed_bytes);
1460 self.rebuild_indices();
1461 removed
1462 }
1463
1464 pub fn update_row(
1470 &mut self,
1471 position: usize,
1472 new_values: Vec<Value>,
1473 ) -> Result<(), StorageError> {
1474 if position >= self.rows.len() {
1475 return Err(StorageError::Corrupt(alloc::format!(
1476 "update_row: position {position} out of bounds (rows={})",
1477 self.rows.len()
1478 )));
1479 }
1480 if new_values.len() != self.schema.columns.len() {
1481 return Err(StorageError::ArityMismatch {
1482 expected: self.schema.columns.len(),
1483 actual: new_values.len(),
1484 });
1485 }
1486 for (i, (val, col)) in new_values.iter().zip(&self.schema.columns).enumerate() {
1490 if val.is_null() {
1491 if !col.nullable {
1492 return Err(StorageError::NullInNotNull {
1493 column: col.name.clone(),
1494 });
1495 }
1496 continue;
1497 }
1498 let actual = val.data_type().expect("non-null");
1499 let compatible = actual == col.ty
1500 || matches!(
1501 (actual, col.ty),
1502 (
1503 DataType::Text,
1504 DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
1505 ) | (DataType::Json | DataType::Jsonb, DataType::Text)
1506 | (DataType::Json, DataType::Jsonb) | (DataType::Jsonb, DataType::Json)
1507 | (DataType::Timestamp, DataType::Timestamptz)
1508 | (DataType::Timestamptz, DataType::Timestamp)
1509 )
1510 || matches!(
1511 (actual, col.ty),
1512 (
1513 DataType::Numeric { scale: a, .. },
1514 DataType::Numeric { scale: b, .. },
1515 ) if a == b
1516 );
1517 if !compatible {
1518 return Err(StorageError::TypeMismatch {
1519 column: col.name.clone(),
1520 expected: col.ty,
1521 actual,
1522 position: i,
1523 });
1524 }
1525 }
1526 let old_row = self
1527 .rows
1528 .get(position)
1529 .expect("position bounds-checked above");
1530 let old_bytes = row_body_encoded_len(old_row, &self.schema) as u64;
1531 let new_row = Row::new(new_values);
1532 let new_bytes = row_body_encoded_len(&new_row, &self.schema) as u64;
1533 self.rows = self
1534 .rows
1535 .set(position, new_row)
1536 .expect("position bounds-checked above");
1537 self.hot_bytes = self
1538 .hot_bytes
1539 .saturating_sub(old_bytes)
1540 .saturating_add(new_bytes);
1541 self.rebuild_indices();
1542 Ok(())
1543 }
1544
1545 fn rebuild_indices(&mut self) {
1552 let preserved_cold: Vec<(String, Vec<(IndexKey, RowLocator)>)> = self
1561 .indices
1562 .iter()
1563 .filter_map(|idx| match &idx.kind {
1564 IndexKind::BTree(map) => {
1565 let cold: Vec<(IndexKey, RowLocator)> = map
1566 .iter()
1567 .flat_map(|(k, locs)| {
1568 locs.iter()
1569 .filter(|l| l.is_cold())
1570 .copied()
1571 .map(move |l| (k.clone(), l))
1572 })
1573 .collect();
1574 if cold.is_empty() {
1575 None
1576 } else {
1577 Some((idx.name.clone(), cold))
1578 }
1579 }
1580 IndexKind::Nsw(_) | IndexKind::Brin { .. } => None,
1582 })
1583 .collect();
1584
1585 #[derive(Clone)]
1590 enum RebuildKind {
1591 BTree,
1592 Nsw(usize),
1593 Brin(DataType),
1594 }
1595 let descriptors: Vec<(String, usize, RebuildKind)> = self
1596 .indices
1597 .iter()
1598 .map(|idx| {
1599 let kind = match &idx.kind {
1600 IndexKind::Nsw(g) => RebuildKind::Nsw(g.m),
1601 IndexKind::Brin { column_type } => RebuildKind::Brin(*column_type),
1602 IndexKind::BTree(_) => RebuildKind::BTree,
1603 };
1604 (idx.name.clone(), idx.column_position, kind)
1605 })
1606 .collect();
1607 self.indices.clear();
1608 for (name, column_position, rebuild_kind) in descriptors {
1609 match rebuild_kind {
1610 RebuildKind::Nsw(m) => {
1611 let idx = Index::new_nsw(name, column_position, m);
1612 self.indices.push(idx);
1613 let idx_pos = self.indices.len() - 1;
1614 let row_indices: Vec<usize> = (0..self.rows.len()).collect();
1615 for row_idx in row_indices {
1616 nsw_insert_at(self, idx_pos, row_idx);
1617 }
1618 }
1619 RebuildKind::Brin(column_type) => {
1620 self.indices.push(Index::new_brin(name, column_position, column_type));
1623 }
1624 RebuildKind::BTree => {
1625 let mut idx = Index::new_btree(name, column_position);
1626 if let IndexKind::BTree(map) = &mut idx.kind {
1627 for (i, row) in self.rows.iter().enumerate() {
1628 if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
1629 let mut entries = map.get(&key).cloned().unwrap_or_default();
1630 entries.push(RowLocator::Hot(i));
1631 map.insert_mut(key, entries);
1632 }
1633 }
1634 }
1635 self.indices.push(idx);
1636 }
1637 }
1638 }
1639
1640 for (idx_name, locators) in preserved_cold {
1645 let _ = self.register_cold_locators(&idx_name, locators);
1649 }
1650 }
1651
1652 fn add_nsw_index_inner(
1653 &mut self,
1654 name: String,
1655 column_name: &str,
1656 m: usize,
1657 restore: Option<NswGraph>,
1658 ) -> Result<(), StorageError> {
1659 if self.indices.iter().any(|i| i.name == name) {
1660 return Err(StorageError::DuplicateIndex { name });
1661 }
1662 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1663 StorageError::ColumnNotFound {
1664 column: column_name.into(),
1665 }
1666 })?;
1667 if !matches!(
1668 self.schema.columns[column_position].ty,
1669 DataType::Vector { .. }
1670 ) {
1671 return Err(StorageError::TypeMismatch {
1672 column: column_name.into(),
1673 expected: DataType::Vector {
1674 dim: 0,
1675 encoding: VecEncoding::F32,
1676 },
1677 actual: self.schema.columns[column_position].ty,
1678 position: column_position,
1679 });
1680 }
1681 if let Some(graph) = restore {
1682 self.indices.push(Index {
1683 name,
1684 column_position,
1685 kind: IndexKind::Nsw(graph),
1686 included_columns: Vec::new(),
1687 partial_predicate: None,
1688 expression: None,
1689 is_unique: false,
1690 extra_column_positions: Vec::new(),
1691 });
1692 return Ok(());
1693 }
1694 let idx = Index::new_nsw(name, column_position, m);
1695 self.indices.push(idx);
1696 let idx_pos = self.indices.len() - 1;
1697 let row_indices: Vec<usize> = (0..self.rows.len()).collect();
1700 for row_idx in row_indices {
1701 nsw_insert_at(self, idx_pos, row_idx);
1702 }
1703 Ok(())
1704 }
1705}
1706
1707fn recode_vector_cell(cell: Value, target: VecEncoding) -> Result<Value, StorageError> {
1714 if matches!(cell, Value::Null) {
1715 return Ok(cell);
1716 }
1717 let as_f32: Vec<f32> = match &cell {
1719 Value::Vector(v) => v.clone(),
1720 Value::Sq8Vector(q) => quantize::dequantize(q),
1721 Value::HalfVector(h) => h.to_f32_vec(),
1722 other => {
1723 return Err(StorageError::Unsupported(format!(
1724 "ALTER INDEX REBUILD: cannot recode non-vector cell {:?}",
1725 other.data_type()
1726 )));
1727 }
1728 };
1729 Ok(match target {
1734 VecEncoding::F32 => Value::Vector(as_f32),
1735 VecEncoding::Sq8 => Value::Sq8Vector(quantize::quantize(&as_f32)),
1736 VecEncoding::F16 => Value::HalfVector(halfvec::HalfVector::from_f32_slice(&as_f32)),
1737 })
1738}
1739
/// Links hot row `new_row_idx` into the NSW index at `table.indices[idx_pos]`.
///
/// Steps:
/// - A row whose cell is not a vector (e.g. NULL), or has zero dimensions, only gets a
///   level-0 slot and no edges.
/// - The first real vector becomes the entry point.
/// - Otherwise the search descends greedily (`greedy_layer_walk`) through the layers above
///   the new node's level.
/// - On each layer from `min(level, entry_level)` down to 0 it then:
///   - beam-searches with `ef = max(2m, 8)`;
///   - drops the node itself from the candidates;
///   - picks neighbours with `select_neighbours_heuristic`;
///   - links them with `connect_at_layer`.
/// - The node becomes the new entry point if its level exceeds the old entry level.
///
/// # Panics
/// If the index at `idx_pos` is not NSW, or `new_row_idx` / the column position is out of
/// range for `table.rows`.
fn nsw_insert_at(table: &mut Table, idx_pos: usize, new_row_idx: usize) {
    let col_pos = table.indices[idx_pos].column_position;
    let cell_dim: Option<usize> = match &table.rows[new_row_idx].values[col_pos] {
        Value::Vector(v) => Some(v.len()),
        Value::Sq8Vector(q) => Some(q.bytes.len()),
        Value::HalfVector(h) => Some(h.dim()),
        _ => None,
    };
    // Non-vector cells (e.g. NULL) still need a slot so every per-row vector stays
    // aligned with `table.rows`.
    let Some(dim) = cell_dim else {
        ensure_node_slot(table, idx_pos, new_row_idx, 0);
        return;
    };
    if dim == 0 {
        ensure_node_slot(table, idx_pos, new_row_idx, 0);
        return;
    }
    let level = nsw_assign_level(new_row_idx);
    ensure_node_slot(table, idx_pos, new_row_idx, level);
    let (entry, entry_level, m) = match &table.indices[idx_pos].kind {
        IndexKind::Nsw(g) => (g.entry, g.entry_level, g.m),
        IndexKind::BTree(_) | IndexKind::Brin { .. } => {
            unreachable!("nsw_insert_at on a non-NSW index")
        }
    };
    // Empty graph: this node becomes the entry point and has no neighbours yet.
    if entry.is_none() {
        if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
            g.entry = Some(new_row_idx);
            g.entry_level = level;
            *g.levels
                .get_mut(new_row_idx)
                .expect("levels slot padded by ensure_node_slot") = level;
        }
        return;
    }
    if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
        *g.levels
            .get_mut(new_row_idx)
            .expect("levels slot padded by ensure_node_slot") = level;
    }
    // Searches run in f32 space regardless of the stored encoding.
    let query = match &table.rows[new_row_idx].values[col_pos] {
        Value::Vector(v) => v.clone(),
        Value::Sq8Vector(q) => quantize::dequantize(q),
        Value::HalfVector(h) => h.to_f32_vec(),
        // Unreachable in practice: non-vector cells returned above via `cell_dim`.
        _ => return,
    };
    let mut current = entry.expect("entry was Some above");
    // Presumably squared L2 distance, per the helper's name — TODO confirm.
    let mut current_d = vec_l2_sq(table, col_pos, current, &query);
    // Greedy descent through layers the new node will not occupy.
    if entry_level > level {
        for layer in (level + 1..=entry_level).rev() {
            (current, current_d) =
                greedy_layer_walk(table, idx_pos, layer, current, current_d, &query);
        }
    }
    let top = level.min(entry_level);
    let ef = (m * 2).max(8);
    for layer in (0..=top).rev() {
        // NOTE(review): this recomputes the cap as `m * 2` / `m` instead of calling
        // `NswGraph::cap_for_layer`, which uses `m_max_0`. The two agree for graphs built
        // by `NswGraph::new`, but may differ for restored graphs — confirm intent.
        let cap = if layer == 0 { m * 2 } else { m };
        let mut candidates = layer_beam_search(
            table,
            idx_pos,
            layer,
            current,
            current_d,
            &query,
            ef,
            NswMetric::L2,
        );
        // Never link a node to itself.
        candidates.retain(|&(_, n)| n != new_row_idx);
        // Assumes candidates come back nearest-first, so the head seeds the next layer
        // — TODO confirm against layer_beam_search.
        if let Some(&(d, n)) = candidates.first() {
            current = n;
            current_d = d;
        }
        let peers = select_neighbours_heuristic(&candidates, cap, table, col_pos);
        connect_at_layer(table, idx_pos, layer, new_row_idx, &peers);
    }
    // Promote the new node to entry point when it reaches a higher layer than the old one.
    if level > entry_level
        && let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
    {
        g.entry = Some(new_row_idx);
        g.entry_level = level;
    }
}
1848
1849fn ensure_node_slot(table: &mut Table, idx_pos: usize, new_row_idx: usize, level: u8) {
1853 let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind else {
1854 unreachable!("ensure_node_slot on a BTree index");
1855 };
1856 while g.layers.len() <= level as usize {
1857 g.layers.push(PersistentVec::new());
1858 }
1859 while g.levels.len() <= new_row_idx {
1860 g.levels.push_mut(0);
1861 }
1862 for layer_vec in &mut g.layers {
1863 while layer_vec.len() <= new_row_idx {
1864 layer_vec.push_mut(Vec::new());
1865 }
1866 }
1867}
1868
1869fn greedy_layer_walk(
1875 table: &Table,
1876 idx_pos: usize,
1877 layer: u8,
1878 mut current: usize,
1879 mut current_d: f32,
1880 query: &[f32],
1881) -> (usize, f32) {
1882 let g = match &table.indices[idx_pos].kind {
1883 IndexKind::Nsw(g) => g,
1884 IndexKind::BTree(_) | IndexKind::Brin { .. } => return (current, current_d),
1885 };
1886 let col_pos = table.indices[idx_pos].column_position;
1887 loop {
1888 let neighbours: &[u32] = g
1889 .layers
1890 .get(layer as usize)
1891 .and_then(|layer_v| layer_v.get(current))
1892 .map_or(&[][..], Vec::as_slice);
1893 let mut best = current;
1894 let mut best_d = current_d;
1895 for &n in neighbours {
1896 let n = n as usize;
1897 let d = vec_l2_sq(table, col_pos, n, query);
1898 if d < best_d {
1899 best = n;
1900 best_d = d;
1901 }
1902 }
1903 if best == current {
1904 return (current, current_d);
1905 }
1906 current = best;
1907 current_d = best_d;
1908 }
1909}
1910
1911#[allow(clippy::too_many_arguments)] fn layer_beam_search(
1924 table: &Table,
1925 idx_pos: usize,
1926 layer: u8,
1927 entry_node: usize,
1928 entry_d: f32,
1929 query: &[f32],
1930 ef: usize,
1931 metric: NswMetric,
1932) -> Vec<(f32, usize)> {
1933 let g = match &table.indices[idx_pos].kind {
1934 IndexKind::Nsw(g) => g,
1935 IndexKind::BTree(_) | IndexKind::Brin { .. } => return Vec::new(),
1936 };
1937 let col_pos = table.indices[idx_pos].column_position;
1938 let d0 = if matches!(metric, NswMetric::L2) {
1939 entry_d
1940 } else {
1941 cell_to_query_metric_distance(table, col_pos, entry_node, query, metric)
1942 };
1943 let row_count = table.rows.len();
1944 let mut visited: Vec<bool> = alloc::vec![false; row_count];
1945 if entry_node < row_count {
1946 visited[entry_node] = true;
1947 }
1948 let mut candidates: alloc::collections::BinaryHeap<NodeClosest> =
1951 alloc::collections::BinaryHeap::with_capacity(ef);
1952 let mut results: alloc::collections::BinaryHeap<NodeFurthest> =
1953 alloc::collections::BinaryHeap::with_capacity(ef);
1954 candidates.push(NodeClosest {
1955 dist: d0,
1956 node: entry_node,
1957 });
1958 results.push(NodeFurthest {
1959 dist: d0,
1960 node: entry_node,
1961 });
1962 while let Some(cur) = candidates.pop() {
1963 let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
1964 if cur.dist > worst && results.len() >= ef {
1965 break;
1966 }
1967 let neighbours: &[u32] = g
1968 .layers
1969 .get(layer as usize)
1970 .and_then(|layer_v| layer_v.get(cur.node))
1971 .map_or(&[][..], Vec::as_slice);
1972 for &n in neighbours {
1973 let n = n as usize;
1974 if n >= row_count || visited[n] {
1975 continue;
1976 }
1977 visited[n] = true;
1978 let dn = cell_to_query_metric_distance(table, col_pos, n, query, metric);
1982 if !dn.is_finite() {
1983 continue;
1984 }
1985 let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
1986 if results.len() < ef || dn < worst {
1987 results.push(NodeFurthest { dist: dn, node: n });
1988 if results.len() > ef {
1989 results.pop();
1990 }
1991 candidates.push(NodeClosest { dist: dn, node: n });
1992 }
1993 }
1994 }
1995 let mut out: Vec<(f32, usize)> = results.into_iter().map(|c| (c.dist, c.node)).collect();
1998 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
1999 out
2000}
2001
/// Entry of the candidate (to-expand) heap in `layer_beam_search`.
///
/// `BinaryHeap` is a max-heap, so the ordering is reversed: the entry with the
/// *smallest* `dist` compares greatest and is popped first. Among equal distances, the
/// smaller `node` pops first.
///
/// Ordering uses [`f32::total_cmp`] on `dist` with `node` as a tie-breaker, and
/// equality is derived from that same ordering. This keeps `Eq` and `Ord` consistent
/// (`a == b` exactly when `a.cmp(&b) == Equal`), including for NaN and signed zeros.
#[derive(Debug, Clone, Copy)]
struct NodeClosest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeClosest {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeClosest {}
impl PartialOrd for NodeClosest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeClosest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        // Operands swapped on purpose: turns the max-heap into a min-heap by distance.
        other
            .dist
            .total_cmp(&self.dist)
            .then_with(|| other.node.cmp(&self.node))
    }
}
2030
/// Entry of the bounded result heap in `layer_beam_search`.
///
/// Ordered by ascending `dist`, so the max-heap keeps the *farthest* kept result on top.
/// That entry is the one evicted when the heap grows past `ef`. Among equal distances,
/// the larger `node` is on top.
///
/// Ordering uses [`f32::total_cmp`] on `dist` with `node` as a tie-breaker, and
/// equality is derived from that same ordering. This keeps `Eq` and `Ord` consistent
/// (`a == b` exactly when `a.cmp(&b) == Equal`), including for NaN and signed zeros.
#[derive(Debug, Clone, Copy)]
struct NodeFurthest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeFurthest {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeFurthest {}
impl PartialOrd for NodeFurthest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeFurthest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        self.dist
            .total_cmp(&other.dist)
            .then_with(|| self.node.cmp(&other.node))
    }
}
2056
2057fn select_neighbours_heuristic(
2066 candidates: &[(f32, usize)],
2067 m: usize,
2068 table: &Table,
2069 col_pos: usize,
2070) -> Vec<usize> {
2071 let mut chosen: Vec<usize> = Vec::with_capacity(m);
2072 for &(d_q, e) in candidates {
2073 if chosen.len() >= m {
2074 break;
2075 }
2076 if !matches!(
2081 table.rows.get(e).and_then(|r| r.values.get(col_pos)),
2082 Some(Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_))
2083 ) {
2084 continue;
2085 }
2086 let mut covered = false;
2087 for &r in &chosen {
2088 if cell_l2_sq(table, col_pos, e, r) < d_q {
2092 covered = true;
2093 break;
2094 }
2095 }
2096 if !covered {
2097 chosen.push(e);
2098 }
2099 }
2100 chosen
2101}
2102
2103fn connect_at_layer(
2107 table: &mut Table,
2108 idx_pos: usize,
2109 layer: u8,
2110 new_row_idx: usize,
2111 peers: &[usize],
2112) {
2113 let col_pos = table.indices[idx_pos].column_position;
2114 let cap = match &table.indices[idx_pos].kind {
2115 IndexKind::Nsw(g) => g.cap_for_layer(layer),
2116 IndexKind::BTree(_) | IndexKind::Brin { .. } => return,
2117 };
2118 let new_row_u32 = u32::try_from(new_row_idx).expect("row index fits in u32");
2123 if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
2124 let layer_v = &mut g.layers[layer as usize];
2125 if let Some(slot) = layer_v.get_mut(new_row_idx) {
2126 *slot = peers
2127 .iter()
2128 .map(|&p| u32::try_from(p).expect("row index fits in u32"))
2129 .collect();
2130 }
2131 }
2132 for &peer in peers {
2133 if !matches!(
2137 &table.rows[peer].values[col_pos],
2138 Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_)
2139 ) {
2140 continue;
2141 }
2142 if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
2144 let layer_v = &mut g.layers[layer as usize];
2145 if let Some(slot) = layer_v.get_mut(peer)
2146 && !slot.contains(&new_row_u32)
2147 {
2148 slot.push(new_row_u32);
2149 }
2150 }
2151 let needs_trim = match &table.indices[idx_pos].kind {
2155 IndexKind::Nsw(g) => g.layers[layer as usize][peer].len() > cap,
2156 IndexKind::BTree(_) | IndexKind::Brin { .. } => false,
2157 };
2158 if needs_trim {
2159 let current_peers: Vec<usize> = match &table.indices[idx_pos].kind {
2160 IndexKind::Nsw(g) => g.layers[layer as usize][peer]
2161 .iter()
2162 .map(|&n| n as usize)
2163 .collect(),
2164 IndexKind::BTree(_) | IndexKind::Brin { .. } => continue,
2165 };
2166 let mut tagged: Vec<(f32, usize)> = current_peers
2171 .iter()
2172 .map(|&p| (cell_l2_sq(table, col_pos, peer, p), p))
2173 .collect();
2174 tagged.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2175 let kept = select_neighbours_heuristic(&tagged, cap, table, col_pos);
2176 if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
2177 && let Some(slot) = g.layers[layer as usize].get_mut(peer)
2178 {
2179 *slot = kept
2180 .into_iter()
2181 .map(|p| u32::try_from(p).expect("row index fits in u32"))
2182 .collect();
2183 }
2184 }
2185 }
2186}
2187
2188fn vec_l2_sq(table: &Table, col_pos: usize, row: usize, query: &[f32]) -> f32 {
2195 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2196 Some(Value::Vector(v)) if v.len() == query.len() => l2_distance_sq(v, query),
2197 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => {
2198 quantize::sq8_l2_distance_sq_asymmetric(q, query)
2199 }
2200 Some(Value::HalfVector(h)) if h.dim() == query.len() => {
2204 halfvec::half_l2_distance_sq_asymmetric(h, query)
2205 }
2206 _ => f32::INFINITY,
2207 }
2208}
2209
2210fn cell_l2_sq(table: &Table, col_pos: usize, row_a: usize, row_b: usize) -> f32 {
2217 let Some(cell_a) = table.rows.get(row_a).and_then(|r| r.values.get(col_pos)) else {
2218 return f32::INFINITY;
2219 };
2220 let Some(cell_b) = table.rows.get(row_b).and_then(|r| r.values.get(col_pos)) else {
2221 return f32::INFINITY;
2222 };
2223 match (cell_a, cell_b) {
2224 (Value::Vector(a), Value::Vector(b)) if a.len() == b.len() => l2_distance_sq(a, b),
2225 (Value::Sq8Vector(a), Value::Sq8Vector(b)) if a.bytes.len() == b.bytes.len() => {
2226 quantize::sq8_l2_distance_sq(a, b)
2227 }
2228 (Value::HalfVector(a), Value::HalfVector(b)) if a.dim() == b.dim() => {
2233 halfvec::half_l2_distance_sq(a, b)
2234 }
2235 _ => f32::INFINITY,
2236 }
2237}
2238
2239fn cell_to_query_metric_distance(
2244 table: &Table,
2245 col_pos: usize,
2246 row: usize,
2247 query: &[f32],
2248 metric: NswMetric,
2249) -> f32 {
2250 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2251 Some(Value::Vector(v)) if v.len() == query.len() => metric_distance(metric, v, query),
2252 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => match metric {
2253 NswMetric::L2 => quantize::sq8_l2_distance_sq_asymmetric(q, query),
2254 NswMetric::InnerProduct => quantize::sq8_inner_product_asymmetric(q, query),
2255 NswMetric::Cosine => quantize::sq8_cosine_distance_asymmetric(q, query),
2256 },
2257 Some(Value::HalfVector(h)) if h.dim() == query.len() => match metric {
2260 NswMetric::L2 => halfvec::half_l2_distance_sq_asymmetric(h, query),
2261 NswMetric::InnerProduct => halfvec::half_inner_product_asymmetric(h, query),
2262 NswMetric::Cosine => halfvec::half_cosine_distance_asymmetric(h, query),
2263 },
2264 _ => f32::INFINITY,
2265 }
2266}
2267
/// Distance metric used when querying an NSW index.
///
/// All variants are "smaller is closer" distances; for full-precision vectors they are
/// computed by `metric_distance`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NswMetric {
    /// Squared Euclidean distance (no square root is taken).
    L2,
    /// Negated inner product, so a larger dot product ranks as closer.
    InnerProduct,
    /// Cosine distance `1 - cos(a, b)`; infinite when either vector has zero norm.
    Cosine,
}
2285
2286fn nsw_search(
2292 table: &Table,
2293 idx_pos: usize,
2294 query: &[f32],
2295 k: usize,
2296 ef: usize,
2297 metric: NswMetric,
2298) -> Vec<(f32, usize)> {
2299 let (entry, entry_level) = match &table.indices[idx_pos].kind {
2300 IndexKind::Nsw(g) => (g.entry, g.entry_level),
2301 IndexKind::BTree(_) | IndexKind::Brin { .. } => return Vec::new(),
2302 };
2303 let Some(entry) = entry else {
2304 return Vec::new();
2305 };
2306 let col_pos = table.indices[idx_pos].column_position;
2307 let sq8 = matches!(
2314 table.schema.columns.get(col_pos).map(|c| c.ty),
2315 Some(DataType::Vector {
2316 encoding: VecEncoding::Sq8,
2317 ..
2318 })
2319 );
2320 let ef = if sq8 {
2321 ef.max(k).max(k * SQ8_RERANK_OVER_FETCH)
2322 } else {
2323 ef.max(k)
2324 };
2325 let entry_d = vec_l2_sq(table, col_pos, entry, query);
2327 let mut current = entry;
2328 let mut current_d = entry_d;
2329 for layer in (1..=entry_level).rev() {
2330 (current, current_d) = greedy_layer_walk(table, idx_pos, layer, current, current_d, query);
2331 }
2332 let mut results = layer_beam_search(table, idx_pos, 0, current, current_d, query, ef, metric);
2334 if sq8 {
2335 results = sq8_rerank(table, col_pos, &results, query, metric);
2336 }
2337 results.truncate(k);
2338 results
2339}
2340
2341fn sq8_rerank(
2348 table: &Table,
2349 col_pos: usize,
2350 candidates: &[(f32, usize)],
2351 query: &[f32],
2352 metric: NswMetric,
2353) -> Vec<(f32, usize)> {
2354 let mut out: Vec<(f32, usize)> = candidates
2355 .iter()
2356 .filter_map(|&(adc_d, row)| {
2357 let cell = table.rows.get(row).and_then(|r| r.values.get(col_pos))?;
2358 let Value::Sq8Vector(q) = cell else {
2359 return Some((adc_d, row));
2363 };
2364 let deq = quantize::dequantize(q);
2365 if deq.len() != query.len() {
2366 return None;
2367 }
2368 Some((metric_distance(metric, &deq, query), row))
2369 })
2370 .collect();
2371 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2372 out
2373}
2374
/// Beam over-fetch factor for SQ8-encoded columns.
///
/// `nsw_search` widens the layer-0 beam to at least `k * SQ8_RERANK_OVER_FETCH`
/// candidates. This gives `sq8_rerank` extra candidates to re-order by exact
/// (dequantized) distance before the result is truncated to `k`.
const SQ8_RERANK_OVER_FETCH: usize = 3;
2379
2380fn metric_distance(metric: NswMetric, a: &[f32], b: &[f32]) -> f32 {
2381 match metric {
2382 NswMetric::L2 => l2_distance_sq(a, b),
2383 NswMetric::InnerProduct => -inner_product_f32(a, b),
2384 NswMetric::Cosine => {
2385 let (dot, na, nb) = cosine_dot_norms_f32(a, b);
2386 if na == 0.0 || nb == 0.0 {
2387 return f32::INFINITY;
2388 }
2389 let denom = sqrt_newton_f32(na) * sqrt_newton_f32(nb);
2392 1.0 - dot / denom
2393 }
2394 }
2395}
2396
#[doc(hidden)]
/// Dot product of `a` and `b`.
///
/// On aarch64, equal-length inputs whose length is a non-zero multiple of 4 take the
/// NEON path. Every other case uses the scalar loop, which stops at the shorter slice.
/// The NEON path uses fused multiply-add and a different summation order, so its result
/// may differ from the scalar one in the last bits.
#[inline]
pub fn inner_product_f32(a: &[f32], b: &[f32]) -> f32 {
    #[cfg(target_arch = "aarch64")]
    {
        if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
            // SAFETY: the kernel reads `b` at every index it reads from `a`, which needs
            // `b.len() >= a.len()` (guaranteed by the equality check). It only processes
            // whole 4-lane chunks, so the length must be a multiple of 4 (checked above).
            // NEON must be available on the running CPU. NOTE(review): this holds on the
            // standard aarch64 targets; confirm for any `-softfloat` target this builds for.
            return unsafe { inner_product_neon(a, b) };
        }
    }
    inner_product_scalar(a, b)
}
2418
/// Portable dot product: sums `x * y` pairwise from left to right, starting at `+0.0`.
/// Extra elements in the longer slice are ignored.
fn inner_product_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter()
        .zip(b)
        .fold(0.0_f32, |acc, (&x, &y)| acc + x * y)
}
2426
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
/// NEON dot product over whole 4-lane chunks.
///
/// The main loop consumes 8 floats per iteration into two independent FMA accumulators.
/// One more 4-float chunk may follow, and the two accumulators are summed at the end.
///
/// # Safety
/// - The CPU must support NEON.
/// - `b.len() >= a.len()`: every offset loaded from `a` is also loaded from `b`.
/// - Only the first `a.len() / 4 * 4` elements are used. A tail of 1–3 elements is
///   silently ignored, so callers should pass a length that is a multiple of 4.
// Short math-style names (a, b, n, i) mirror the scalar kernel.
#[allow(clippy::many_single_char_names)] unsafe fn inner_product_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32,
    };
    // SAFETY: every 4-lane load reads offsets `i..i + 4` with `i + 4 <= n == a.len()`,
    // which is in bounds for `a` and, per the caller contract, for `b`.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut acc0 = zero;
        let mut acc1 = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 8 <= n {
            let av0 = vld1q_f32(a.as_ptr().add(i));
            let bv0 = vld1q_f32(b.as_ptr().add(i));
            acc0 = vfmaq_f32(acc0, av0, bv0);
            let av1 = vld1q_f32(a.as_ptr().add(i + 4));
            let bv1 = vld1q_f32(b.as_ptr().add(i + 4));
            acc1 = vfmaq_f32(acc1, av1, bv1);
            i += 8;
        }
        // At most one remaining whole 4-lane chunk.
        while i + 4 <= n {
            let av = vld1q_f32(a.as_ptr().add(i));
            let bv = vld1q_f32(b.as_ptr().add(i));
            acc0 = vfmaq_f32(acc0, av, bv);
            i += 4;
        }
        // Combine both accumulators, then reduce the 4 lanes horizontally.
        vaddvq_f32(vaddq_f32(acc0, acc1))
    }
}
2460
#[doc(hidden)]
/// Returns `(dot(a, b), |a|², |b|²)` in one pass, the raw ingredients of cosine distance.
///
/// On aarch64, equal-length inputs whose length is a non-zero multiple of 4 take the
/// NEON path. Otherwise the scalar loop runs, stopping at the shorter slice.
#[inline]
pub fn cosine_dot_norms_f32(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    #[cfg(target_arch = "aarch64")]
    {
        if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
            // SAFETY: equal lengths satisfy the kernel's `b.len() >= a.len()` requirement,
            // and the multiple-of-4 check means no tail is dropped. NEON availability is
            // assumed for this aarch64 target. NOTE(review): confirm for `-softfloat` targets.
            return unsafe { cosine_dot_norms_neon(a, b) };
        }
    }
    cosine_dot_norms_scalar(a, b)
}
2479
/// Portable single-pass computation of `(dot(a, b), |a|², |b|²)`.
///
/// All three sums start at `+0.0` and accumulate left to right. Extra elements in the
/// longer slice are ignored.
fn cosine_dot_norms_scalar(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    a.iter().zip(b).fold(
        (0.0_f32, 0.0_f32, 0.0_f32),
        |(dot, norm_a_sq, norm_b_sq), (&x, &y)| {
            (dot + x * y, norm_a_sq + x * x, norm_b_sq + y * y)
        },
    )
}
2491
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
/// NEON single-pass `(dot(a, b), |a|², |b|²)` over whole 4-lane chunks, with one FMA
/// accumulator per output.
///
/// # Safety
/// - The CPU must support NEON.
/// - `b.len() >= a.len()`: every offset loaded from `a` is also loaded from `b`.
/// - Only the first `a.len() / 4 * 4` elements are used; a 1–3 element tail is ignored.
// Short math-style names and the similar `acc_na` / `acc_nb` names are deliberate.
#[allow(clippy::many_single_char_names, clippy::similar_names)]
unsafe fn cosine_dot_norms_neon(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    use core::arch::aarch64::{float32x4_t, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32};
    // SAFETY: each load reads offsets `i..i + 4` with `i + 4 <= n == a.len()`, which is in
    // bounds for `a` and, per the caller contract, for `b`.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut acc_dot = zero;
        let mut acc_na = zero;
        let mut acc_nb = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 4 <= n {
            let av = vld1q_f32(a.as_ptr().add(i));
            let bv = vld1q_f32(b.as_ptr().add(i));
            acc_dot = vfmaq_f32(acc_dot, av, bv);
            acc_na = vfmaq_f32(acc_na, av, av);
            acc_nb = vfmaq_f32(acc_nb, bv, bv);
            i += 4;
        }
        // Horizontal reduction of each accumulator's 4 lanes.
        (vaddvq_f32(acc_dot), vaddvq_f32(acc_na), vaddvq_f32(acc_nb))
    }
}
2515
/// Square root for `no_std` builds, where `f32::sqrt` is unavailable.
///
/// Returns `0.0` for `x <= 0.0`, and returns `x` itself for `+inf` and NaN.
///
/// Why not plain Newton from `g = x` with a fixed iteration count: starting from `x`,
/// Newton's method only halves the estimate per step until it nears `sqrt(x)`. Ten steps
/// are then far from converged for large or tiny inputs (e.g. `sqrt(1e6)` came out near
/// 1296), which corrupted cosine distances of un-normalized vectors.
///
/// Instead, the initial estimate comes from halving the IEEE-754 exponent via a bit
/// trick (within a few percent for normal inputs). One Newton step puts the estimate at
/// or above the root, and the iteration then stops as soon as it no longer decreases.
/// Normal inputs converge in a handful of steps.
fn sqrt_newton_f32(x: f32) -> f32 {
    // Hard cap on refinement steps; normal inputs converge in ~3, subnormals in ~15.
    const MAX_REFINEMENTS: usize = 24;
    if x <= 0.0 {
        return 0.0;
    }
    if !x.is_finite() {
        // +inf -> +inf, NaN -> NaN.
        return x;
    }
    // Shifting the bit pattern right by one halves the biased exponent; the constant
    // restores the bias. Positive finite `x` keeps the sum well below `u32::MAX`.
    let estimate = f32::from_bits((x.to_bits() >> 1) + 0x1fbd_1df5);
    // One Newton step from any positive estimate lands at or above sqrt(x) (AM-GM), after
    // which the sequence decreases monotonically toward the root.
    let mut g = 0.5 * (estimate + x / estimate);
    for _ in 0..MAX_REFINEMENTS {
        let next = 0.5 * (g + x / g);
        if next >= g {
            break;
        }
        g = next;
    }
    g
}
2526
#[inline]
/// Squared Euclidean distance between `a` and `b` (no square root is taken).
///
/// On aarch64, equal-length inputs whose length is a non-zero multiple of 4 take the
/// NEON path. Otherwise the scalar loop runs, stopping at the shorter slice.
fn l2_distance_sq(a: &[f32], b: &[f32]) -> f32 {
    #[cfg(target_arch = "aarch64")]
    {
        if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
            // SAFETY: equal lengths satisfy the kernel's `b.len() >= a.len()` requirement,
            // and the multiple-of-4 check means no tail is dropped. NEON availability is
            // assumed for this aarch64 target. NOTE(review): confirm for `-softfloat` targets.
            return unsafe { l2_distance_sq_neon(a, b) };
        }
    }
    l2_distance_sq_scalar(a, b)
}
2547
/// Portable squared Euclidean distance, accumulated left to right from `+0.0`.
/// Extra elements in the longer slice are ignored.
fn l2_distance_sq_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).fold(0.0_f32, |total, (&x, &y)| {
        let delta = x - y;
        total + delta * delta
    })
}
2556
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
/// NEON squared Euclidean distance over whole 4-lane chunks.
///
/// The main loop consumes 8 floats per iteration into two independent accumulators. One
/// more 4-float chunk may follow, and the two accumulators are summed at the end.
///
/// # Safety
/// - The CPU must support NEON.
/// - `b.len() >= a.len()`: every offset loaded from `a` is also loaded from `b`.
/// - Only the first `a.len() / 4 * 4` elements are used; a 1–3 element tail is ignored.
// Short math-style names (a, b, n, i, d) mirror the scalar kernel.
#[allow(clippy::many_single_char_names)] unsafe fn l2_distance_sq_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32, vsubq_f32,
    };
    // SAFETY: every load reads offsets `i..i + 4` with `i + 4 <= n == a.len()`, which is in
    // bounds for `a` and, per the caller contract, for `b`.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut acc0 = zero;
        let mut acc1 = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 8 <= n {
            // acc += (a - b)^2, via FMA of the difference with itself.
            let d0 = vsubq_f32(vld1q_f32(a.as_ptr().add(i)), vld1q_f32(b.as_ptr().add(i)));
            acc0 = vfmaq_f32(acc0, d0, d0);
            let d1 = vsubq_f32(
                vld1q_f32(a.as_ptr().add(i + 4)),
                vld1q_f32(b.as_ptr().add(i + 4)),
            );
            acc1 = vfmaq_f32(acc1, d1, d1);
            i += 8;
        }
        // At most one remaining whole 4-lane chunk.
        while i + 4 <= n {
            let d = vsubq_f32(vld1q_f32(a.as_ptr().add(i)), vld1q_f32(b.as_ptr().add(i)));
            acc0 = vfmaq_f32(acc0, d, d);
            i += 4;
        }
        vaddvq_f32(vaddq_f32(acc0, acc1))
    }
}
2594
2595pub fn nsw_query(
2598 table: &Table,
2599 idx_name: &str,
2600 query: &[f32],
2601 k: usize,
2602 metric: NswMetric,
2603) -> Vec<usize> {
2604 let Some(idx_pos) = table.indices.iter().position(|i| i.name == idx_name) else {
2605 return Vec::new();
2606 };
2607 let ef = (k * 2).max(NSW_DEFAULT_M);
2608 let mut hits = nsw_search(table, idx_pos, query, k, ef, metric);
2609 hits.truncate(k);
2610 hits.into_iter().map(|(_, idx)| idx).collect()
2611}
2612
2613pub fn nsw_index_on(table: &Table, column_position: usize) -> Option<&Index> {
2617 table
2618 .indices
2619 .iter()
2620 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_)))
2621}
2622
/// Registry of tables, looked up by name, plus the cold-tier segments they may reference.
#[derive(Debug, Clone, Default)]
pub struct Catalog {
    /// Table storage. Positions are referenced from `by_name`. NOTE(review): within this
    /// view tables are only appended (`create_table`); confirm nothing removes or reorders them.
    tables: Vec<Table>,
    /// Table name -> position in `tables`.
    by_name: BTreeMap<String, usize>,
    /// Cold segments indexed by `segment_id`. `None` marks a tombstoned slot, or a slot
    /// padded by `load_segment_bytes_at` that was never filled.
    cold_segments: Vec<Option<Arc<OwnedSegment>>>,
}
2663
2664impl Catalog {
2665 pub const fn new() -> Self {
2666 Self {
2667 tables: Vec::new(),
2668 by_name: BTreeMap::new(),
2669 cold_segments: Vec::new(),
2670 }
2671 }
2672
2673 pub fn create_table(&mut self, schema: TableSchema) -> Result<(), StorageError> {
2674 if self.by_name.contains_key(&schema.name) {
2675 return Err(StorageError::DuplicateTable {
2676 name: schema.name.clone(),
2677 });
2678 }
2679 let idx = self.tables.len();
2680 let name = schema.name.clone();
2681 self.tables.push(Table::new(schema));
2682 self.by_name.insert(name, idx);
2683 Ok(())
2684 }
2685
2686 pub fn get(&self, name: &str) -> Option<&Table> {
2687 let idx = *self.by_name.get(name)?;
2688 self.tables.get(idx)
2689 }
2690
2691 pub fn get_mut(&mut self, name: &str) -> Option<&mut Table> {
2692 let idx = *self.by_name.get(name)?;
2693 self.tables.get_mut(idx)
2694 }
2695
2696 pub fn table_count(&self) -> usize {
2697 self.tables.len()
2698 }
2699
2700 pub fn table_names(&self) -> Vec<String> {
2703 self.tables.iter().map(|t| t.schema.name.clone()).collect()
2704 }
2705
2706 pub fn load_segment_bytes(&mut self, bytes: Vec<u8>) -> Result<u32, StorageError> {
2717 let id = u32::try_from(self.cold_segments.len()).map_err(|_| {
2718 StorageError::Corrupt("cold segment count would exceed u32::MAX".into())
2719 })?;
2720 let seg = OwnedSegment::from_bytes(bytes)
2721 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
2722 self.cold_segments.push(Some(Arc::new(seg)));
2723 Ok(id)
2724 }
2725
2726 pub fn load_segment_bytes_at(
2739 &mut self,
2740 target_id: u32,
2741 bytes: Vec<u8>,
2742 ) -> Result<(), StorageError> {
2743 let seg = OwnedSegment::from_bytes(bytes)
2744 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
2745 let idx = target_id as usize;
2746 while self.cold_segments.len() <= idx {
2747 self.cold_segments.push(None);
2748 }
2749 if self.cold_segments[idx].is_some() {
2750 return Err(StorageError::Corrupt(format!(
2751 "load_segment_bytes_at: segment_id {target_id} already occupied"
2752 )));
2753 }
2754 self.cold_segments[idx] = Some(Arc::new(seg));
2755 Ok(())
2756 }
2757
2758 pub fn tombstone_segment(&mut self, segment_id: u32) -> Result<(), StorageError> {
2768 let idx = segment_id as usize;
2769 if idx >= self.cold_segments.len() {
2770 return Err(StorageError::Corrupt(format!(
2771 "tombstone_segment: segment_id {segment_id} out of bounds (len={})",
2772 self.cold_segments.len()
2773 )));
2774 }
2775 self.cold_segments[idx] = None;
2776 Ok(())
2777 }
2778
2779 #[must_use]
2781 pub fn cold_segment_count(&self) -> usize {
2782 self.cold_segments.iter().filter(|s| s.is_some()).count()
2783 }
2784
2785 #[must_use]
2788 pub fn cold_segment_slot_count(&self) -> usize {
2789 self.cold_segments.len()
2790 }
2791
2792 #[must_use]
2797 pub fn cold_segment_ids_global(&self) -> Vec<u32> {
2798 self.cold_segments
2799 .iter()
2800 .enumerate()
2801 .filter_map(|(i, s)| s.as_ref().map(|_| i as u32))
2802 .collect()
2803 }
2804
2805 #[must_use]
2812 pub fn hot_tier_bytes(&self) -> u64 {
2813 self.tables
2814 .iter()
2815 .map(Table::hot_bytes)
2816 .fold(0u64, u64::saturating_add)
2817 }
2818
2819 pub fn freeze_oldest_to_cold(
2864 &mut self,
2865 table_name: &str,
2866 index_name: &str,
2867 max_rows: usize,
2868 ) -> Result<FreezeReport, StorageError> {
2869 if max_rows == 0 {
2871 return Err(StorageError::Corrupt(
2872 "freeze_oldest_to_cold: max_rows must be > 0".into(),
2873 ));
2874 }
2875 let table = self.get(table_name).ok_or_else(|| {
2876 StorageError::Corrupt(format!(
2877 "freeze_oldest_to_cold: table {table_name:?} not found"
2878 ))
2879 })?;
2880 if max_rows > table.rows.len() {
2881 return Err(StorageError::Corrupt(format!(
2882 "freeze_oldest_to_cold: max_rows {max_rows} > row_count {}",
2883 table.rows.len()
2884 )));
2885 }
2886 let idx = table
2887 .indices
2888 .iter()
2889 .find(|i| i.name == index_name)
2890 .ok_or_else(|| {
2891 StorageError::Corrupt(format!(
2892 "freeze_oldest_to_cold: index {index_name:?} not found on {table_name:?}"
2893 ))
2894 })?;
2895 if !matches!(idx.kind, IndexKind::BTree(_)) {
2896 return Err(StorageError::Corrupt(format!(
2897 "freeze_oldest_to_cold: index {index_name:?} is NSW; only BTree indices may freeze"
2898 )));
2899 }
2900 let column_position = idx.column_position;
2901
2902 let schema = table.schema.clone();
2904 let mut to_freeze: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(max_rows);
2905 for row_idx in 0..max_rows {
2906 let row = table.rows.get(row_idx).expect("bounds-checked above");
2907 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
2908 StorageError::Corrupt(format!(
2909 "freeze_oldest_to_cold: row {row_idx} has NULL / non-key value in index column"
2910 ))
2911 })?;
2912 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
2913 StorageError::Corrupt(format!(
2914 "freeze_oldest_to_cold: index {index_name:?} column type is non-integer; \
2915 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
2916 ))
2917 })?;
2918 to_freeze.push((pk_u64, encode_row_body_dense(row, &schema), key));
2919 }
2920 to_freeze.sort_by_key(|(k, _, _)| *k);
2925 for w in to_freeze.windows(2) {
2929 if w[0].0 == w[1].0 {
2930 return Err(StorageError::Corrupt(format!(
2931 "freeze_oldest_to_cold: duplicate PK {} in freeze batch",
2932 w[0].0
2933 )));
2934 }
2935 }
2936 let post_swap_keys: Vec<IndexKey> = to_freeze.iter().map(|(_, _, k)| k.clone()).collect();
2940 let seg_rows: Vec<(u64, Vec<u8>)> = to_freeze
2944 .into_iter()
2945 .map(|(k, body, _)| (k, body))
2946 .collect();
2947 let frozen_rows = seg_rows.len();
2948 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
2949 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: encode: {e}")))?;
2950
2951 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
2960 let positions: Vec<usize> = (0..max_rows).collect();
2961 let t_mut = self
2962 .get_mut(table_name)
2963 .expect("just validated; still present");
2964 let removed = t_mut.delete_rows(&positions);
2965 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
2966 let bytes_after = t_mut.hot_bytes();
2967 let bytes_freed = bytes_before.saturating_sub(bytes_after);
2968
2969 let segment_id = self
2970 .load_segment_bytes(seg_bytes.clone())
2971 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: load: {e}")))?;
2972 let new_cold = post_swap_keys.into_iter().map(|k| {
2973 (
2974 k,
2975 RowLocator::Cold {
2976 segment_id,
2977 page_offset: 0,
2978 },
2979 )
2980 });
2981 let t_mut = self.get_mut(table_name).expect("still present");
2982 t_mut.register_cold_locators(index_name, new_cold)?;
2983
2984 Ok(FreezeReport {
2985 segment_id,
2986 frozen_rows,
2987 bytes_freed,
2988 segment_bytes: seg_bytes,
2989 })
2990 }
2991
2992 #[must_use]
2998 pub fn cold_segment(&self, segment_id: u32) -> Option<&OwnedSegment> {
2999 self.cold_segments
3000 .get(segment_id as usize)
3001 .and_then(|s| s.as_deref())
3002 }
3003
3004 pub fn resolve_cold_locator(
3013 &self,
3014 table_name: &str,
3015 segment_id: u32,
3016 key: &IndexKey,
3017 ) -> Option<Row> {
3018 let t = self.get(table_name)?;
3019 let u64_key = index_key_as_u64(key)?;
3020 let seg = self.cold_segments.get(segment_id as usize)?.as_ref()?;
3021 let payload = seg.lookup(u64_key)?;
3022 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
3023 Some(row)
3024 }
3025
3026 pub fn lookup_by_pk(&self, table: &str, index_name: &str, key: &IndexKey) -> Option<Row> {
3044 let t = self.get(table)?;
3045 let idx = t.indices.iter().find(|i| i.name == index_name)?;
3046 let locators = idx.lookup_eq(key);
3047 let cold_u64_key = index_key_as_u64(key);
3048 for loc in locators {
3049 match *loc {
3050 RowLocator::Hot(i) => {
3051 if let Some(row) = t.rows.get(i) {
3052 return Some(row.clone());
3053 }
3054 }
3055 RowLocator::Cold {
3056 segment_id,
3057 page_offset: _,
3058 } => {
3059 let Some(u64_key) = cold_u64_key else {
3060 continue;
3063 };
3064 let Some(seg) = self
3065 .cold_segments
3066 .get(segment_id as usize)
3067 .and_then(|s| s.as_deref())
3068 else {
3069 continue;
3080 };
3081 let Some(payload) = seg.lookup(u64_key) else {
3082 continue;
3083 };
3084 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
3085 return Some(row);
3086 }
3087 }
3088 }
3089 None
3090 }
3091
    /// Copies the cold-tier row for `key` back into the hot tier and drops its cold
    /// locators from `index_name`.
    ///
    /// Steps:
    /// 1. Find the key's cold locator with `find_cold_locator`.
    /// 2. Fetch and decode the row from that segment.
    /// 3. Insert it with `Table::insert`.
    /// 4. Remove the key's cold locators with `remove_cold_locators_for_key`.
    ///
    /// Returns `Ok(None)` when the key has no cold locator. Otherwise returns
    /// `Ok(Some(pos))`, where `pos` is taken as `rows.len() - 1` after the insert; this
    /// assumes `insert` appends.
    ///
    /// NOTE(review): not atomic. If `remove_cold_locators_for_key` fails after a
    /// successful insert, the row is in the hot tier while its cold locator remains.
    ///
    /// # Errors
    /// `StorageError::Corrupt` when:
    /// - the key is not coercible to `u64`;
    /// - the table is missing, or the segment is not registered;
    /// - the segment lookup returns nothing.
    ///
    /// Errors from `find_cold_locator`, row decoding, `insert` and
    /// `remove_cold_locators_for_key` are propagated.
    pub fn promote_cold_row(
        &mut self,
        table_name: &str,
        index_name: &str,
        key: &IndexKey,
    ) -> Result<Option<usize>, StorageError> {
        let cold_loc = self.find_cold_locator(table_name, index_name, key)?;
        let Some((segment_id, _page_offset)) = cold_loc else {
            return Ok(None);
        };
        let u64_key = index_key_as_u64(key).ok_or_else(|| {
            StorageError::Corrupt(
                "promote_cold_row: key type not coercible to u64 (cold tier requires integer PK)"
                    .into(),
            )
        })?;
        // Clone the schema: `self` is borrowed mutably by `get_mut` further down.
        let schema = self
            .get(table_name)
            .ok_or_else(|| {
                StorageError::Corrupt(format!("promote_cold_row: table {table_name:?} not found"))
            })?
            .schema
            .clone();
        let seg = self
            .cold_segments
            .get(segment_id as usize)
            .and_then(|s| s.as_ref())
            .ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "promote_cold_row: segment {segment_id} not registered on catalog"
                ))
            })?;
        let payload = seg.lookup(u64_key).ok_or_else(|| {
            StorageError::Corrupt(format!(
                "promote_cold_row: key {u64_key} resolves to segment {segment_id} \
                 but the segment's bloom/page lookup didn't return a row"
            ))
        })?;
        let (row, _consumed) = decode_row_body_dense(&payload, &schema)?;
        let t = self
            .get_mut(table_name)
            .expect("table existed at lookup time");
        t.insert(row)?;
        // Position of the freshly inserted row (assumes `insert` appends).
        let new_hot_idx =
            t.rows.len().checked_sub(1).ok_or_else(|| {
                StorageError::Corrupt("promote_cold_row: empty after insert".into())
            })?;
        // Drop the cold locators so lookups resolve to the hot copy.
        t.remove_cold_locators_for_key(index_name, key)?;
        Ok(Some(new_hot_idx))
    }
3173
3174 pub fn shadow_cold_row(
3192 &mut self,
3193 table_name: &str,
3194 index_name: &str,
3195 key: &IndexKey,
3196 ) -> Result<usize, StorageError> {
3197 let t = self.get_mut(table_name).ok_or_else(|| {
3198 StorageError::Corrupt(format!("shadow_cold_row: table {table_name:?} not found"))
3199 })?;
3200 t.remove_cold_locators_for_key(index_name, key)
3201 }
3202
    /// Read-only half of a sliced freeze: encodes the rows at positions `row_range` of
    /// `table_name` into `(pk, row body, index key)` triples, sorted by pk.
    ///
    /// The catalog is not mutated; the resulting `FreezeSlice` is meant to be handed to
    /// `commit_freeze_slices`. Unlike `freeze_oldest_to_cold`, duplicate keys within the
    /// slice are not rejected here. An empty or inverted `row_range` yields no rows.
    ///
    /// # Errors
    /// `StorageError::Corrupt` when:
    /// - the table or index is missing;
    /// - the index is not a BTree index (the message says "is NSW" for any non-BTree
    ///   kind, including BRIN);
    /// - `row_range.end` exceeds the row count;
    /// - a key is NULL or not coercible to `u64`.
    pub fn prepare_freeze_slice(
        &self,
        table_name: &str,
        index_name: &str,
        row_range: core::ops::Range<usize>,
    ) -> Result<FreezeSlice, StorageError> {
        let table = self.get(table_name).ok_or_else(|| {
            StorageError::Corrupt(format!(
                "prepare_freeze_slice: table {table_name:?} not found"
            ))
        })?;
        let idx = table
            .indices
            .iter()
            .find(|i| i.name == index_name)
            .ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "prepare_freeze_slice: index {index_name:?} not found on {table_name:?}"
                ))
            })?;
        if !matches!(idx.kind, IndexKind::BTree(_)) {
            return Err(StorageError::Corrupt(format!(
                "prepare_freeze_slice: index {index_name:?} is NSW; only BTree indices may freeze"
            )));
        }
        if row_range.end > table.rows.len() {
            return Err(StorageError::Corrupt(format!(
                "prepare_freeze_slice: row_range end {} > row_count {}",
                row_range.end,
                table.rows.len()
            )));
        }
        let column_position = idx.column_position;
        let schema = table.schema.clone();
        let mut rows: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(row_range.len());
        for row_idx in row_range.clone() {
            let row = table.rows.get(row_idx).expect("bounds-checked above");
            let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "prepare_freeze_slice: row {row_idx} has NULL / non-key value in index column"
                ))
            })?;
            let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "prepare_freeze_slice: index {index_name:?} column type is non-integer; \
                     v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
                ))
            })?;
            rows.push((pk_u64, encode_row_body_dense(row, &schema), key));
        }
        // Segments are keyed by pk, so each slice is delivered pk-sorted.
        rows.sort_by_key(|(k, _, _)| *k);
        Ok(FreezeSlice { row_range, rows })
    }
3273
3274 pub fn commit_freeze_slices(
3288 &mut self,
3289 table_name: &str,
3290 index_name: &str,
3291 slices: Vec<FreezeSlice>,
3292 ) -> Result<FreezeReport, StorageError> {
3293 let table = self.get(table_name).ok_or_else(|| {
3295 StorageError::Corrupt(format!(
3296 "commit_freeze_slices: table {table_name:?} not found"
3297 ))
3298 })?;
3299 let idx = table
3300 .indices
3301 .iter()
3302 .find(|i| i.name == index_name)
3303 .ok_or_else(|| {
3304 StorageError::Corrupt(format!(
3305 "commit_freeze_slices: index {index_name:?} not found on {table_name:?}"
3306 ))
3307 })?;
3308 if !matches!(idx.kind, IndexKind::BTree(_)) {
3309 return Err(StorageError::Corrupt(format!(
3310 "commit_freeze_slices: index {index_name:?} is NSW; only BTree indices may freeze"
3311 )));
3312 }
3313 let mut ordered = slices;
3317 ordered.sort_by_key(|s| s.row_range.start);
3318 let mut expected_start = 0usize;
3322 for s in &ordered {
3323 if s.row_range.start != expected_start {
3324 return Err(StorageError::Corrupt(format!(
3325 "commit_freeze_slices: gap/overlap at row {}; expected start {}",
3326 s.row_range.start, expected_start
3327 )));
3328 }
3329 expected_start = s.row_range.end;
3330 }
3331 let max_rows = expected_start;
3332 if max_rows > table.rows.len() {
3333 return Err(StorageError::Corrupt(format!(
3334 "commit_freeze_slices: total row range {} exceeds row_count {}",
3335 max_rows,
3336 table.rows.len()
3337 )));
3338 }
3339 if max_rows == 0 {
3340 return Ok(FreezeReport {
3341 segment_id: u32::MAX,
3342 frozen_rows: 0,
3343 bytes_freed: 0,
3344 segment_bytes: Vec::new(),
3345 });
3346 }
3347
3348 let total_rows: usize = ordered.iter().map(|s| s.rows.len()).sum();
3353 if total_rows != max_rows {
3354 return Err(StorageError::Corrupt(format!(
3355 "commit_freeze_slices: total slice rows {total_rows} ≠ row_range coverage {max_rows}"
3356 )));
3357 }
3358 let mut cursors: Vec<usize> = alloc::vec![0; ordered.len()];
3359 let mut merged: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(total_rows);
3360 loop {
3361 let mut pick: Option<usize> = None;
3364 for (i, c) in cursors.iter().enumerate() {
3365 let slice = &ordered[i];
3366 if *c >= slice.rows.len() {
3367 continue;
3368 }
3369 match pick {
3370 None => pick = Some(i),
3371 Some(j) => {
3372 if slice.rows[*c].0 < ordered[j].rows[cursors[j]].0 {
3373 pick = Some(i);
3374 }
3375 }
3376 }
3377 }
3378 let Some(i) = pick else { break };
3379 let row = ordered[i].rows[cursors[i]].clone();
3380 cursors[i] += 1;
3381 merged.push(row);
3382 }
3383 for w in merged.windows(2) {
3386 if w[0].0 == w[1].0 {
3387 return Err(StorageError::Corrupt(format!(
3388 "commit_freeze_slices: duplicate PK {} across slices",
3389 w[0].0
3390 )));
3391 }
3392 }
3393 let post_swap_keys: Vec<IndexKey> = merged.iter().map(|(_, _, k)| k.clone()).collect();
3394 let seg_rows: Vec<(u64, Vec<u8>)> = merged
3395 .into_iter()
3396 .map(|(k, body, _)| (k, body))
3397 .collect();
3398 let frozen_rows = seg_rows.len();
3399 let (seg_bytes, _meta) =
3400 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).map_err(|e| {
3401 StorageError::Corrupt(format!("commit_freeze_slices: encode: {e}"))
3402 })?;
3403
3404 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
3406 let positions: Vec<usize> = (0..max_rows).collect();
3407 let t_mut = self
3408 .get_mut(table_name)
3409 .expect("just validated; still present");
3410 let removed = t_mut.delete_rows(&positions);
3411 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
3412 let bytes_after = t_mut.hot_bytes();
3413 let bytes_freed = bytes_before.saturating_sub(bytes_after);
3414
3415 let segment_id = self
3416 .load_segment_bytes(seg_bytes.clone())
3417 .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: load: {e}")))?;
3418 let new_cold = post_swap_keys.into_iter().map(|k| {
3419 (
3420 k,
3421 RowLocator::Cold {
3422 segment_id,
3423 page_offset: 0,
3424 },
3425 )
3426 });
3427 let t_mut = self.get_mut(table_name).expect("still present");
3428 t_mut.register_cold_locators(index_name, new_cold)?;
3429
3430 Ok(FreezeReport {
3431 segment_id,
3432 frozen_rows,
3433 bytes_freed,
3434 segment_bytes: seg_bytes,
3435 })
3436 }
3437
3438 pub fn compact_cold_segments(
3481 &mut self,
3482 table_name: &str,
3483 index_name: &str,
3484 target_segment_bytes: u64,
3485 ) -> Result<CompactReport, StorageError> {
3486 let t = self.get(table_name).ok_or_else(|| {
3488 StorageError::Corrupt(format!(
3489 "compact_cold_segments: table {table_name:?} not found"
3490 ))
3491 })?;
3492 let idx = t
3493 .indices
3494 .iter()
3495 .find(|i| i.name == index_name)
3496 .ok_or_else(|| {
3497 StorageError::Corrupt(format!(
3498 "compact_cold_segments: index {index_name:?} not found on {table_name:?}"
3499 ))
3500 })?;
3501 let map = match &idx.kind {
3502 IndexKind::BTree(m) => m,
3503 IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
3504 return Err(StorageError::Corrupt(format!(
3505 "compact_cold_segments: index {index_name:?} is not BTree; \
3506 compaction applies only to BTree cold-tier indices"
3507 )));
3508 }
3509 };
3510
3511 let mut referenced_ids: BTreeSet<u32> = BTreeSet::new();
3514 for (_key, locators) in map.iter() {
3515 for loc in locators {
3516 if let RowLocator::Cold { segment_id, .. } = loc {
3517 referenced_ids.insert(*segment_id);
3518 }
3519 }
3520 }
3521 let candidate_set: BTreeSet<u32> = referenced_ids
3523 .into_iter()
3524 .filter(|id| {
3525 self.cold_segments
3526 .get(*id as usize)
3527 .and_then(|s| s.as_deref())
3528 .is_some_and(|s| (s.bytes().len() as u64) < target_segment_bytes)
3529 })
3530 .collect();
3531 if candidate_set.len() < 2 {
3532 return Ok(CompactReport {
3533 sources: Vec::new(),
3534 merged_segment_id: None,
3535 merged_segment_bytes: Vec::new(),
3536 merged_rows: 0,
3537 deleted_rows_pruned: 0,
3538 bytes_reclaimed_estimate: 0,
3539 });
3540 }
3541 let mut source_row_count: usize = 0;
3543 let mut source_byte_total: u64 = 0;
3544 for &id in &candidate_set {
3545 let seg = self.cold_segments[id as usize]
3546 .as_ref()
3547 .expect("candidate selected only when slot is Some");
3548 source_row_count = source_row_count.saturating_add(seg.meta().num_rows as usize);
3549 source_byte_total =
3550 source_byte_total.saturating_add(seg.bytes().len() as u64);
3551 }
3552 let mut collected: BTreeMap<u64, (Vec<u8>, IndexKey)> = BTreeMap::new();
3558 for (key, locators) in map.iter() {
3559 for loc in locators {
3560 let RowLocator::Cold { segment_id, .. } = loc else {
3561 continue;
3562 };
3563 if !candidate_set.contains(segment_id) {
3564 continue;
3565 }
3566 let u64_key = index_key_as_u64(key).ok_or_else(|| {
3567 StorageError::Corrupt(format!(
3568 "compact_cold_segments: index {index_name:?} has non-integer Cold key; \
3569 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
3570 ))
3571 })?;
3572 let seg = self.cold_segments[*segment_id as usize]
3573 .as_ref()
3574 .expect("candidate slot guaranteed Some above");
3575 let payload = seg.lookup(u64_key).ok_or_else(|| {
3576 StorageError::Corrupt(format!(
3577 "compact_cold_segments: BTree {index_name:?} points key={u64_key} \
3578 at segment {segment_id} but the segment lookup missed"
3579 ))
3580 })?;
3581 collected.insert(u64_key, (payload, key.clone()));
3582 break;
3583 }
3584 }
3585 let merged_rows = collected.len();
3586 let deleted_rows_pruned = source_row_count.saturating_sub(merged_rows);
3587
3588 let seg_rows: Vec<(u64, Vec<u8>)> = collected
3592 .iter()
3593 .map(|(k, (body, _))| (*k, body.clone()))
3594 .collect();
3595 let (seg_bytes, _meta) =
3596 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).map_err(|e| {
3597 StorageError::Corrupt(format!("compact_cold_segments: encode: {e}"))
3598 })?;
3599 let merged_bytes_len = seg_bytes.len() as u64;
3600
3601 let merged_segment_id = self
3603 .load_segment_bytes(seg_bytes.clone())
3604 .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: load: {e}")))?;
3605
3606 let entries: Vec<(IndexKey, Vec<RowLocator>)> = {
3612 let t = self
3613 .get(table_name)
3614 .expect("table existed at the start of this fn");
3615 let idx = t
3616 .indices
3617 .iter()
3618 .find(|i| i.name == index_name)
3619 .expect("index existed at the start of this fn");
3620 let IndexKind::BTree(map) = &idx.kind else {
3621 unreachable!("validated above");
3622 };
3623 map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
3624 };
3625 let t_mut = self
3626 .get_mut(table_name)
3627 .expect("table existed at the start of this fn");
3628 let idx_mut = t_mut
3629 .indices
3630 .iter_mut()
3631 .find(|i| i.name == index_name)
3632 .expect("index existed at the start of this fn");
3633 let IndexKind::BTree(map_mut) = &mut idx_mut.kind else {
3634 unreachable!("validated above");
3635 };
3636 for (key, locators) in entries {
3637 let mut new_locs: Vec<RowLocator> = Vec::with_capacity(locators.len());
3638 let mut changed = false;
3639 for loc in &locators {
3640 match *loc {
3641 RowLocator::Cold {
3642 segment_id,
3643 page_offset: _,
3644 } if candidate_set.contains(&segment_id) => {
3645 let replacement = RowLocator::Cold {
3646 segment_id: merged_segment_id,
3647 page_offset: 0,
3648 };
3649 if !new_locs.contains(&replacement) {
3650 new_locs.push(replacement);
3651 }
3652 changed = true;
3653 }
3654 other => new_locs.push(other),
3655 }
3656 }
3657 if changed {
3658 map_mut.insert_mut(key, new_locs);
3659 }
3660 }
3661
3662 for &id in &candidate_set {
3667 self.tombstone_segment(id)?;
3668 }
3669
3670 let bytes_reclaimed_estimate = source_byte_total.saturating_sub(merged_bytes_len);
3671 Ok(CompactReport {
3672 sources: candidate_set.into_iter().collect(),
3673 merged_segment_id: Some(merged_segment_id),
3674 merged_segment_bytes: seg_bytes,
3675 merged_rows,
3676 deleted_rows_pruned,
3677 bytes_reclaimed_estimate,
3678 })
3679 }
3680
3681 fn find_cold_locator(
3687 &self,
3688 table_name: &str,
3689 index_name: &str,
3690 key: &IndexKey,
3691 ) -> Result<Option<(u32, u32)>, StorageError> {
3692 let t = self.get(table_name).ok_or_else(|| {
3693 StorageError::Corrupt(format!("find_cold_locator: table {table_name:?} not found"))
3694 })?;
3695 let idx = t
3696 .indices
3697 .iter()
3698 .find(|i| i.name == index_name)
3699 .ok_or_else(|| {
3700 StorageError::Corrupt(format!(
3701 "find_cold_locator: index {index_name:?} not found on {table_name:?}"
3702 ))
3703 })?;
3704 if !matches!(idx.kind, IndexKind::BTree(_)) {
3705 return Err(StorageError::Corrupt(format!(
3706 "find_cold_locator: index {index_name:?} is NSW; promote-on-write only applies to BTree indices"
3707 )));
3708 }
3709 for loc in idx.lookup_eq(key) {
3710 if let RowLocator::Cold {
3711 segment_id,
3712 page_offset,
3713 } = *loc
3714 {
3715 return Ok(Some((segment_id, page_offset)));
3716 }
3717 }
3718 Ok(None)
3719 }
3720}
3721
3722fn index_key_as_u64(key: &IndexKey) -> Option<u64> {
3728 match key {
3729 IndexKey::Int(n) => Some(n.cast_unsigned()),
3735 IndexKey::Text(_) | IndexKey::Bool(_) => None,
3736 }
3737}
3738
3739#[derive(Debug, Clone, PartialEq, Eq)]
3740#[non_exhaustive]
3741pub enum StorageError {
3742 DuplicateTable {
3743 name: String,
3744 },
3745 TableNotFound {
3746 name: String,
3747 },
3748 ArityMismatch {
3749 expected: usize,
3750 actual: usize,
3751 },
3752 TypeMismatch {
3753 column: String,
3754 expected: DataType,
3755 actual: DataType,
3756 position: usize,
3757 },
3758 NullInNotNull {
3759 column: String,
3760 },
3761 DuplicateIndex {
3763 name: String,
3764 },
3765 ColumnNotFound {
3767 column: String,
3768 },
3769 Corrupt(String),
3772 IndexNotFound {
3775 name: String,
3776 },
3777 Unsupported(String),
3781}
3782
3783impl fmt::Display for StorageError {
3784 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3785 match self {
3786 Self::DuplicateTable { name } => write!(f, "table already exists: {name}"),
3787 Self::TableNotFound { name } => write!(f, "table not found: {name}"),
3788 Self::ArityMismatch { expected, actual } => write!(
3789 f,
3790 "row arity mismatch: expected {expected} columns, got {actual}"
3791 ),
3792 Self::TypeMismatch {
3793 column,
3794 expected,
3795 actual,
3796 position,
3797 } => write!(
3798 f,
3799 "type mismatch in column {column:?} (position {position}): expected {expected}, got {actual}"
3800 ),
3801 Self::NullInNotNull { column } => {
3802 write!(f, "NULL value in NOT NULL column {column:?}")
3803 }
3804 Self::DuplicateIndex { name } => write!(f, "index already exists: {name}"),
3805 Self::ColumnNotFound { column } => write!(f, "column not found: {column}"),
3806 Self::Corrupt(detail) => write!(f, "corrupt on-disk format: {detail}"),
3807 Self::IndexNotFound { name } => write!(f, "index not found: {name}"),
3808 Self::Unsupported(detail) => write!(f, "unsupported: {detail}"),
3809 }
3810 }
3811}
3812
3813impl ColumnSchema {
3814 pub fn new(name: impl Into<String>, ty: DataType, nullable: bool) -> Self {
3815 Self {
3816 name: name.into(),
3817 ty,
3818 nullable,
3819 default: None,
3820 runtime_default: None,
3821 auto_increment: false,
3822 }
3823 }
3824
3825 #[must_use]
3829 pub fn with_default(mut self, default: Value) -> Self {
3830 self.default = Some(default);
3831 self
3832 }
3833
3834 #[must_use]
3839 pub fn with_runtime_default(mut self, expr: impl Into<String>) -> Self {
3840 self.runtime_default = Some(expr.into());
3841 self
3842 }
3843
3844 #[must_use]
3846 pub const fn with_auto_increment(mut self) -> Self {
3847 self.auto_increment = true;
3848 self
3849 }
3850}
3851
3852impl TableSchema {
3853 pub fn new(name: impl Into<String>, columns: Vec<ColumnSchema>) -> Self {
3854 Self {
3855 name: name.into(),
3856 columns,
3857 hot_tier_bytes: None,
3858 foreign_keys: Vec::new(),
3859 uniqueness_constraints: Vec::new(),
3860 }
3861 }
3862}
3863
/// Magic bytes written first by `Catalog::serialize` and checked first by
/// `Catalog::deserialize`.
const FILE_MAGIC: &[u8; 8] = b"SPGDB001";
/// Format version byte written by `Catalog::serialize`, and the newest
/// version `Catalog::deserialize` accepts.
const FILE_VERSION: u8 = 17;
/// Oldest format version `Catalog::deserialize` still accepts. Files in
/// `MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION` are readable.
const MIN_SUPPORTED_FILE_VERSION: u8 = 8;

// Tag bytes identifying serialized `IndexKey` variants.
// NOTE(review): presumably used by `write_index_key` / `Cursor::read_index_key`,
// which are not visible here. Confirm before renumbering.
const INDEX_KEY_TAG_INT: u8 = 0;
const INDEX_KEY_TAG_TEXT: u8 = 1;
const INDEX_KEY_TAG_BOOL: u8 = 2;
3949
impl Catalog {
    /// Encodes the whole catalog into the single-file on-disk format.
    ///
    /// The buffer starts with `FILE_MAGIC`, one `FILE_VERSION` byte, and a
    /// `u32` table count. Each table record then contains, in order:
    /// 1. the name, a `u16` column count, and per column: name, type,
    ///    nullable byte, default flag (+ value), auto-increment byte;
    /// 2. a `u32` row count followed by each row's dense body;
    /// 3. a `u16` index count and per index: name, `u16` key column position,
    ///    a kind tag (`0` BTree entries, `1` NSW graph, `2` BRIN column type),
    ///    INCLUDE columns, optional partial predicate, optional expression,
    ///    unique flag, extra key-column positions;
    /// 4. the optional `hot_tier_bytes` (flag + `to_le_bytes`);
    /// 5. foreign keys;
    /// 6. uniqueness constraints;
    /// 7. runtime-default expressions as `(column position, expr)` pairs.
    ///
    /// `deserialize_table` / `deserialize_indices` read fields in this same
    /// order, so the two must change together.
    ///
    /// # Panics
    ///
    /// Panics in two cases:
    /// * a count or position exceeds its on-disk field width (the `expect`
    ///   calls below);
    /// * a column or BRIN index has type `DataType::Interval`, which
    ///   `write_data_type` cannot encode.
    pub fn serialize(&self) -> Vec<u8> {
        let mut out = Vec::with_capacity(64);
        out.extend_from_slice(FILE_MAGIC);
        out.push(FILE_VERSION);
        write_u32(
            &mut out,
            u32::try_from(self.tables.len()).expect("≤ 4G tables"),
        );
        for t in &self.tables {
            // Schema: table name and column definitions.
            write_str(&mut out, &t.schema.name);
            write_u16(
                &mut out,
                u16::try_from(t.schema.columns.len()).expect("≤ 65k columns/table"),
            );
            for c in &t.schema.columns {
                write_str(&mut out, &c.name);
                write_data_type(&mut out, c.ty);
                out.push(u8::from(c.nullable));
                // Static default: 0 = none, 1 = a value follows.
                match &c.default {
                    None => out.push(0),
                    Some(v) => {
                        out.push(1);
                        write_value(&mut out, v);
                    }
                }
                out.push(u8::from(c.auto_increment));
            }
            // Hot rows, each in the dense row-body encoding.
            write_u32(
                &mut out,
                u32::try_from(t.rows.len()).expect("≤ 4G rows/table"),
            );
            for row in &t.rows {
                out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
            }
            // Indices.
            write_u16(
                &mut out,
                u16::try_from(t.indices.len()).expect("≤ 65k indices/table"),
            );
            for idx in &t.indices {
                write_str(&mut out, &idx.name);
                write_u16(
                    &mut out,
                    u16::try_from(idx.column_position).expect("≤ 65k columns/table"),
                );
                // Kind tag followed by the kind-specific payload.
                match &idx.kind {
                    IndexKind::BTree(map) => {
                        out.push(0);
                        write_u32(
                            &mut out,
                            u32::try_from(map.len()).expect("≤ 4G index entries/index"),
                        );
                        for (key, locators) in map {
                            write_index_key(&mut out, key);
                            write_u32(
                                &mut out,
                                u32::try_from(locators.len()).expect("≤ 4G locators/key"),
                            );
                            for loc in locators {
                                loc.write_le(&mut out);
                            }
                        }
                    }
                    IndexKind::Nsw(g) => {
                        out.push(1);
                        write_u16(&mut out, u16::try_from(g.m).expect("≤ 65k NSW neighbours"));
                        write_nsw_graph(&mut out, g);
                    }
                    IndexKind::Brin { column_type } => {
                        out.push(2);
                        write_data_type(&mut out, *column_type);
                    }
                }
                // Index attributes. The reader only consumes these for
                // version >= 12 (and the unique flag / extra columns for
                // version >= 16).
                write_u16(
                    &mut out,
                    u16::try_from(idx.included_columns.len())
                        .expect("≤ 65k INCLUDE columns/index"),
                );
                for col_pos in &idx.included_columns {
                    write_u16(
                        &mut out,
                        u16::try_from(*col_pos).expect("≤ 65k columns/table"),
                    );
                }
                match &idx.partial_predicate {
                    None => out.push(0),
                    Some(pred) => {
                        out.push(1);
                        write_str(&mut out, pred);
                    }
                }
                match &idx.expression {
                    None => out.push(0),
                    Some(expr) => {
                        out.push(1);
                        write_str(&mut out, expr);
                    }
                }
                out.push(u8::from(idx.is_unique));
                write_u16(
                    &mut out,
                    u16::try_from(idx.extra_column_positions.len())
                        .expect("≤ 65k extra cols / index"),
                );
                for cp in &idx.extra_column_positions {
                    write_u16(
                        &mut out,
                        u16::try_from(*cp).expect("≤ 65k columns/table"),
                    );
                }
            }
            // hot_tier_bytes appendix (read for version >= 11).
            match t.schema.hot_tier_bytes {
                None => out.push(0),
                Some(n) => {
                    out.push(1);
                    out.extend_from_slice(&n.to_le_bytes());
                }
            }
            // Foreign-key appendix (read for version >= 13).
            write_u16(
                &mut out,
                u16::try_from(t.schema.foreign_keys.len()).expect("≤ 65k FKs/table"),
            );
            for fk in &t.schema.foreign_keys {
                match &fk.name {
                    None => out.push(0),
                    Some(n) => {
                        out.push(1);
                        write_str(&mut out, n);
                    }
                }
                write_u16(
                    &mut out,
                    u16::try_from(fk.local_columns.len()).expect("≤ 65k FK columns"),
                );
                for &p in &fk.local_columns {
                    write_u16(
                        &mut out,
                        u16::try_from(p).expect("≤ 65k columns/table"),
                    );
                }
                write_str(&mut out, &fk.parent_table);
                write_u16(
                    &mut out,
                    u16::try_from(fk.parent_columns.len()).expect("≤ 65k FK parent columns"),
                );
                for &p in &fk.parent_columns {
                    write_u16(
                        &mut out,
                        u16::try_from(p).expect("≤ 65k columns/table"),
                    );
                }
                out.push(fk.on_delete.tag());
                out.push(fk.on_update.tag());
            }
            // Uniqueness-constraint appendix (read for version >= 15).
            write_u16(
                &mut out,
                u16::try_from(t.schema.uniqueness_constraints.len())
                    .expect("≤ 65k uniqueness constraints/table"),
            );
            for uc in &t.schema.uniqueness_constraints {
                out.push(u8::from(uc.is_primary_key));
                write_u16(
                    &mut out,
                    u16::try_from(uc.columns.len())
                        .expect("≤ 65k cols in uniqueness constraint"),
                );
                for &p in &uc.columns {
                    write_u16(
                        &mut out,
                        u16::try_from(p).expect("≤ 65k columns/table"),
                    );
                }
            }
            // Runtime defaults (read for version >= 15). Only columns that
            // have one are written, keyed by column position.
            let mut rt_defaults: Vec<(usize, &str)> = Vec::new();
            for (i, c) in t.schema.columns.iter().enumerate() {
                if let Some(e) = &c.runtime_default {
                    rt_defaults.push((i, e.as_str()));
                }
            }
            write_u16(
                &mut out,
                u16::try_from(rt_defaults.len())
                    .expect("≤ 65k runtime defaults/table"),
            );
            for (pos, expr) in rt_defaults {
                write_u16(
                    &mut out,
                    u16::try_from(pos).expect("≤ 65k columns/table"),
                );
                write_str(&mut out, expr);
            }
        }
        out
    }

    /// Parses a buffer produced by [`Catalog::serialize`].
    ///
    /// Accepts format versions `MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION`.
    /// Version-gated sections are read only when the file's version includes
    /// them (see `deserialize_table` / `deserialize_indices`).
    ///
    /// # Errors
    ///
    /// Returns [`StorageError::Corrupt`] for a bad magic, an unsupported
    /// version, or unread trailing bytes. Otherwise propagates per-table
    /// decode errors.
    pub fn deserialize(buf: &[u8]) -> Result<Self, StorageError> {
        let mut cur = Cursor::new(buf);
        let magic = cur.take(8)?;
        if magic != FILE_MAGIC {
            return Err(StorageError::Corrupt(format!(
                "bad magic: expected SPGDB001, got {magic:?}"
            )));
        }
        let version = cur.read_u8()?;
        if !(MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION).contains(&version) {
            return Err(StorageError::Corrupt(format!(
                "unsupported file version: {version} (supported: {MIN_SUPPORTED_FILE_VERSION}..={FILE_VERSION})"
            )));
        }
        let table_count = cur.read_u32()? as usize;
        let mut cat = Self::new();
        for _ in 0..table_count {
            deserialize_table(&mut cur, &mut cat, version)?;
        }
        // The buffer must be consumed exactly; leftovers indicate corruption
        // or a writer/reader layout mismatch.
        if cur.pos < buf.len() {
            return Err(StorageError::Corrupt(format!(
                "trailing bytes: {} unread",
                buf.len() - cur.pos
            )));
        }
        Ok(cat)
    }
}
4243
4244fn deserialize_table(
4249 cur: &mut Cursor<'_>,
4250 cat: &mut Catalog,
4251 version: u8,
4252) -> Result<(), StorageError> {
4253 let table_name = cur.read_str()?;
4254 let name = table_name.clone();
4255 let col_count = cur.read_u16()? as usize;
4256 let mut cols = Vec::with_capacity(col_count);
4257 for _ in 0..col_count {
4258 let c_name = cur.read_str()?;
4259 let ty = cur.read_data_type()?;
4260 let nullable = cur.read_u8()? != 0;
4261 let default = match cur.read_u8()? {
4262 0 => None,
4263 1 => Some(cur.read_value()?),
4264 other => {
4265 return Err(StorageError::Corrupt(format!(
4266 "unknown default tag: {other}"
4267 )));
4268 }
4269 };
4270 let auto_increment = cur.read_u8()? != 0;
4271 cols.push(ColumnSchema {
4275 name: c_name,
4276 ty,
4277 nullable,
4278 default,
4279 runtime_default: None,
4280 auto_increment,
4281 });
4282 }
4283 let n_cols = cols.len();
4284 cat.create_table(TableSchema::new(name, cols))?;
4285 let t = cat.tables.last_mut().expect("create_table just pushed");
4289 deserialize_rows(cur, t, n_cols)?;
4290 deserialize_indices(cur, t, version)?;
4291 if version >= 11 {
4297 let has = cur.read_u8()?;
4298 let hot_tier_bytes = match has {
4299 0 => None,
4300 1 => Some(cur.read_u64()?),
4301 other => {
4302 return Err(StorageError::Corrupt(format!(
4303 "hot_tier_bytes appendix: unknown has-value byte {other}"
4304 )));
4305 }
4306 };
4307 t.schema_mut().hot_tier_bytes = hot_tier_bytes;
4308 }
4309 if version >= 13 {
4312 let fk_count = cur.read_u16()? as usize;
4313 let mut fks = Vec::with_capacity(fk_count);
4314 for _ in 0..fk_count {
4315 let name = match cur.read_u8()? {
4316 0 => None,
4317 1 => Some(cur.read_str()?),
4318 other => {
4319 return Err(StorageError::Corrupt(format!(
4320 "FK appendix: unknown has-name byte {other}"
4321 )));
4322 }
4323 };
4324 let local_arity = cur.read_u16()? as usize;
4325 let mut local_columns = Vec::with_capacity(local_arity);
4326 for _ in 0..local_arity {
4327 local_columns.push(cur.read_u16()? as usize);
4328 }
4329 let parent_table = cur.read_str()?;
4330 let parent_arity = cur.read_u16()? as usize;
4331 if parent_arity != local_arity {
4332 return Err(StorageError::Corrupt(format!(
4333 "FK arity mismatch in catalog: local {local_arity} vs parent {parent_arity}"
4334 )));
4335 }
4336 let mut parent_columns = Vec::with_capacity(parent_arity);
4337 for _ in 0..parent_arity {
4338 parent_columns.push(cur.read_u16()? as usize);
4339 }
4340 let on_delete = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
4341 StorageError::Corrupt("FK appendix: unknown on_delete tag".into())
4342 })?;
4343 let on_update = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
4344 StorageError::Corrupt("FK appendix: unknown on_update tag".into())
4345 })?;
4346 fks.push(ForeignKeyConstraint {
4347 name,
4348 local_columns,
4349 parent_table,
4350 parent_columns,
4351 on_delete,
4352 on_update,
4353 });
4354 }
4355 t.schema_mut().foreign_keys = fks;
4356 }
4357 if version >= 15 {
4360 let uc_count = cur.read_u16()? as usize;
4361 let mut ucs = Vec::with_capacity(uc_count);
4362 for _ in 0..uc_count {
4363 let is_pk = cur.read_u8()? != 0;
4364 let arity = cur.read_u16()? as usize;
4365 let mut cols = Vec::with_capacity(arity);
4366 for _ in 0..arity {
4367 cols.push(cur.read_u16()? as usize);
4368 }
4369 ucs.push(UniquenessConstraint {
4370 is_primary_key: is_pk,
4371 columns: cols,
4372 });
4373 }
4374 t.schema_mut().uniqueness_constraints = ucs;
4375 let rt_count = cur.read_u16()? as usize;
4377 for _ in 0..rt_count {
4378 let pos = cur.read_u16()? as usize;
4379 let expr = cur.read_str()?;
4380 if let Some(col) = t.schema_mut().columns.get_mut(pos) {
4381 col.runtime_default = Some(expr);
4382 }
4383 }
4384 }
4385 let _ = table_name;
4386 Ok(())
4387}
4388
4389fn deserialize_rows(
4390 cur: &mut Cursor<'_>,
4391 t: &mut Table,
4392 _n_cols: usize,
4393) -> Result<(), StorageError> {
4394 let row_count = cur.read_u32()? as usize;
4395 let mut hot_bytes: u64 = 0;
4400 for _ in 0..row_count {
4401 let tail = &cur.buf[cur.pos..];
4402 let (row, consumed) = decode_row_body_dense(tail, &t.schema)?;
4403 cur.pos += consumed;
4404 hot_bytes = hot_bytes.saturating_add(row_body_encoded_len(&row, &t.schema) as u64);
4410 t.rows.push_mut(row);
4411 }
4412 t.hot_bytes = hot_bytes;
4413 Ok(())
4414}
4415
/// Reads a table's index section and restores each index on `t`.
///
/// Each index record starts with a name, a `u16` key column position, and a
/// kind tag:
/// * `0` BTree: for `version >= 9` the stored entries are restored as-is
///   (`read_btree_map`). Older files call `add_index` instead, which
///   presumably rebuilds the index from the rows.
/// * `1` NSW: a `u16` `m` followed by the graph.
/// * `2` BRIN: the indexed column's type.
///
/// For `version >= 12` the record continues with INCLUDE columns, an
/// optional partial predicate, and an optional expression. For
/// `version >= 16` it also has the unique flag and extra key-column
/// positions. These attributes are attached to `t.indices.last_mut()`, so
/// they assume the `restore_*` / `add_index` call just made appended the new
/// index at the end of `t.indices` (NOTE(review): not visible here; confirm).
///
/// # Errors
///
/// Returns [`StorageError::Corrupt`] for an out-of-range key, INCLUDE, or
/// extra column position, or an unknown tag byte. Also propagates cursor
/// and restore errors.
fn deserialize_indices(
    cur: &mut Cursor<'_>,
    t: &mut Table,
    version: u8,
) -> Result<(), StorageError> {
    let index_count = cur.read_u16()? as usize;
    for _ in 0..index_count {
        let idx_name = cur.read_str()?;
        let col_pos = cur.read_u16()? as usize;
        // Restore APIs take the column by name, so resolve the position now.
        let column_name = t
            .schema
            .columns
            .get(col_pos)
            .ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "index {idx_name:?} points at non-existent column position {col_pos}"
                ))
            })?
            .name
            .clone();
        let kind_tag = cur.read_u8()?;
        match kind_tag {
            0 => {
                if version >= 9 {
                    let map = read_btree_map(cur)?;
                    t.restore_btree_index(idx_name, &column_name, map)?;
                } else {
                    // Pre-v9 files carry no BTree entries on disk.
                    t.add_index(idx_name, &column_name)?;
                }
            }
            1 => {
                let m = cur.read_u16()? as usize;
                let graph = cur.read_nsw_graph(m)?;
                t.restore_nsw_index(idx_name, &column_name, graph)?;
            }
            2 => {
                let column_type = cur.read_data_type()?;
                t.restore_brin_index(idx_name, &column_name, column_type)?;
            }
            other => {
                return Err(StorageError::Corrupt(format!(
                    "unknown index kind tag: {other}"
                )));
            }
        }
        if version >= 12 {
            // INCLUDE (covering) column positions; each must be in range.
            let num_included = cur.read_u16()? as usize;
            if num_included > 0 {
                let mut included: Vec<usize> = Vec::with_capacity(num_included);
                for _ in 0..num_included {
                    let cp = cur.read_u16()? as usize;
                    if cp >= t.schema.columns.len() {
                        return Err(StorageError::Corrupt(format!(
                            "INCLUDE column position {cp} out of range \
                             ({} schema columns)",
                            t.schema.columns.len()
                        )));
                    }
                    included.push(cp);
                }
                if let Some(last) = t.indices.last_mut() {
                    last.included_columns = included;
                }
            }
            // Partial-index predicate: 0 = none, 1 = string follows.
            match cur.read_u8()? {
                0 => {}
                1 => {
                    let pred = cur.read_str()?;
                    if let Some(last) = t.indices.last_mut() {
                        last.partial_predicate = Some(pred);
                    }
                }
                other => {
                    return Err(StorageError::Corrupt(format!(
                        "partial_predicate tag: unknown byte {other}"
                    )));
                }
            }
            // Index expression: 0 = none, 1 = string follows.
            match cur.read_u8()? {
                0 => {}
                1 => {
                    let expr = cur.read_str()?;
                    if let Some(last) = t.indices.last_mut() {
                        last.expression = Some(expr);
                    }
                }
                other => {
                    return Err(StorageError::Corrupt(format!(
                        "expression tag: unknown byte {other}"
                    )));
                }
            }
            if version >= 16 {
                // Unique flag: strictly 0 or 1.
                match cur.read_u8()? {
                    0 => {}
                    1 => {
                        if let Some(last) = t.indices.last_mut() {
                            last.is_unique = true;
                        }
                    }
                    other => {
                        return Err(StorageError::Corrupt(format!(
                            "is_unique tag: unknown byte {other}"
                        )));
                    }
                }
                // Extra key-column positions; each must be in range.
                let n = cur.read_u16()? as usize;
                if n > 0 {
                    let mut extras: Vec<usize> = Vec::with_capacity(n);
                    for _ in 0..n {
                        let cp = cur.read_u16()? as usize;
                        if cp >= t.schema.columns.len() {
                            return Err(StorageError::Corrupt(format!(
                                "extra column position {cp} out of range \
                                 ({} schema columns)",
                                t.schema.columns.len()
                            )));
                        }
                        extras.push(cp);
                    }
                    if let Some(last) = t.indices.last_mut() {
                        last.extra_column_positions = extras;
                    }
                }
            }
        }
    }
    Ok(())
}
4563
4564fn read_btree_map(
4568 cur: &mut Cursor<'_>,
4569) -> Result<PersistentBTreeMap<IndexKey, Vec<RowLocator>>, StorageError> {
4570 let entry_count = cur.read_u32()? as usize;
4571 let mut map = PersistentBTreeMap::new();
4572 for _ in 0..entry_count {
4573 let key = cur.read_index_key()?;
4574 let locator_count = cur.read_u32()? as usize;
4575 let mut locators = Vec::with_capacity(locator_count);
4576 for _ in 0..locator_count {
4577 let tail = &cur.buf[cur.pos..];
4578 let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
4579 StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
4580 })?;
4581 cur.pos += consumed;
4582 locators.push(loc);
4583 }
4584 map.insert_mut(key, locators);
4585 }
4586 Ok(map)
4587}
4588
4589fn write_nsw_graph(out: &mut Vec<u8>, g: &NswGraph) {
4605 let entry = g.entry.map_or(u32::MAX, |e| {
4606 u32::try_from(e).expect("NSW entry fits in u32")
4607 });
4608 write_u16(
4609 out,
4610 u16::try_from(g.m_max_0).expect("HNSW m_max_0 fits in u16"),
4611 );
4612 out.extend_from_slice(&entry.to_le_bytes());
4613 out.push(g.entry_level);
4614 let node_count = g.levels.len();
4615 write_u32(
4616 out,
4617 u32::try_from(node_count).expect("HNSW node count fits in u32"),
4618 );
4619 for &lvl in &g.levels {
4620 out.push(lvl);
4621 }
4622 let layer_count = u8::try_from(g.layers.len()).expect("HNSW layer count ≤ 255");
4623 out.push(layer_count);
4624 for layer in &g.layers {
4625 write_u32(
4626 out,
4627 u32::try_from(layer.len()).expect("HNSW per-layer node count fits in u32"),
4628 );
4629 for neighbors in layer {
4630 write_u16(
4631 out,
4632 u16::try_from(neighbors.len()).expect("HNSW neighbour list fits in u16"),
4633 );
4634 for &peer in neighbors {
4638 write_u32(out, peer);
4639 }
4640 }
4641 }
4642}
4643
4644fn write_data_type(out: &mut Vec<u8>, t: DataType) {
4645 match t {
4646 DataType::Int => out.push(1),
4647 DataType::BigInt => out.push(2),
4648 DataType::Float => out.push(3),
4649 DataType::Text => out.push(4),
4650 DataType::Bool => out.push(5),
4651 DataType::Vector { dim, encoding } => match encoding {
4652 VecEncoding::F32 => {
4656 out.push(6);
4657 out.extend_from_slice(&dim.to_le_bytes());
4658 }
4659 VecEncoding::F16 => {
4662 out.push(15);
4663 out.extend_from_slice(&dim.to_le_bytes());
4664 }
4665 VecEncoding::Sq8 => {
4671 out.push(14);
4672 out.extend_from_slice(&dim.to_le_bytes());
4673 }
4674 },
4675 DataType::SmallInt => out.push(7),
4676 DataType::Varchar(max) => {
4677 out.push(8);
4678 out.extend_from_slice(&max.to_le_bytes());
4679 }
4680 DataType::Char(size) => {
4681 out.push(9);
4682 out.extend_from_slice(&size.to_le_bytes());
4683 }
4684 DataType::Numeric { precision, scale } => {
4685 out.push(10);
4686 out.push(precision);
4687 out.push(scale);
4688 }
4689 DataType::Date => out.push(11),
4690 DataType::Timestamp => out.push(12),
4691 DataType::Timestamptz => out.push(17),
4695 DataType::Interval => {
4700 unreachable!("DataType::Interval has no on-disk encoding in v2.11")
4701 }
4702 DataType::Json => out.push(13),
4703 DataType::Jsonb => out.push(16),
4706 DataType::Bytes => out.push(18),
4708 }
4709}
4710
4711impl Cursor<'_> {
4712 fn read_data_type(&mut self) -> Result<DataType, StorageError> {
4713 let tag = self.read_u8()?;
4714 match tag {
4715 1 => Ok(DataType::Int),
4716 2 => Ok(DataType::BigInt),
4717 3 => Ok(DataType::Float),
4718 4 => Ok(DataType::Text),
4719 5 => Ok(DataType::Bool),
4720 6 => Ok(DataType::Vector {
4721 dim: self.read_u32()?,
4722 encoding: VecEncoding::F32,
4723 }),
4724 7 => Ok(DataType::SmallInt),
4725 8 => Ok(DataType::Varchar(self.read_u32()?)),
4726 9 => Ok(DataType::Char(self.read_u32()?)),
4727 10 => {
4728 let precision = self.read_u8()?;
4729 let scale = self.read_u8()?;
4730 Ok(DataType::Numeric { precision, scale })
4731 }
4732 11 => Ok(DataType::Date),
4733 12 => Ok(DataType::Timestamp),
4734 13 => Ok(DataType::Json),
4735 14 => Ok(DataType::Vector {
4736 dim: self.read_u32()?,
4737 encoding: VecEncoding::Sq8,
4738 }),
4739 15 => Ok(DataType::Vector {
4743 dim: self.read_u32()?,
4744 encoding: VecEncoding::F16,
4745 }),
4746 16 => Ok(DataType::Jsonb),
4750 17 => Ok(DataType::Timestamptz),
4754 18 => Ok(DataType::Bytes),
4756 other => Err(StorageError::Corrupt(format!(
4757 "unknown data type tag: {other}"
4758 ))),
4759 }
4760 }
4761}
4762
4763pub fn row_body_encoded_len(row: &Row, schema: &TableSchema) -> usize {
4769 debug_assert_eq!(
4770 row.values.len(),
4771 schema.columns.len(),
4772 "row_body_encoded_len: row arity must match schema"
4773 );
4774 let bitmap_bytes = schema.columns.len().div_ceil(8);
4775 let mut n = bitmap_bytes;
4776 for (col_idx, v) in row.values.iter().enumerate() {
4777 if matches!(v, Value::Null) {
4778 continue;
4779 }
4780 n += value_body_encoded_len(v, schema.columns[col_idx].ty);
4781 }
4782 n
4783}
4784
4785fn value_body_encoded_len(v: &Value, _ty: DataType) -> usize {
4791 match v {
4792 Value::SmallInt(_) => 2,
4793 Value::Int(_) | Value::Date(_) => 4,
4795 Value::BigInt(_) | Value::Float(_) | Value::Timestamp(_) => 8,
4797 Value::Bool(_) => 1,
4798 Value::Text(s) | Value::Json(s) => 2 + s.len(),
4800 Value::Vector(vec) => 4 + 4 * vec.len(),
4802 Value::Sq8Vector(q) => 4 + 4 + 4 + q.bytes.len(),
4809 Value::HalfVector(h) => 4 + h.bytes.len(),
4812 Value::Numeric { .. } => 16 + 1,
4814 Value::Bytes(b) => 2 + b.len(),
4820 Value::Null => 0,
4822 Value::Interval { .. } => {
4824 unreachable!("Value::Interval has no on-disk encoding")
4825 }
4826 }
4827}
4828
4829pub fn encode_row_body_dense(row: &Row, schema: &TableSchema) -> Vec<u8> {
4840 debug_assert_eq!(
4841 row.values.len(),
4842 schema.columns.len(),
4843 "dense encode: row arity must match schema"
4844 );
4845 let bitmap_bytes = schema.columns.len().div_ceil(8);
4846 let mut out = Vec::with_capacity(bitmap_bytes + schema.columns.len() * 8);
4849 let bitmap_offset = out.len();
4850 out.resize(bitmap_offset + bitmap_bytes, 0);
4851 for (i, v) in row.values.iter().enumerate() {
4852 if matches!(v, Value::Null) {
4853 out[bitmap_offset + i / 8] |= 1 << (i % 8);
4854 }
4855 }
4856 for (col_idx, v) in row.values.iter().enumerate() {
4857 if matches!(v, Value::Null) {
4858 continue;
4859 }
4860 write_value_body(&mut out, v, schema.columns[col_idx].ty);
4861 }
4862 out
4863}
4864
4865pub fn decode_row_body_dense(
4871 bytes: &[u8],
4872 schema: &TableSchema,
4873) -> Result<(Row, usize), StorageError> {
4874 let mut cur = Cursor::new(bytes);
4875 let bitmap_bytes = schema.columns.len().div_ceil(8);
4876 let mut bitmap_buf = [0u8; 32];
4877 if bitmap_bytes > bitmap_buf.len() {
4878 return Err(StorageError::Corrupt(format!(
4879 "row NULL bitmap {bitmap_bytes} B exceeds 32 B cap"
4880 )));
4881 }
4882 let slice = cur.take(bitmap_bytes)?;
4883 bitmap_buf[..bitmap_bytes].copy_from_slice(slice);
4884 let mut values = Vec::with_capacity(schema.columns.len());
4885 for (col_idx, col) in schema.columns.iter().enumerate() {
4886 if (bitmap_buf[col_idx / 8] >> (col_idx % 8)) & 1 == 1 {
4887 values.push(Value::Null);
4888 } else {
4889 values.push(cur.read_value_body(col.ty)?);
4890 }
4891 }
4892 Ok((Row { values }, cur.pos))
4893}
4894
4895fn write_value_body(out: &mut Vec<u8>, v: &Value, ty: DataType) {
4904 match (v, ty) {
4905 (Value::SmallInt(n), DataType::SmallInt) => out.extend_from_slice(&n.to_le_bytes()),
4906 (Value::Int(n), DataType::Int) => out.extend_from_slice(&n.to_le_bytes()),
4907 (Value::BigInt(n), DataType::BigInt) => out.extend_from_slice(&n.to_le_bytes()),
4908 (Value::Float(x), DataType::Float) => out.extend_from_slice(&x.to_le_bytes()),
4909 (Value::Bool(b), DataType::Bool) => out.push(u8::from(*b)),
4910 (Value::Text(s), DataType::Text | DataType::Varchar(_) | DataType::Char(_)) => {
4911 write_str(out, s);
4912 }
4913 (
4914 Value::Vector(v),
4915 DataType::Vector {
4916 encoding: VecEncoding::F32,
4917 ..
4918 },
4919 ) => {
4920 let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
4921 out.extend_from_slice(&dim.to_le_bytes());
4922 for x in v {
4923 out.extend_from_slice(&x.to_le_bytes());
4924 }
4925 }
4926 (
4932 Value::Sq8Vector(q),
4933 DataType::Vector {
4934 encoding: VecEncoding::Sq8,
4935 ..
4936 },
4937 ) => {
4938 let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
4939 out.extend_from_slice(&dim.to_le_bytes());
4940 out.extend_from_slice(&q.min.to_le_bytes());
4941 out.extend_from_slice(&q.max.to_le_bytes());
4942 out.extend_from_slice(&q.bytes);
4943 }
4944 (
4948 Value::HalfVector(h),
4949 DataType::Vector {
4950 encoding: VecEncoding::F16,
4951 ..
4952 },
4953 ) => {
4954 let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
4955 out.extend_from_slice(&dim.to_le_bytes());
4956 out.extend_from_slice(&h.bytes);
4957 }
4958 (Value::Numeric { scaled, .. }, DataType::Numeric { scale, .. }) => {
4959 out.extend_from_slice(&scaled.to_le_bytes());
4960 out.push(scale);
4961 }
4962 (Value::Date(d), DataType::Date) => out.extend_from_slice(&d.to_le_bytes()),
4963 (Value::Timestamp(t), DataType::Timestamp | DataType::Timestamptz) => {
4964 out.extend_from_slice(&t.to_le_bytes())
4965 }
4966 (Value::Json(s), DataType::Json | DataType::Jsonb) => write_str(out, s),
4970 (Value::Bytes(b), DataType::Bytes) => {
4973 let len = u16::try_from(b.len()).expect("BYTEA cell ≤ 64 KiB");
4974 out.extend_from_slice(&len.to_le_bytes());
4975 out.extend_from_slice(b);
4976 }
4977 (other, ty) => unreachable!(
4981 "schema-driven encode received mismatched value/type pair: \
4982 value tag={:?}, column type={:?}",
4983 other.data_type(),
4984 ty
4985 ),
4986 }
4987}
4988
4989fn write_value(out: &mut Vec<u8>, v: &Value) {
4990 match v {
4991 Value::Null => out.push(0),
4992 Value::SmallInt(n) => {
4993 out.push(7);
4994 out.extend_from_slice(&n.to_le_bytes());
4995 }
4996 Value::Int(n) => {
4997 out.push(1);
4998 out.extend_from_slice(&n.to_le_bytes());
4999 }
5000 Value::BigInt(n) => {
5001 out.push(2);
5002 out.extend_from_slice(&n.to_le_bytes());
5003 }
5004 Value::Float(x) => {
5005 out.push(3);
5006 out.extend_from_slice(&x.to_le_bytes());
5007 }
5008 Value::Text(s) | Value::Json(s) => {
5013 out.push(4);
5014 write_str(out, s);
5015 }
5016 Value::Bool(b) => {
5017 out.push(5);
5018 out.push(u8::from(*b));
5019 }
5020 Value::Vector(v) => {
5021 out.push(6);
5022 let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
5023 out.extend_from_slice(&dim.to_le_bytes());
5024 for x in v {
5025 out.extend_from_slice(&x.to_le_bytes());
5026 }
5027 }
5028 Value::Sq8Vector(q) => {
5033 out.push(11);
5034 let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
5035 out.extend_from_slice(&dim.to_le_bytes());
5036 out.extend_from_slice(&q.min.to_le_bytes());
5037 out.extend_from_slice(&q.max.to_le_bytes());
5038 out.extend_from_slice(&q.bytes);
5039 }
5040 Value::HalfVector(h) => {
5045 out.push(12);
5046 let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
5047 out.extend_from_slice(&dim.to_le_bytes());
5048 out.extend_from_slice(&h.bytes);
5049 }
5050 Value::Numeric { scaled, scale } => {
5051 out.push(8);
5052 out.extend_from_slice(&scaled.to_le_bytes());
5053 out.push(*scale);
5054 }
5055 Value::Date(d) => {
5056 out.push(9);
5057 out.extend_from_slice(&d.to_le_bytes());
5058 }
5059 Value::Timestamp(t) => {
5060 out.push(10);
5061 out.extend_from_slice(&t.to_le_bytes());
5062 }
5063 Value::Interval { .. } => {
5067 unreachable!(
5068 "Value::Interval has no on-disk encoding; engine must reject it before write"
5069 )
5070 }
5071 Value::Bytes(b) => {
5076 out.push(14);
5077 let len = u16::try_from(b.len()).expect("BYTEA value ≤ 64 KiB");
5078 out.extend_from_slice(&len.to_le_bytes());
5079 out.extend_from_slice(b);
5080 }
5081 }
5082}
5083
/// Appends `n` to `out` as two little-endian bytes.
fn write_u16(out: &mut Vec<u8>, n: u16) {
    out.extend(n.to_le_bytes());
}
/// Appends `n` to `out` as four little-endian bytes.
fn write_u32(out: &mut Vec<u8>, n: u32) {
    out.extend(n.to_le_bytes());
}
/// Appends `s` as a little-endian `u16` byte-length prefix followed by its UTF-8 bytes.
///
/// # Panics
/// If `s` is longer than `u16::MAX` bytes.
fn write_str(out: &mut Vec<u8>, s: &str) {
    let utf8 = s.as_bytes();
    let prefix = u16::try_from(utf8.len()).expect("identifier / text fits in u16");
    write_u16(out, prefix);
    out.extend_from_slice(utf8);
}
5095
5096fn write_index_key(out: &mut Vec<u8>, key: &IndexKey) {
5100 match key {
5101 IndexKey::Int(n) => {
5102 out.push(INDEX_KEY_TAG_INT);
5103 out.extend_from_slice(&n.to_le_bytes());
5104 }
5105 IndexKey::Text(s) => {
5106 out.push(INDEX_KEY_TAG_TEXT);
5107 write_str(out, s);
5108 }
5109 IndexKey::Bool(b) => {
5110 out.push(INDEX_KEY_TAG_BOOL);
5111 out.push(u8::from(*b));
5112 }
5113 }
5114}
5115
/// Forward-only reader over a borrowed byte slice, used by the decoders in this file.
///
/// Reads advance `pos` through `take`, which bounds-checks each request against `buf`.
struct Cursor<'a> {
    // Complete input being decoded.
    buf: &'a [u8],
    // Offset of the next unread byte in `buf`.
    pos: usize,
}
5120
5121impl<'a> Cursor<'a> {
5122 const fn new(buf: &'a [u8]) -> Self {
5123 Self { buf, pos: 0 }
5124 }
5125
5126 fn take(&mut self, n: usize) -> Result<&'a [u8], StorageError> {
5127 let end = self
5128 .pos
5129 .checked_add(n)
5130 .ok_or_else(|| StorageError::Corrupt(format!("length overflow taking {n} bytes")))?;
5131 if end > self.buf.len() {
5132 return Err(StorageError::Corrupt(format!(
5133 "unexpected EOF at offset {} (wanted {n} more bytes)",
5134 self.pos
5135 )));
5136 }
5137 let s = &self.buf[self.pos..end];
5138 self.pos = end;
5139 Ok(s)
5140 }
5141
5142 fn read_u8(&mut self) -> Result<u8, StorageError> {
5143 Ok(self.take(1)?[0])
5144 }
5145 fn read_u16(&mut self) -> Result<u16, StorageError> {
5146 let s = self.take(2)?;
5147 Ok(u16::from_le_bytes([s[0], s[1]]))
5148 }
5149 fn read_u32(&mut self) -> Result<u32, StorageError> {
5150 let s = self.take(4)?;
5151 Ok(u32::from_le_bytes([s[0], s[1], s[2], s[3]]))
5152 }
5153 fn read_i32(&mut self) -> Result<i32, StorageError> {
5154 let s = self.take(4)?;
5155 Ok(i32::from_le_bytes([s[0], s[1], s[2], s[3]]))
5156 }
5157 fn read_u64(&mut self) -> Result<u64, StorageError> {
5160 let s = self.take(8)?;
5161 Ok(u64::from_le_bytes([
5162 s[0], s[1], s[2], s[3], s[4], s[5], s[6], s[7],
5163 ]))
5164 }
5165 fn read_i64(&mut self) -> Result<i64, StorageError> {
5166 let s = self.take(8)?;
5167 let arr: [u8; 8] = s.try_into().expect("checked");
5168 Ok(i64::from_le_bytes(arr))
5169 }
5170 fn read_f64(&mut self) -> Result<f64, StorageError> {
5171 let s = self.take(8)?;
5172 let arr: [u8; 8] = s.try_into().expect("checked");
5173 Ok(f64::from_le_bytes(arr))
5174 }
5175 fn read_f32(&mut self) -> Result<f32, StorageError> {
5176 let s = self.take(4)?;
5177 Ok(f32::from_le_bytes([s[0], s[1], s[2], s[3]]))
5178 }
5179 fn read_str(&mut self) -> Result<String, StorageError> {
5180 let len = self.read_u16()? as usize;
5181 let bytes = self.take(len)?;
5182 core::str::from_utf8(bytes)
5183 .map(String::from)
5184 .map_err(|_| StorageError::Corrupt("invalid UTF-8 in identifier or text".into()))
5185 }
5186
5187 fn read_index_key(&mut self) -> Result<IndexKey, StorageError> {
5191 let tag = self.read_u8()?;
5192 match tag {
5193 INDEX_KEY_TAG_INT => Ok(IndexKey::Int(self.read_i64()?)),
5194 INDEX_KEY_TAG_TEXT => Ok(IndexKey::Text(self.read_str()?)),
5195 INDEX_KEY_TAG_BOOL => Ok(IndexKey::Bool(self.read_u8()? != 0)),
5196 other => Err(StorageError::Corrupt(format!(
5197 "unknown index key tag: {other}"
5198 ))),
5199 }
5200 }
5201 fn read_value_body(&mut self, ty: DataType) -> Result<Value, StorageError> {
5207 match ty {
5208 DataType::SmallInt => {
5209 let s = self.take(2)?;
5210 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
5211 }
5212 DataType::Int => Ok(Value::Int(self.read_i32()?)),
5213 DataType::BigInt => Ok(Value::BigInt(self.read_i64()?)),
5214 DataType::Float => Ok(Value::Float(self.read_f64()?)),
5215 DataType::Bool => Ok(Value::Bool(self.read_u8()? != 0)),
5216 DataType::Text | DataType::Varchar(_) | DataType::Char(_) => {
5217 Ok(Value::Text(self.read_str()?))
5218 }
5219 DataType::Vector {
5220 encoding: VecEncoding::F32,
5221 ..
5222 } => {
5223 let dim = self.read_u32()? as usize;
5224 let mut v = Vec::with_capacity(dim);
5225 for _ in 0..dim {
5226 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
5227 v.push(f32::from_le_bytes(bytes));
5228 }
5229 Ok(Value::Vector(v))
5230 }
5231 DataType::Vector {
5232 encoding: VecEncoding::Sq8,
5233 ..
5234 } => {
5235 let dim = self.read_u32()? as usize;
5236 let min = self.read_f32()?;
5237 let max = self.read_f32()?;
5238 let bytes = self.take(dim)?.to_vec();
5239 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
5240 }
5241 DataType::Vector {
5242 encoding: VecEncoding::F16,
5243 ..
5244 } => {
5245 let dim = self.read_u32()? as usize;
5246 let bytes = self.take(dim * 2)?.to_vec();
5247 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
5248 }
5249 DataType::Numeric { .. } => {
5250 let s = self.take(16)?;
5251 let arr: [u8; 16] = s.try_into().expect("checked");
5252 let scaled = i128::from_le_bytes(arr);
5253 let scale = self.read_u8()?;
5254 Ok(Value::Numeric { scaled, scale })
5255 }
5256 DataType::Date => Ok(Value::Date(self.read_i32()?)),
5257 DataType::Timestamp => Ok(Value::Timestamp(self.read_i64()?)),
5258 DataType::Timestamptz => Ok(Value::Timestamp(self.read_i64()?)),
5259 DataType::Jsonb => Ok(Value::Json(self.read_str()?)),
5260 DataType::Interval => {
5261 Err(StorageError::Corrupt(
5266 "INTERVAL column found on disk — runtime-only type, v3.0.2 rejects it".into(),
5267 ))
5268 }
5269 DataType::Json => Ok(Value::Json(self.read_str()?)),
5270 DataType::Bytes => {
5273 let len = self.read_u16()? as usize;
5274 let bytes = self.take(len)?.to_vec();
5275 Ok(Value::Bytes(bytes))
5276 }
5277 }
5278 }
5279
5280 fn read_value(&mut self) -> Result<Value, StorageError> {
5281 let tag = self.read_u8()?;
5282 match tag {
5283 0 => Ok(Value::Null),
5284 1 => Ok(Value::Int(self.read_i32()?)),
5285 2 => Ok(Value::BigInt(self.read_i64()?)),
5286 3 => Ok(Value::Float(self.read_f64()?)),
5287 4 => Ok(Value::Text(self.read_str()?)),
5288 5 => Ok(Value::Bool(self.read_u8()? != 0)),
5289 6 => {
5290 let dim = self.read_u32()? as usize;
5291 let mut v = Vec::with_capacity(dim);
5292 for _ in 0..dim {
5293 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
5294 v.push(f32::from_le_bytes(bytes));
5295 }
5296 Ok(Value::Vector(v))
5297 }
5298 7 => {
5299 let s = self.take(2)?;
5300 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
5301 }
5302 8 => {
5303 let s = self.take(16)?;
5304 let arr: [u8; 16] = s.try_into().expect("checked");
5305 let scaled = i128::from_le_bytes(arr);
5306 let scale = self.read_u8()?;
5307 Ok(Value::Numeric { scaled, scale })
5308 }
5309 9 => Ok(Value::Date(self.read_i32()?)),
5310 10 => Ok(Value::Timestamp(self.read_i64()?)),
5311 11 => {
5316 let dim = self.read_u32()? as usize;
5317 let min = self.read_f32()?;
5318 let max = self.read_f32()?;
5319 let bytes = self.take(dim)?.to_vec();
5320 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
5321 }
5322 12 => {
5325 let dim = self.read_u32()? as usize;
5326 let bytes = self.take(dim * 2)?.to_vec();
5327 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
5328 }
5329 14 => {
5331 let len = self.read_u16()? as usize;
5332 let bytes = self.take(len)?.to_vec();
5333 Ok(Value::Bytes(bytes))
5334 }
5335 other => Err(StorageError::Corrupt(format!("unknown value tag: {other}"))),
5336 }
5337 }
5338
5339 fn read_nsw_graph(&mut self, m: usize) -> Result<NswGraph, StorageError> {
5343 let m_max_0 = self.read_u16()? as usize;
5344 let entry_raw = self.read_u32()?;
5345 let entry = if entry_raw == u32::MAX {
5346 None
5347 } else {
5348 Some(entry_raw as usize)
5349 };
5350 let entry_level = self.read_u8()?;
5351 let node_count = self.read_u32()? as usize;
5352 let mut levels: PersistentVec<u8> = PersistentVec::new();
5357 for _ in 0..node_count {
5358 levels.push_mut(self.read_u8()?);
5359 }
5360 let layer_count = self.read_u8()? as usize;
5361 let mut layers: Vec<PersistentVec<Vec<u32>>> = Vec::with_capacity(layer_count);
5362 for _ in 0..layer_count {
5363 let n = self.read_u32()? as usize;
5364 let mut per_layer: PersistentVec<Vec<u32>> = PersistentVec::new();
5365 for _ in 0..n {
5366 let cnt = self.read_u16()? as usize;
5367 let mut row: Vec<u32> = Vec::with_capacity(cnt);
5368 for _ in 0..cnt {
5369 row.push(self.read_u32()?);
5370 }
5371 per_layer.push_mut(row);
5372 }
5373 layers.push(per_layer);
5374 }
5375 Ok(NswGraph {
5376 m,
5377 m_max_0,
5378 entry,
5379 entry_level,
5380 levels,
5381 layers,
5382 })
5383 }
5384}
5385
5386#[cfg(test)]
5387mod tests {
5388 use super::*;
5389 use alloc::string::ToString;
5390 use alloc::vec;
5391
5392 #[cfg(target_arch = "aarch64")]
5393 #[test]
5394 fn neon_l2_matches_scalar() {
5395 let dims = [4usize, 8, 12, 16, 64, 128, 256, 384, 512, 768, 1024, 1536];
5400 for &d in &dims {
5401 let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
5402 let mut a = Vec::with_capacity(d);
5403 let mut b = Vec::with_capacity(d);
5404 for _ in 0..d {
5405 state = state
5406 .wrapping_mul(6_364_136_223_846_793_005)
5407 .wrapping_add(1);
5408 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5409 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5410 state = state
5411 .wrapping_mul(6_364_136_223_846_793_005)
5412 .wrapping_add(1);
5413 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5414 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5415 a.push(x);
5416 b.push(y);
5417 }
5418 let scalar = l2_distance_sq_scalar(&a, &b);
5419 let neon = unsafe { l2_distance_sq_neon(&a, &b) };
5420 let tol = (scalar.abs().max(1e-6)) * 1e-4;
5421 assert!(
5422 (scalar - neon).abs() <= tol,
5423 "dim={d}: scalar={scalar} neon={neon} diff={}",
5424 (scalar - neon).abs()
5425 );
5426 }
5427 }
5428
5429 #[cfg(target_arch = "aarch64")]
5430 #[test]
5431 fn neon_inner_product_matches_scalar() {
5432 let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
5436 for &d in &dims {
5437 let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
5438 let mut a = Vec::with_capacity(d);
5439 let mut b = Vec::with_capacity(d);
5440 for _ in 0..d {
5441 state = state
5442 .wrapping_mul(6_364_136_223_846_793_005)
5443 .wrapping_add(1);
5444 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5445 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5446 state = state
5447 .wrapping_mul(6_364_136_223_846_793_005)
5448 .wrapping_add(1);
5449 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5450 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5451 a.push(x);
5452 b.push(y);
5453 }
5454 let scalar = inner_product_scalar(&a, &b);
5455 let neon = unsafe { inner_product_neon(&a, &b) };
5456 #[allow(clippy::cast_precision_loss)]
5457 let tol = (scalar.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
5458 assert!(
5459 (scalar - neon).abs() <= tol,
5460 "IP dim={d}: scalar={scalar} neon={neon} diff={}",
5461 (scalar - neon).abs()
5462 );
5463 }
5464 }
5465
5466 #[cfg(target_arch = "aarch64")]
5467 #[allow(clippy::similar_names)]
5468 #[test]
5469 fn neon_cosine_dot_norms_matches_scalar() {
5470 let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
5471 for &d in &dims {
5472 let mut state: u64 = (d as u64).wrapping_mul(0xBF58_476D_1CE4_E5B9);
5473 let mut a = Vec::with_capacity(d);
5474 let mut b = Vec::with_capacity(d);
5475 for _ in 0..d {
5476 state = state
5477 .wrapping_mul(6_364_136_223_846_793_005)
5478 .wrapping_add(1);
5479 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5480 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5481 state = state
5482 .wrapping_mul(6_364_136_223_846_793_005)
5483 .wrapping_add(1);
5484 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5485 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5486 a.push(x);
5487 b.push(y);
5488 }
5489 let (dot_s, na_s, nb_s) = cosine_dot_norms_scalar(&a, &b);
5490 let (dot_n, na_n, nb_n) = unsafe { cosine_dot_norms_neon(&a, &b) };
5491 #[allow(clippy::cast_precision_loss)]
5492 let tol_d = (dot_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
5493 #[allow(clippy::cast_precision_loss)]
5494 let tol_n = (na_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
5495 assert!(
5496 (dot_s - dot_n).abs() <= tol_d,
5497 "cosine dot dim={d}: scalar={dot_s} neon={dot_n}"
5498 );
5499 assert!(
5500 (na_s - na_n).abs() <= tol_n,
5501 "cosine na dim={d}: scalar={na_s} neon={na_n}"
5502 );
5503 assert!(
5504 (nb_s - nb_n).abs() <= tol_n,
5505 "cosine nb dim={d}: scalar={nb_s} neon={nb_n}"
5506 );
5507 }
5508 }
5509
5510 fn make_users_schema() -> TableSchema {
5511 TableSchema::new(
5512 "users",
5513 vec![
5514 ColumnSchema::new("id", DataType::Int, false),
5515 ColumnSchema::new("name", DataType::Text, false),
5516 ColumnSchema::new("score", DataType::Float, true),
5517 ],
5518 )
5519 }
5520
5521 #[test]
5522 fn value_type_tag_matches_variant() {
5523 assert_eq!(Value::Int(1).data_type(), Some(DataType::Int));
5524 assert_eq!(Value::BigInt(1).data_type(), Some(DataType::BigInt));
5525 assert_eq!(Value::Float(1.0).data_type(), Some(DataType::Float));
5526 assert_eq!(Value::Text("x".into()).data_type(), Some(DataType::Text));
5527 assert_eq!(Value::Bool(true).data_type(), Some(DataType::Bool));
5528 assert_eq!(Value::Null.data_type(), None);
5529 assert!(Value::Null.is_null());
5530 assert!(!Value::Int(0).is_null());
5531 }
5532
5533 #[test]
5534 fn sq8_value_reports_sq8_data_type() {
5535 let q = crate::quantize::quantize(&[0.0, 0.25, 0.5, 0.75, 1.0]);
5540 let v = Value::Sq8Vector(q);
5541 assert_eq!(
5542 v.data_type(),
5543 Some(DataType::Vector {
5544 dim: 5,
5545 encoding: VecEncoding::Sq8,
5546 }),
5547 );
5548 }
5549
5550 #[test]
5551 fn datatype_display_matches_pg_keyword() {
5552 assert_eq!(DataType::Int.to_string(), "INT");
5553 assert_eq!(DataType::BigInt.to_string(), "BIGINT");
5554 assert_eq!(DataType::Float.to_string(), "FLOAT");
5555 assert_eq!(DataType::Text.to_string(), "TEXT");
5556 assert_eq!(DataType::Bool.to_string(), "BOOL");
5557 }
5558
5559 #[test]
5560 fn row_len_and_emptiness() {
5561 let r = Row::new(vec![Value::Int(1), Value::Null]);
5562 assert_eq!(r.len(), 2);
5563 assert!(!r.is_empty());
5564 assert!(Row::new(Vec::new()).is_empty());
5565 }
5566
5567 #[test]
5568 fn table_schema_column_position() {
5569 let s = make_users_schema();
5570 assert_eq!(s.column_position("id"), Some(0));
5571 assert_eq!(s.column_position("score"), Some(2));
5572 assert_eq!(s.column_position("missing"), None);
5573 }
5574
5575 #[test]
5576 fn catalog_create_table_then_lookup() {
5577 let mut cat = Catalog::new();
5578 cat.create_table(make_users_schema()).unwrap();
5579 assert_eq!(cat.table_count(), 1);
5580 assert!(cat.get("users").is_some());
5581 assert!(cat.get("nope").is_none());
5582 }
5583
5584 #[test]
5585 fn catalog_duplicate_table_is_rejected() {
5586 let mut cat = Catalog::new();
5587 cat.create_table(make_users_schema()).unwrap();
5588 let err = cat.create_table(make_users_schema()).unwrap_err();
5589 assert!(matches!(err, StorageError::DuplicateTable { ref name } if name == "users"));
5590 }
5591
5592 #[test]
5593 fn table_insert_happy_path_appends_row() {
5594 let mut cat = Catalog::new();
5595 cat.create_table(make_users_schema()).unwrap();
5596 let t = cat.get_mut("users").unwrap();
5597 t.insert(Row::new(vec![
5598 Value::Int(1),
5599 Value::Text("alice".into()),
5600 Value::Float(99.5),
5601 ]))
5602 .unwrap();
5603 assert_eq!(t.row_count(), 1);
5604 assert_eq!(t.rows()[0].values[1], Value::Text("alice".into()));
5605 }
5606
5607 #[test]
5608 fn table_insert_arity_mismatch() {
5609 let mut cat = Catalog::new();
5610 cat.create_table(make_users_schema()).unwrap();
5611 let t = cat.get_mut("users").unwrap();
5612 let err = t.insert(Row::new(vec![Value::Int(1)])).unwrap_err();
5613 assert!(matches!(
5614 err,
5615 StorageError::ArityMismatch {
5616 expected: 3,
5617 actual: 1
5618 }
5619 ));
5620 assert_eq!(t.row_count(), 0);
5621 }
5622
5623 #[test]
5624 fn table_insert_type_mismatch_reports_column() {
5625 let mut cat = Catalog::new();
5626 cat.create_table(make_users_schema()).unwrap();
5627 let t = cat.get_mut("users").unwrap();
5628 let err = t
5629 .insert(Row::new(vec![
5630 Value::Int(1),
5631 Value::Int(42), Value::Float(0.0),
5633 ]))
5634 .unwrap_err();
5635 match err {
5636 StorageError::TypeMismatch {
5637 ref column,
5638 expected,
5639 actual,
5640 position,
5641 } => {
5642 assert_eq!(column, "name");
5643 assert_eq!(expected, DataType::Text);
5644 assert_eq!(actual, DataType::Int);
5645 assert_eq!(position, 1);
5646 }
5647 other => panic!("unexpected: {other:?}"),
5648 }
5649 assert_eq!(t.row_count(), 0);
5650 }
5651
5652 #[test]
5653 fn table_insert_null_into_not_null_rejected() {
5654 let mut cat = Catalog::new();
5655 cat.create_table(make_users_schema()).unwrap();
5656 let t = cat.get_mut("users").unwrap();
5657 let err = t
5658 .insert(Row::new(vec![
5659 Value::Int(1),
5660 Value::Null, Value::Float(1.0),
5662 ]))
5663 .unwrap_err();
5664 assert!(matches!(err, StorageError::NullInNotNull { ref column } if column == "name"));
5665 }
5666
5667 #[test]
5668 fn table_insert_null_into_nullable_ok() {
5669 let mut cat = Catalog::new();
5670 cat.create_table(make_users_schema()).unwrap();
5671 let t = cat.get_mut("users").unwrap();
5672 t.insert(Row::new(vec![
5673 Value::Int(1),
5674 Value::Text("bob".into()),
5675 Value::Null,
5676 ]))
5677 .unwrap();
5678 assert_eq!(t.row_count(), 1);
5679 }
5680
5681 #[test]
5682 fn catalog_get_mut_independent_per_table() {
5683 let mut cat = Catalog::new();
5684 cat.create_table(TableSchema::new(
5685 "a",
5686 vec![ColumnSchema::new("v", DataType::Int, false)],
5687 ))
5688 .unwrap();
5689 cat.create_table(TableSchema::new(
5690 "b",
5691 vec![ColumnSchema::new("v", DataType::Int, false)],
5692 ))
5693 .unwrap();
5694 cat.get_mut("a")
5695 .unwrap()
5696 .insert(Row::new(vec![Value::Int(1)]))
5697 .unwrap();
5698 assert_eq!(cat.get("a").unwrap().row_count(), 1);
5699 assert_eq!(cat.get("b").unwrap().row_count(), 0);
5700 }
5701
5702 fn assert_round_trip(cat: &Catalog) {
5705 let bytes = cat.serialize();
5706 let restored = Catalog::deserialize(&bytes).expect("deserialize");
5707 assert_eq!(restored.table_count(), cat.table_count());
5710 for (a, b) in cat.tables.iter().zip(restored.tables.iter()) {
5711 assert_eq!(a.schema, b.schema);
5712 assert_eq!(a.rows, b.rows);
5713 }
5714 }
5715
5716 #[test]
5717 fn serialize_empty_catalog_round_trips() {
5718 assert_round_trip(&Catalog::new());
5719 }
5720
5721 #[test]
5722 fn serialize_single_empty_table_round_trips() {
5723 let mut cat = Catalog::new();
5724 cat.create_table(make_users_schema()).unwrap();
5725 assert_round_trip(&cat);
5726 }
5727
5728 #[test]
5729 fn nsw_clone_is_o1() {
5730 let mut cat = Catalog::new();
5739 cat.create_table(TableSchema::new(
5740 "docs",
5741 alloc::vec![
5742 ColumnSchema::new("id", DataType::Int, false),
5743 ColumnSchema::new(
5744 "v",
5745 DataType::Vector {
5746 dim: 3,
5747 encoding: VecEncoding::F32
5748 },
5749 true
5750 ),
5751 ],
5752 ))
5753 .unwrap();
5754 let t = cat.get_mut("docs").unwrap();
5755 for i in 0..1500_i32 {
5756 #[allow(clippy::cast_precision_loss)] let base = (i as f32) * 0.01;
5758 t.insert(Row::new(alloc::vec![
5759 Value::Int(i),
5760 Value::Vector(alloc::vec![base, base + 0.05, base + 0.1]),
5761 ]))
5762 .unwrap();
5763 }
5764 t.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
5765 .unwrap();
5766 let g = match &cat.get("docs").unwrap().indices()[0].kind {
5767 IndexKind::Nsw(g) => g,
5768 IndexKind::BTree(_) | IndexKind::Brin { .. } => panic!("expected NSW"),
5769 };
5770 assert_eq!(g.levels.len(), 1500, "one level slot per inserted row");
5773 assert!(
5774 g.layers.len() >= 2,
5775 "1500 nodes should populate at least two HNSW layers, got {}",
5776 g.layers.len()
5777 );
5778
5779 let cloned = g.clone();
5780
5781 assert!(
5782 g.levels.shares_storage_with(&cloned.levels),
5783 "levels PV not shared after clone — clone copied elements (O(N))"
5784 );
5785 assert_eq!(g.layers.len(), cloned.layers.len());
5786 for (l, (orig, cl)) in g.layers.iter().zip(cloned.layers.iter()).enumerate() {
5787 assert!(
5788 orig.shares_storage_with(cl),
5789 "layer {l} PV not shared after clone — clone copied elements (O(N))"
5790 );
5791 }
5792 }
5793
5794 #[test]
5795 fn sq8_catalog_serialise_roundtrip_preserves_cells_and_index() {
5796 let mut cat = Catalog::new();
5803 cat.create_table(TableSchema::new(
5804 "vecs",
5805 alloc::vec![
5806 ColumnSchema::new("id", DataType::Int, false),
5807 ColumnSchema::new(
5808 "v",
5809 DataType::Vector {
5810 dim: 8,
5811 encoding: VecEncoding::Sq8,
5812 },
5813 false,
5814 ),
5815 ],
5816 ))
5817 .unwrap();
5818 let t = cat.get_mut("vecs").unwrap();
5819 for i in 0..32_i32 {
5820 #[allow(clippy::cast_precision_loss)]
5821 let base = (i as f32) * 0.03;
5822 let v: Vec<f32> = (0..8_i32)
5823 .map(|j| {
5824 #[allow(clippy::cast_precision_loss)]
5825 let off = (j as f32) * 0.01;
5826 base + off
5827 })
5828 .collect();
5829 t.insert(Row::new(alloc::vec![
5830 Value::Int(i),
5831 Value::Sq8Vector(quantize::quantize(&v)),
5832 ]))
5833 .unwrap();
5834 }
5835 t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
5836 let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];
5839 let (before_cell, before_ty, before_hits) = {
5840 let t_ref = cat.get("vecs").unwrap();
5841 (
5842 t_ref.rows()[5].values[1].clone(),
5843 t_ref.schema().columns[1].ty,
5844 nsw_query(t_ref, "v_idx", &query, 5, NswMetric::L2),
5845 )
5846 };
5847
5848 let bytes = cat.serialize();
5849 let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
5850 let rt = restored.get("vecs").unwrap();
5851 assert_eq!(rt.schema().columns[1].ty, before_ty);
5852 assert_eq!(rt.rows()[5].values[1], before_cell);
5853 let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
5854 assert_eq!(before_hits, after_hits);
5855 }
5856
5857 #[test]
5858 fn half_catalog_serialise_roundtrip_preserves_cells_and_index() {
5859 use crate::halfvec;
5866 let mut cat = Catalog::new();
5867 cat.create_table(TableSchema::new(
5868 "vecs",
5869 alloc::vec![
5870 ColumnSchema::new("id", DataType::Int, false),
5871 ColumnSchema::new(
5872 "v",
5873 DataType::Vector {
5874 dim: 8,
5875 encoding: VecEncoding::F16,
5876 },
5877 false,
5878 ),
5879 ],
5880 ))
5881 .unwrap();
5882 let t = cat.get_mut("vecs").unwrap();
5883 for i in 0..32_i32 {
5884 #[allow(clippy::cast_precision_loss)]
5885 let base = (i as f32) * 0.03;
5886 let v: Vec<f32> = (0..8_i32)
5887 .map(|j| {
5888 #[allow(clippy::cast_precision_loss)]
5889 let off = (j as f32) * 0.01;
5890 base + off
5891 })
5892 .collect();
5893 t.insert(Row::new(alloc::vec![
5894 Value::Int(i),
5895 Value::HalfVector(halfvec::HalfVector::from_f32_slice(&v)),
5896 ]))
5897 .unwrap();
5898 }
5899 t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
5900 let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];
5901 let (before_cell, before_ty, before_hits) = {
5902 let t_ref = cat.get("vecs").unwrap();
5903 (
5904 t_ref.rows()[5].values[1].clone(),
5905 t_ref.schema().columns[1].ty,
5906 nsw_query(t_ref, "v_idx", &query, 5, NswMetric::L2),
5907 )
5908 };
5909 let bytes = cat.serialize();
5910 let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
5911 let rt = restored.get("vecs").unwrap();
5912 assert_eq!(rt.schema().columns[1].ty, before_ty);
5913 assert_eq!(rt.rows()[5].values[1], before_cell);
5914 let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
5915 assert_eq!(before_hits, after_hits);
5916 }
5917
/// HNSW recall@10 on a HALF (f16) column must stay within 5% of the exact
/// f32 brute-force top-10 over the same random corpus.
#[test]
#[allow(clippy::similar_names)]
fn hnsw_half_recall_at_10_matches_f32_groundtruth() {
    use crate::halfvec;

    /// Steps the deterministic test PRNG and maps its upper 32 bits onto `[-1.0, 1.0]`.
    fn unit_sample(rng: &mut u64) -> f32 {
        *rng = rng
            .wrapping_add(0x9E37_79B9_7F4A_7C15)
            .wrapping_mul(0xBF58_476D_1CE4_E5B9);
        #[allow(clippy::cast_precision_loss)]
        let frac = ((*rng >> 32) as u32 as f32) / (u32::MAX as f32);
        2.0 * frac - 1.0
    }

    /// Draws `count` vectors of `width` components, row by row, from `rng`.
    fn sample_matrix(rng: &mut u64, count: usize, width: usize) -> Vec<Vec<f32>> {
        let mut rows = Vec::with_capacity(count);
        for _ in 0..count {
            let mut row = Vec::with_capacity(width);
            for _ in 0..width {
                row.push(unit_sample(rng));
            }
            rows.push(row);
        }
        rows
    }

    /// Exact top-10 row indices by squared L2 distance on full-precision data.
    fn brute_force_top10(corpus: &[Vec<f32>], query: &[f32]) -> Vec<usize> {
        let mut scored: Vec<(f32, usize)> = corpus
            .iter()
            .enumerate()
            .map(|(i, v)| (l2_distance_sq(v, query), i))
            .collect();
        scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
        scored.into_iter().take(10).map(|(_, i)| i).collect()
    }

    let dim: u32 = 32;
    let n: usize = 512;
    let dim_us = dim as usize;
    let mut rng: u64 = 0xF16_F16_F16_F16_u64;
    // Corpus is drawn first, queries second — the draw order fixes the data set.
    let corpus = sample_matrix(&mut rng, n, dim_us);
    let queries = sample_matrix(&mut rng, 32, dim_us);
    let ground_truth: Vec<Vec<usize>> = queries
        .iter()
        .map(|q| brute_force_top10(&corpus, q))
        .collect();

    let vec_col = ColumnSchema::new(
        "v",
        DataType::Vector {
            dim,
            encoding: VecEncoding::F16,
        },
        false,
    );
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "vecs",
        alloc::vec![ColumnSchema::new("id", DataType::Int, false), vec_col],
    ))
    .unwrap();
    let vecs = cat.get_mut("vecs").unwrap();
    for (id, v) in (0_i32..).zip(&corpus) {
        let cell = Value::HalfVector(halfvec::HalfVector::from_f32_slice(v));
        vecs.insert(Row::new(alloc::vec![Value::Int(id), cell])).unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    let table = cat.get("vecs").unwrap();
    let mut total_overlap = 0_usize;
    for (q, exact) in queries.iter().zip(&ground_truth) {
        let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
        total_overlap += hits.iter().filter(|h| exact.contains(h)).count();
    }
    #[allow(clippy::cast_precision_loss)]
    let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
    assert!(
        recall >= 0.95,
        "HALF HNSW recall@10 = {recall:.3}, below floor 0.95 — \
         check halfvec dispatch in `cell_to_query_metric_distance`"
    );
}
6001
/// HNSW recall@10 on an SQ8-quantized column must reach 0.95 against the
/// exact f32 brute-force top-10 over the same random corpus.
#[test]
fn hnsw_sq8_recall_at_10_above_0_95_vs_f32_groundtruth() {
    use crate::quantize;

    /// Steps the deterministic test PRNG and maps its upper 32 bits onto `[-1.0, 1.0]`.
    fn unit_sample(rng: &mut u64) -> f32 {
        *rng = rng
            .wrapping_add(0x9E37_79B9_7F4A_7C15)
            .wrapping_mul(0xBF58_476D_1CE4_E5B9);
        #[allow(clippy::cast_precision_loss)]
        let frac = ((*rng >> 32) as u32 as f32) / (u32::MAX as f32);
        2.0 * frac - 1.0
    }

    /// Draws `count` vectors of `width` components, row by row, from `rng`.
    fn sample_matrix(rng: &mut u64, count: usize, width: usize) -> Vec<Vec<f32>> {
        let mut rows = Vec::with_capacity(count);
        for _ in 0..count {
            let mut row = Vec::with_capacity(width);
            for _ in 0..width {
                row.push(unit_sample(rng));
            }
            rows.push(row);
        }
        rows
    }

    /// Exact top-10 row indices by squared L2 distance on full-precision data.
    fn brute_force_top10(corpus: &[Vec<f32>], query: &[f32]) -> Vec<usize> {
        let mut scored: Vec<(f32, usize)> = corpus
            .iter()
            .enumerate()
            .map(|(i, v)| (l2_distance_sq(v, query), i))
            .collect();
        scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
        scored.into_iter().take(10).map(|(_, i)| i).collect()
    }

    let dim: u32 = 32;
    let n: usize = 512;
    let dim_us = dim as usize;
    let mut rng: u64 = 0xCAFE_BABE_DEAD_BEEFu64;
    // Corpus is drawn first, queries second — the draw order fixes the data set.
    let corpus = sample_matrix(&mut rng, n, dim_us);
    let queries = sample_matrix(&mut rng, 32, dim_us);
    let ground_truth: Vec<Vec<usize>> = queries
        .iter()
        .map(|q| brute_force_top10(&corpus, q))
        .collect();

    let vec_col = ColumnSchema::new(
        "v",
        DataType::Vector {
            dim,
            encoding: VecEncoding::Sq8,
        },
        false,
    );
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "vecs",
        alloc::vec![ColumnSchema::new("id", DataType::Int, false), vec_col],
    ))
    .unwrap();
    let vecs = cat.get_mut("vecs").unwrap();
    for (id, v) in (0_i32..).zip(&corpus) {
        let cell = Value::Sq8Vector(quantize::quantize(v));
        vecs.insert(Row::new(alloc::vec![Value::Int(id), cell])).unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    let table = cat.get("vecs").unwrap();
    let mut total_overlap = 0_usize;
    for (q, exact) in queries.iter().zip(&ground_truth) {
        let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
        total_overlap += hits.iter().filter(|h| exact.contains(h)).count();
    }
    #[allow(clippy::cast_precision_loss)]
    let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
    assert!(
        recall >= 0.95,
        "SQ8 HNSW recall@10 = {recall:.3}, below floor 0.95 — \
         check `sq8_rerank` is wired in `nsw_search` for SQ8 columns"
    );
}
6090
/// The full NSW graph (parameters, entry point, per-node levels, layers) of
/// the `docs` table's first index must survive serialize → deserialize.
#[test]
fn nsw_index_topology_persists_through_round_trip() {
    // Clones the graph backing index 0 of `docs`; any non-NSW kind is a test failure.
    let nsw_graph = |catalog: &Catalog| match &catalog.get("docs").unwrap().indices()[0].kind {
        IndexKind::Nsw(g) => g.clone(),
        IndexKind::BTree(_) | IndexKind::Brin { .. } => panic!("expected NSW"),
    };

    let vec_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "docs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vec_ty, true),
        ],
    ))
    .unwrap();
    let docs = cat.get_mut("docs").unwrap();
    for i in 0..6_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.1;
        let embedding = alloc::vec![base, base + 0.05, base + 0.1];
        docs.insert(Row::new(alloc::vec![Value::Int(i), Value::Vector(embedding)]))
            .unwrap();
    }
    docs.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let original = nsw_graph(&cat);
    let snapshot = cat.serialize();
    let restored = Catalog::deserialize(&snapshot).expect("deserialize");
    let restored_graph = nsw_graph(&restored);

    assert_eq!(restored_graph.m, original.m);
    assert_eq!(restored_graph.m_max_0, original.m_max_0);
    assert_eq!(restored_graph.entry, original.entry);
    assert_eq!(restored_graph.entry_level, original.entry_level);
    assert_eq!(restored_graph.levels, original.levels);
    assert_eq!(restored_graph.layers, original.layers);
}
6143
/// `nsw_assign_level` must return the same level every time it sees the
/// same node index.
#[test]
fn hnsw_level_assignment_is_deterministic() {
    (0..32usize).for_each(|node| {
        let first = nsw_assign_level(node);
        let second = nsw_assign_level(node);
        assert_eq!(first, second);
    });
}
6152
/// Of the first 200 node indices, more than 150 must be assigned level 0.
#[test]
fn hnsw_layer_0_dominates_population() {
    let mut on_zero = 0_usize;
    for node in 0..200usize {
        if nsw_assign_level(node) == 0 {
            on_zero += 1;
        }
    }
    assert!(on_zero > 150, "level-0 nodes too few: {on_zero}");
}
6162
/// For a small hand-picked 3-D data set, the HNSW top-1 under L2 must equal
/// the brute-force nearest row for each probe query.
///
/// The table borrow and the index position are resolved once, because
/// nothing mutates the catalog inside the query loop. The brute-force
/// baseline uses `min_by` rather than allocating and sorting every distance
/// just to read element 0. `Iterator::min_by` returns the first of several
/// equal minima, which matches what the stable sort previously left at
/// index 0.
#[test]
fn hnsw_search_matches_brute_force_for_l2_top1() {
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 3,
                    encoding: VecEncoding::F32,
                },
                true,
            ),
        ],
    ))
    .unwrap();
    let t = cat.get_mut("vecs").unwrap();
    let dataset: &[(i32, [f32; 3])] = &[
        (1, [0.0, 0.0, 0.0]),
        (2, [1.0, 0.0, 0.0]),
        (3, [0.0, 1.0, 0.0]),
        (4, [0.0, 0.0, 1.0]),
        (5, [1.0, 1.0, 0.0]),
        (6, [1.0, 0.0, 1.0]),
        (7, [0.0, 1.0, 1.0]),
        (8, [1.0, 1.0, 1.0]),
        (9, [0.5, 0.5, 0.5]),
        (10, [0.2, 0.8, 0.5]),
    ];
    for &(id, v) in dataset {
        t.insert(Row::new(alloc::vec![Value::Int(id), Value::Vector(v.to_vec())]))
            .unwrap();
    }
    t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    // Loop-invariant: the catalog is not mutated while querying.
    let table = cat.get("vecs").unwrap();
    let idx_pos = table
        .indices()
        .iter()
        .position(|i| i.name == "v_idx")
        .unwrap();

    for query in [[0.4_f32, 0.4, 0.4], [0.9, 0.1, 0.0], [0.0, 0.9, 0.9]] {
        let hnsw_top = nsw_search(table, idx_pos, &query, 1, 16, NswMetric::L2);
        // Rows whose cell is not a plain vector score +inf so they never win.
        let (_, brute_top) = (0..table.rows.len())
            .map(|i| {
                let Value::Vector(v) = &table.rows[i].values[1] else {
                    return (f32::INFINITY, i);
                };
                (l2_distance_sq(v, &query), i)
            })
            .min_by(|a, b| a.0.total_cmp(&b.0))
            .expect("table was populated above");
        assert!(!hnsw_top.is_empty(), "HNSW returned no results");
        assert_eq!(
            hnsw_top[0].1, brute_top,
            "HNSW top-1 != brute-force top-1 for {query:?}"
        );
    }
}
6231
/// A `users` table holding two rows, one with a NULL score, round-trips.
#[test]
fn serialize_table_with_rows_round_trips() {
    let rows = [
        Row::new(vec![
            Value::Int(1),
            Value::Text("alice".into()),
            Value::Float(95.5),
        ]),
        Row::new(vec![Value::Int(2), Value::Text("bob".into()), Value::Null]),
    ];
    let mut cat = Catalog::new();
    cat.create_table(make_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for row in rows {
        users.insert(row).unwrap();
    }
    assert_round_trip(&cat);
}
6251
/// A catalog with two tables (`users` empty, `flags` with one row) round-trips.
#[test]
fn serialize_multiple_tables_round_trips() {
    let flags_schema = TableSchema::new(
        "flags",
        vec![
            ColumnSchema::new("id", DataType::BigInt, false),
            ColumnSchema::new("active", DataType::Bool, false),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(make_users_schema()).unwrap();
    cat.create_table(flags_schema).unwrap();
    let flags = cat.get_mut("flags").unwrap();
    flags
        .insert(Row::new(vec![Value::BigInt(7), Value::Bool(true)]))
        .unwrap();
    assert_round_trip(&cat);
}
6270
/// A stream whose magic is wrong must be rejected as `Corrupt`, even when the
/// version byte after it is the current one.
#[test]
fn deserialize_rejects_bad_magic() {
    let stream: Vec<u8> = b"BADMAGIC"
        .iter()
        .copied()
        .chain(core::iter::once(FILE_VERSION))
        .chain(0u32.to_le_bytes())
        .collect();
    let err = Catalog::deserialize(&stream).unwrap_err();
    assert!(matches!(err, StorageError::Corrupt(_)));
}
6279
/// Correct magic with an unknown version byte (99) must be rejected with a
/// `Corrupt` error whose message mentions "version".
#[test]
fn deserialize_rejects_unsupported_version() {
    let mut stream = Vec::with_capacity(FILE_MAGIC.len() + 5);
    stream.extend_from_slice(FILE_MAGIC);
    stream.push(99);
    stream.extend_from_slice(&0u32.to_le_bytes());
    match Catalog::deserialize(&stream).unwrap_err() {
        StorageError::Corrupt(msg) => assert!(msg.contains("version")),
        other => panic!("expected Corrupt, got {other:?}"),
    }
}
6288
/// Dropping the final byte of a valid catalog stream must yield `Corrupt`.
#[test]
fn deserialize_rejects_truncated_file() {
    let mut cat = Catalog::new();
    cat.create_table(make_users_schema()).unwrap();
    let full = cat.serialize();
    let (truncated, _) = full.split_at(full.len() - 1);
    let outcome = Catalog::deserialize(truncated);
    assert!(matches!(outcome, Err(StorageError::Corrupt(_))));
}
6301
/// An extra byte after a valid (empty) catalog stream must be reported as
/// `Corrupt` with a message mentioning "trailing".
#[test]
fn deserialize_rejects_trailing_garbage() {
    let mut stream = Catalog::new().serialize();
    stream.push(0xFF);
    let outcome = Catalog::deserialize(&stream);
    assert!(matches!(
        outcome,
        Err(StorageError::Corrupt(ref s)) if s.contains("trailing")
    ));
}
6312
6313 fn populated_users() -> Catalog {
6316 let mut cat = Catalog::new();
6317 cat.create_table(make_users_schema()).unwrap();
6318 let t = cat.get_mut("users").unwrap();
6319 for (id, name, score) in [
6320 (1, "alice", Some(90.0)),
6321 (2, "bob", None),
6322 (3, "alice", Some(70.0)), ] {
6324 t.insert(Row::new(vec![
6325 Value::Int(id),
6326 Value::Text(name.into()),
6327 score.map_or(Value::Null, Value::Float),
6328 ]))
6329 .unwrap();
6330 }
6331 cat
6332 }
6333
/// An index created after rows exist must already contain those rows.
#[test]
fn add_index_builds_from_existing_rows() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    let by_id = cat
        .get("users")
        .unwrap()
        .index_on(0)
        .expect("index_on(0)");
    let no_hits: &[RowLocator] = &[];
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(99)), no_hits);
}
6346
/// Reusing an index name, even on another column, must fail with `DuplicateIndex`.
#[test]
fn add_index_dup_name_rejected() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("ix".into(), "id").unwrap();
    let outcome = users.add_index("ix".into(), "name");
    assert!(matches!(
        outcome,
        Err(StorageError::DuplicateIndex { ref name }) if name == "ix"
    ));
}
6355
/// Indexing a column the schema does not have must fail with `ColumnNotFound`.
#[test]
fn add_index_unknown_column_rejected() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    let outcome = users.add_index("ix".into(), "ghost");
    assert!(matches!(
        outcome,
        Err(StorageError::ColumnNotFound { ref column }) if column == "ghost"
    ));
}
6366
/// Rows inserted after an index exists must be added to it, and the rows
/// already indexed must keep their locators.
#[test]
fn insert_after_create_index_updates_it() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let dave = Row::new(vec![
        Value::Int(4),
        Value::Text("dave".into()),
        Value::Null,
    ]);
    users.insert(dave).unwrap();

    let by_name = users.index_on(1).unwrap();
    let dave_key = IndexKey::Text("dave".into());
    let alice_key = IndexKey::Text("alice".into());
    assert_eq!(by_name.lookup_eq(&dave_key), &[RowLocator::Hot(3)]);
    // Both pre-existing "alice" rows (positions 0 and 2) are still indexed.
    assert_eq!(
        by_name.lookup_eq(&alice_key),
        &[RowLocator::Hot(0), RowLocator::Hot(2)]
    );
}
6389
/// An index over the Float `score` column must not answer an integer probe
/// for a value that appears there only as a float (90.0).
#[test]
fn null_or_float_values_are_not_indexed() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_score".into(), "score").unwrap();
    let by_score = users.index_on(2).unwrap();
    let nothing: &[RowLocator] = &[];
    assert_eq!(by_score.lookup_eq(&IndexKey::Int(90)), nothing);
}
6402
/// `Value::Vector` reports an F32 `Vector` type whose `dim` is its length.
#[test]
fn vector_value_data_type_carries_dim() {
    let cell = Value::Vector(vec![1.0, 2.0, 3.0]);
    let expected = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    assert_eq!(cell.data_type(), Some(expected));
}
6416
/// Inserting a 3-element vector into a `VECTOR(3)` column succeeds.
#[test]
fn vector_column_insert_matching_dim_ok() {
    let column_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let schema = TableSchema::new("emb", vec![ColumnSchema::new("v", column_ty, false)]);
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();
    let emb = cat.get_mut("emb").unwrap();
    emb.insert(Row::new(vec![Value::Vector(vec![1.0, 2.0, 3.0])]))
        .unwrap();
}
6437
/// Inserting a 2-element vector into a `VECTOR(3)` column fails with `TypeMismatch`.
#[test]
fn vector_column_insert_dim_mismatch_rejected() {
    let column_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let schema = TableSchema::new("emb", vec![ColumnSchema::new("v", column_ty, false)]);
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();
    let emb = cat.get_mut("emb").unwrap();
    let outcome = emb.insert(Row::new(vec![Value::Vector(vec![1.0, 2.0])]));
    assert!(matches!(outcome, Err(StorageError::TypeMismatch { .. })));
}
6460
/// Both the `VECTOR(4)` column type and a stored vector cell survive
/// serialize → deserialize unchanged.
#[test]
fn vector_value_survives_catalog_round_trip() {
    let vec_ty = DataType::Vector {
        dim: 4,
        encoding: VecEncoding::F32,
    };
    let payload = vec![0.5, -1.25, 3.0, 7.0];

    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "emb",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vec_ty, false),
        ],
    ))
    .unwrap();
    let emb = cat.get_mut("emb").unwrap();
    emb.insert(Row::new(vec![Value::Int(1), Value::Vector(payload.clone())]))
        .unwrap();

    let snapshot = cat.serialize();
    let restored = Catalog::deserialize(&snapshot).expect("round-trip");
    let table = restored.get("emb").unwrap();
    assert_eq!(table.schema().columns[1].ty, vec_ty);
    assert_eq!(table.rows()[0].values[1], Value::Vector(payload));
}
6500
/// A secondary BTree index keeps its name and its postings across a
/// serialize → deserialize round trip.
#[test]
fn index_survives_serialize_deserialize_round_trip() {
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let snapshot = cat.serialize();
    let restored = Catalog::deserialize(&snapshot).unwrap();
    let by_name = restored
        .get("users")
        .unwrap()
        .index_on(1)
        .expect("index_on(1) after restore");
    assert_eq!(by_name.name, "by_name");
    let alice = IndexKey::Text("alice".into());
    assert_eq!(
        by_name.lookup_eq(&alice),
        &[RowLocator::Hot(0), RowLocator::Hot(2)]
    );
}
6521
6522 fn bigint_pk_users_schema() -> TableSchema {
6527 TableSchema::new(
6528 "users",
6529 vec![
6530 ColumnSchema::new("id", DataType::BigInt, false),
6531 ColumnSchema::new("name", DataType::Text, false),
6532 ],
6533 )
6534 }
6535
6536 fn make_user_row(id: i64, name: &str) -> Row {
6537 Row::new(vec![Value::BigInt(id), Value::Text(name.into())])
6538 }
6539
/// With only hot rows, `lookup_by_pk` resolves through the index, and no
/// cold segment exists.
#[test]
fn lookup_by_pk_finds_row_via_hot_index() {
    let seed = [(1i64, "alice"), (2, "bob"), (3, "carol")];
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for &(id, name) in &seed {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let found = cat
        .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
        .unwrap();
    assert_eq!(found, make_user_row(2, "bob"));
    assert_eq!(cat.cold_segment_count(), 0);
}
6556
/// `lookup_by_pk` returns `None` for a missing key, a missing table, or a
/// missing index.
#[test]
fn lookup_by_pk_returns_none_when_key_missing() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    // (table, index, key) probes that must all miss.
    let probes = [
        ("users", "by_id", 999),
        ("other_table", "by_id", 1),
        ("users", "no_such_index", 1),
    ];
    for (table, index, key) in probes {
        assert!(cat.lookup_by_pk(table, index, &IndexKey::Int(key)).is_none());
    }
}
6578
/// Rows that live only in a loaded segment resolve through `Cold` locators
/// registered on the BTree index, and unknown keys still miss.
#[test]
fn lookup_by_pk_resolves_cold_locator_via_loaded_segment() {
    let cold_rows: Vec<(i64, &str)> =
        vec![(100, "ivy"), (200, "joe"), (300, "kim"), (400, "lin")];

    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    // Encode every cold row, keyed by its id reinterpreted as u64, into one segment.
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(cold_rows.len());
    for &(id, name) in &cold_rows {
        let body = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), body));
    }
    let (seg_bytes, _meta) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();
    assert_eq!(seg_id, 0);
    assert_eq!(cat.cold_segment_count(), 1);

    let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
        .iter()
        .map(|&(id, _)| {
            let locator = RowLocator::Cold {
                segment_id: seg_id,
                page_offset: 0,
            };
            (IndexKey::Int(id), locator)
        })
        .collect();
    let registered = cat
        .get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();
    assert_eq!(registered, 4);

    for &(id, name) in &cold_rows {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("cold key {id} not found"));
        assert_eq!(got, make_user_row(id, name));
    }
    assert!(
        cat.lookup_by_pk("users", "by_id", &IndexKey::Int(999))
            .is_none()
    );
}
6636
/// One index can hold both hot and cold locators. Every key resolves to its
/// row whichever tier it lives in, and a key absent from both tiers misses.
#[test]
fn lookup_by_pk_mixes_hot_and_cold_tiers() {
    let hot_rows = [(1i64, "alice"), (2, "bob")];
    let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe")];

    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for &(id, name) in &hot_rows {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(cold_rows.len());
    for &(id, name) in &cold_rows {
        let body = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), body));
    }
    let (seg_bytes, _) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();
    let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
        .iter()
        .map(|&(id, _)| {
            let locator = RowLocator::Cold {
                segment_id: seg_id,
                page_offset: 0,
            };
            (IndexKey::Int(id), locator)
        })
        .collect();
    cat.get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();

    // Hot keys first, then cold keys, in insertion order.
    for &(id, name) in hot_rows.iter().chain(cold_rows.iter()) {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap();
        assert_eq!(got, make_user_row(id, name));
    }
    assert!(
        cat.lookup_by_pk("users", "by_id", &IndexKey::Int(50))
            .is_none()
    );
}
6707
/// Cold locators may only be registered on BTree indices. Targeting an NSW
/// index must fail with a `Corrupt` error mentioning "not BTree".
#[test]
fn register_cold_locators_rejects_nsw_index() {
    let vec_ty = DataType::Vector {
        dim: 4,
        encoding: VecEncoding::F32,
    };
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "vecs",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vec_ty, false),
        ],
    ))
    .unwrap();
    let vecs = cat.get_mut("vecs").unwrap();
    vecs.insert(Row::new(vec![
        Value::Int(1),
        Value::Vector(vec![1.0, 0.0, 0.0, 0.0]),
    ]))
    .unwrap();
    vecs.add_nsw_index("by_v".into(), "v", NSW_DEFAULT_M).unwrap();

    let bogus = vec![(
        IndexKey::Int(1),
        RowLocator::Cold {
            segment_id: 0,
            page_offset: 0,
        },
    )];
    let err = vecs.register_cold_locators("by_v", bogus).unwrap_err();
    assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("not BTree")));
}
6749
/// Ten zero bytes are not a segment: loading must fail with a
/// segment-related `Corrupt` error and register nothing.
#[test]
fn load_segment_bytes_rejects_garbage() {
    let mut cat = Catalog::new();
    let garbage = vec![0u8; 10];
    let outcome = cat.load_segment_bytes(garbage);
    assert!(matches!(outcome, Err(StorageError::Corrupt(ref s)) if s.contains("segment")));
    assert_eq!(cat.cold_segment_count(), 0);
}
6758
/// Successive `load_segment_bytes` calls hand out ids 0, 1, 2 in load order.
#[test]
fn load_segment_bytes_returns_sequential_ids() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let schema = cat.get("users").unwrap().schema.clone();
    for expected_id in 0u32..3 {
        // Each batch holds four rows with ids expected_id*100 .. expected_id*100+3.
        let base = u64::from(expected_id) * 100;
        let rows: Vec<(u64, Vec<u8>)> = (base..base + 4)
            .map(|id| {
                let body = encode_row_body_dense(&make_user_row(id.cast_signed(), "x"), &schema);
                (id, body)
            })
            .collect();
        let (bytes, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
        let assigned = cat.load_segment_bytes(bytes).unwrap();
        assert_eq!(assigned, expected_id);
    }
    assert_eq!(cat.cold_segment_count(), 3);
}
6777
/// A catalog written in the legacy v8 layout must still decode, and every
/// index entry it yields must be a `Hot` locator.
#[test]
fn v8_catalog_decodes_as_all_hot_under_v9_reader() {
    let mut cat = populated_users();
    cat.get_mut("users")
        .unwrap()
        .add_index("by_name".into(), "name")
        .unwrap();

    let legacy = encode_as_v8(&cat);
    assert_eq!(legacy[FILE_MAGIC.len()], 8, "version byte must be 8");

    let restored = Catalog::deserialize(&legacy).expect("v9 reader accepts v8 stream");
    let by_name = restored
        .get("users")
        .unwrap()
        .index_on(1)
        .expect("index_on(1) after restore");
    let alice = IndexKey::Text("alice".into());
    let alice_hits = by_name.lookup_eq(&alice);
    assert_eq!(alice_hits, &[RowLocator::Hot(0), RowLocator::Hot(2)]);
    assert!(
        alice_hits.iter().all(|loc| loc.is_hot()),
        "v8 → v9 read must yield Hot only"
    );
}
6822
/// Hand-writes `cat` in the legacy version-8 catalog layout so tests can
/// feed the current reader a v8 stream.
///
/// Layout written here, in order:
/// magic, version byte `8`, u32 table count, then for each table:
/// name, u16 column count, per-column (name, data type, nullable byte,
/// default flag + optional value, auto-increment byte), u32 row count,
/// dense row bodies, u16 index count, per-index (name, u16 column position,
/// kind tag + kind payload).
///
/// BTree indices are written as a bare tag `0` with no entries. Presumably the
/// reader rebuilds them from the rows (TODO confirm against the decoder).
/// NSW indices are tag `1`, followed by `m` and the graph.
///
/// # Panics
/// Panics if the catalog contains a BRIN index (this writer has no encoding
/// for it), or if any count/position does not fit its u16/u32 slot.
fn encode_as_v8(cat: &Catalog) -> Vec<u8> {
    let mut out = Vec::with_capacity(64);
    out.extend_from_slice(FILE_MAGIC);
    // Version byte pinned to 8, not FILE_VERSION.
    out.push(8u8);
    write_u32(&mut out, u32::try_from(cat.tables.len()).unwrap());
    for t in &cat.tables {
        write_str(&mut out, &t.schema.name);
        write_u16(&mut out, u16::try_from(t.schema.columns.len()).unwrap());
        for c in &t.schema.columns {
            write_str(&mut out, &c.name);
            write_data_type(&mut out, c.ty);
            out.push(u8::from(c.nullable));
            // Presence flag for the column default: 0 = none, 1 = value follows.
            match &c.default {
                None => out.push(0),
                Some(v) => {
                    out.push(1);
                    write_value(&mut out, v);
                }
            }
            out.push(u8::from(c.auto_increment));
        }
        write_u32(&mut out, u32::try_from(t.rows.len()).unwrap());
        for row in &t.rows {
            out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
        }
        write_u16(&mut out, u16::try_from(t.indices.len()).unwrap());
        for idx in &t.indices {
            write_str(&mut out, &idx.name);
            write_u16(&mut out, u16::try_from(idx.column_position).unwrap());
            match &idx.kind {
                // Tag only: no BTree entries (and so no locators) are persisted.
                IndexKind::BTree(_) => out.push(0),
                IndexKind::Nsw(g) => {
                    out.push(1);
                    write_u16(&mut out, u16::try_from(g.m).unwrap());
                    write_nsw_graph(&mut out, g);
                }
                IndexKind::Brin { .. } => panic!(
                    "v8 catalog writer cannot serialise BRIN — \
                     tests with BRIN indices must use the current writer"
                ),
            }
        }
    }
    out
}
6876
/// A catalog whose BTree index mixes Hot and Cold locators must round-trip
/// through the current writer/reader with every locator intact. Once the
/// segment bytes are loaded again, both tiers must still resolve through
/// `lookup_by_pk`.
#[test]
fn v9_catalog_round_trip_preserves_cold_locators() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let t = cat.get_mut("users").unwrap();
    // Hot tier: rows 0 and 1.
    for (id, name) in [(1i64, "alice"), (2, "bob")] {
        t.insert(make_user_row(id, name)).unwrap();
    }
    t.add_index("by_id".into(), "id").unwrap();
    let schema = t.schema.clone();

    // Cold tier: three rows encoded into a single segment, keyed by id-as-u64.
    let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe"), (300, "kim")];
    let seg_rows: Vec<(u64, Vec<u8>)> = cold_rows
        .iter()
        .map(|(id, name)| {
            let row = make_user_row(*id, name);
            ((*id).cast_unsigned(), encode_row_body_dense(&row, &schema))
        })
        .collect();
    let (seg_bytes, _) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    // Keep a copy: the restored catalog is handed the same bytes below.
    let seg_id = cat.load_segment_bytes(seg_bytes.clone()).unwrap();
    let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
        .iter()
        .map(|(id, _)| {
            (
                IndexKey::Int(*id),
                RowLocator::Cold {
                    segment_id: seg_id,
                    page_offset: 0,
                },
            )
        })
        .collect();
    cat.get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();

    let bytes = cat.serialize();
    // The stream must carry the current format version, not the legacy one.
    assert_eq!(bytes[FILE_MAGIC.len()], FILE_VERSION);
    let mut restored = Catalog::deserialize(&bytes).expect("v9 round-trip parses");

    // The restored catalog is given the segment again. Loading it into the
    // fresh catalog must reproduce the original id, or the persisted Cold
    // locators would point at the wrong segment.
    let restored_seg_id = restored.load_segment_bytes(seg_bytes).unwrap();
    assert_eq!(restored_seg_id, seg_id);

    let idx = restored.get("users").unwrap().index_on(0).unwrap();
    assert_eq!(idx.lookup_eq(&IndexKey::Int(1)), &[RowLocator::Hot(0)]);
    assert_eq!(idx.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
    for (id, _) in &cold_rows {
        assert_eq!(
            idx.lookup_eq(&IndexKey::Int(*id)),
            &[RowLocator::Cold {
                segment_id: seg_id,
                page_offset: 0,
            }]
        );
    }
    // End-to-end: a hot key and every cold key resolve to the original rows.
    assert_eq!(
        restored
            .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
            .unwrap(),
        make_user_row(2, "bob")
    );
    for (id, name) in &cold_rows {
        assert_eq!(
            restored
                .lookup_by_pk("users", "by_id", &IndexKey::Int(*id))
                .unwrap(),
            make_user_row(*id, name)
        );
    }
}
6967
/// `row_body_encoded_len` must equal the length of the bytes
/// `encode_row_body_dense` produces, across every fixed- and variable-width
/// column type. Cases cover typical, zero/empty/NULL, and negative values.
#[test]
fn row_body_encoded_len_matches_actual_encode_for_all_types() {
    // (column name, type, nullable)
    let column_specs = [
        ("a", DataType::SmallInt, true),
        ("b", DataType::Int, false),
        ("c", DataType::BigInt, false),
        ("d", DataType::Float, false),
        ("e", DataType::Bool, false),
        ("f", DataType::Text, false),
        (
            "g",
            DataType::Vector {
                dim: 3,
                encoding: VecEncoding::F32,
            },
            false,
        ),
        (
            "h",
            DataType::Numeric {
                precision: 18,
                scale: 2,
            },
            false,
        ),
        ("i", DataType::Date, false),
        ("j", DataType::Timestamp, false),
    ];
    let columns: Vec<ColumnSchema> = column_specs
        .into_iter()
        .map(|(name, ty, nullable)| ColumnSchema::new(name, ty, nullable))
        .collect();
    let schema = TableSchema::new("wide", columns);

    let cases = [
        Row::new(vec![
            Value::SmallInt(7),
            Value::Int(42),
            Value::BigInt(1_000_000),
            Value::Float(1.5),
            Value::Bool(true),
            Value::Text("hello".into()),
            Value::Vector(vec![1.0, 2.0, 3.0]),
            Value::Numeric {
                scaled: 12345,
                scale: 2,
            },
            Value::Date(20_000),
            Value::Timestamp(1_700_000_000_000_000),
        ]),
        // NULL in the nullable column, zeros, and empty text/vector.
        Row::new(vec![
            Value::Null,
            Value::Int(0),
            Value::BigInt(0),
            Value::Float(0.0),
            Value::Bool(false),
            Value::Text(String::new()),
            Value::Vector(vec![]),
            Value::Numeric {
                scaled: 0,
                scale: 2,
            },
            Value::Date(0),
            Value::Timestamp(0),
        ]),
        // Negative values and a longer text payload.
        Row::new(vec![
            Value::SmallInt(-1),
            Value::Int(-1),
            Value::BigInt(-1),
            Value::Float(-0.5),
            Value::Bool(true),
            Value::Text("a much longer payload here".into()),
            Value::Vector(vec![0.1, 0.2, 0.3]),
            Value::Numeric {
                scaled: -999_999_999,
                scale: 2,
            },
            Value::Date(-1),
            Value::Timestamp(-1),
        ]),
    ];
    for row in &cases {
        let actual = encode_row_body_dense(row, &schema).len();
        let fast = row_body_encoded_len(row, &schema);
        assert_eq!(actual, fast, "row {row:?}");
    }
}
7059
/// The per-table `hot_bytes` counter starts at 0. After inserts it equals the
/// summed dense-encoded size of the inserted rows, and so does the
/// catalog-wide total.
#[test]
fn hot_bytes_grows_on_insert_and_matches_encoded_sum() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    assert_eq!(users.hot_bytes(), 0);

    let rows: Vec<Row> = [(1i64, "alice"), (2, "bob"), (3, "carol")]
        .into_iter()
        .map(|(id, name)| make_user_row(id, name))
        .collect();
    let expected: u64 = rows
        .iter()
        .map(|row| encode_row_body_dense(row, &users.schema).len() as u64)
        .sum();
    for row in rows {
        users.insert(row).unwrap();
    }

    assert_eq!(users.hot_bytes(), expected);
    assert_eq!(cat.hot_tier_bytes(), expected);
}
7075
/// Deleting a row lowers `hot_bytes` by exactly that row's encoded size.
#[test]
fn hot_bytes_shrinks_on_delete() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for (id, name) in [(1i64, "alice"), (2, "bob"), (3, "carol")] {
        users.insert(make_user_row(id, name)).unwrap();
    }
    let bob_bytes =
        encode_row_body_dense(&make_user_row(2, "bob"), &users.schema).len() as u64;
    let before = users.hot_bytes();

    // Position 1 holds "bob".
    assert_eq!(users.delete_rows(&[1]), 1);
    assert_eq!(users.hot_bytes(), before - bob_bytes);
}
7092
/// Updating a row to a longer text value adjusts `hot_bytes` by the
/// difference between the new and old encoded sizes.
#[test]
fn hot_bytes_diffs_on_update_for_variable_width_columns() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();

    let original = make_user_row(1, "alice");
    let replacement = make_user_row(1, "alice-the-longer-name");
    let old_len = encode_row_body_dense(&original, &users.schema).len() as u64;
    let new_len = encode_row_body_dense(&replacement, &users.schema).len() as u64;

    users.insert(original).unwrap();
    let after_insert = users.hot_bytes();
    users.update_row(0, replacement.values).unwrap();

    let expected = (after_insert - old_len) + new_len;
    assert_eq!(users.hot_bytes(), expected);
    assert!(users.hot_bytes() > after_insert, "longer text grew the counter");
}
7109
/// After a round trip, the restored catalog reports the same `hot_bytes`
/// as before serialization, both catalog-wide and per table.
#[test]
fn hot_bytes_round_trips_through_serialize_deserialize() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0_i64..10 {
        let name = alloc::format!("name-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }

    let pre = cat.hot_tier_bytes();
    let snapshot = cat.serialize();
    let restored = Catalog::deserialize(&snapshot).unwrap();
    assert_eq!(restored.hot_tier_bytes(), pre);
    assert_eq!(restored.get("users").unwrap().hot_bytes(), pre);
}
7124
/// Freezing the 6 oldest of 10 rows:
/// - creates segment 0 and reports what it froze;
/// - leaves 4 hot rows, with `hot_bytes` reduced by the reported amount;
/// - keeps every primary key resolvable through `lookup_by_pk`.
#[test]
fn freeze_oldest_to_cold_moves_rows_and_keeps_lookups_working() {
    let label = |id: i64| alloc::format!("u-{id}");
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..10i64 {
        users.insert(make_user_row(id, &label(id))).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let hot_before = users.hot_bytes();

    let report = cat
        .freeze_oldest_to_cold("users", "by_id", 6)
        .expect("freeze succeeds");
    assert_eq!(report.frozen_rows, 6);
    assert_eq!(report.segment_id, 0);
    assert!(report.bytes_freed > 0);
    assert!(!report.segment_bytes.is_empty());

    let users = cat.get("users").unwrap();
    assert_eq!(users.row_count(), 4, "4 hot rows remain (10 - 6 frozen)");
    assert_eq!(cat.cold_segment_count(), 1);
    assert_eq!(
        users.hot_bytes(),
        hot_before - report.bytes_freed,
        "hot_bytes accounting matches FreezeReport"
    );

    // Frozen (0..6) and still-hot (6..10) keys must all resolve.
    for id in 0..10i64 {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("PK {id} disappeared after freeze"));
        assert_eq!(got, make_user_row(id, &label(id)));
    }
}
7170
/// Two consecutive freezes must each add a segment without clobbering the
/// locators written by the earlier freeze.
#[test]
fn freeze_twice_preserves_prior_cold_locators() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..12i64 {
        let row = make_user_row(id, &alloc::format!("u-{id}"));
        users.insert(row).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    for label in ["first freeze ok", "second freeze ok"] {
        cat.freeze_oldest_to_cold("users", "by_id", 4).expect(label);
    }

    assert_eq!(cat.get("users").unwrap().row_count(), 4);
    assert_eq!(cat.cold_segment_count(), 2);
    for id in 0..12i64 {
        let resolved = cat.lookup_by_pk("users", "by_id", &IndexKey::Int(id));
        let got =
            resolved.unwrap_or_else(|| panic!("PK {id} not resolvable after two freezes"));
        assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
    }
}
7202
/// Invalid freeze requests (zero rows, unknown table, unknown index, more rows
/// than exist) are refused with `Corrupt` and leave both tiers untouched.
#[test]
fn freeze_oldest_to_cold_rejects_invalid_input() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..3i64 {
        users
            .insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    // (table, index, row count) requests that must each be rejected.
    let bad_requests = [
        ("users", "by_id", 0),
        ("missing", "by_id", 1),
        ("users", "no_such_index", 1),
        ("users", "by_id", 999),
    ];
    for (table, index, count) in bad_requests {
        assert!(matches!(
            cat.freeze_oldest_to_cold(table, index, count),
            Err(StorageError::Corrupt(_))
        ));
    }

    // No rejected request may have moved anything to the cold tier.
    assert_eq!(cat.get("users").unwrap().row_count(), 3);
    assert_eq!(cat.cold_segment_count(), 0);
}
7240
/// Freezing through a TEXT-keyed index is refused with a `Corrupt` error whose
/// message mentions "non-integer", and the hot row stays in place.
#[test]
fn freeze_oldest_to_cold_rejects_non_integer_pk() {
    let schema = TableSchema::new(
        "by_name",
        vec![
            ColumnSchema::new("name", DataType::Text, false),
            ColumnSchema::new("payload", DataType::BigInt, false),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(schema).unwrap();

    let table = cat.get_mut("by_name").unwrap();
    let row = Row::new(vec![Value::Text("a".into()), Value::BigInt(1)]);
    table.insert(row).unwrap();
    table.add_index("by_n".into(), "name").unwrap();

    let err = cat
        .freeze_oldest_to_cold("by_name", "by_n", 1)
        .expect_err("non-integer PK rejected");
    match &err {
        StorageError::Corrupt(msg) => assert!(
            msg.contains("non-integer"),
            "error message names the constraint: {msg}"
        ),
        other => panic!("expected Corrupt, got {other:?}"),
    }

    assert_eq!(cat.get("by_name").unwrap().row_count(), 1);
    assert_eq!(cat.cold_segment_count(), 0);
}
7272
/// After freezing the three oldest rows, a secondary (name) index still maps a
/// kept-hot row to a `Hot` locator at its post-freeze slot.
#[test]
fn freeze_keeps_remaining_hot_rows_addressable_via_secondary_index() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..6i64 {
        users
            .insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    // Column 1 is `name`; the test expects "u-4" to sit at hot slot 1.
    let name_index = cat.get("users").unwrap().index_on(1).unwrap();
    let hits = name_index.lookup_eq(&IndexKey::Text("u-4".into()));
    assert_eq!(hits.len(), 1);
    assert!(hits[0].is_hot(), "kept-hot rows still surface as Hot");
    if let RowLocator::Hot(slot) = hits[0] {
        assert_eq!(slot, 1);
    } else {
        unreachable!();
    }
}
7307
/// Promoting a frozen PK appends the row to the hot tier, grows `hot_bytes` by
/// its encoded size, and replaces its Cold locator with a single Hot one.
#[test]
fn promote_cold_row_pulls_frozen_row_back_to_hot_tier() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..6i64 {
        users
            .insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    // Four rows go cold, leaving two hot rows behind.
    cat.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
    let hot_before = cat.get("users").unwrap().hot_bytes();

    let slot = cat
        .promote_cold_row("users", "by_id", &IndexKey::Int(2))
        .expect("promote ok")
        .expect("PK 2 was cold");
    assert_eq!(
        slot, 2,
        "promoted row appended after the 2 surviving hot rows"
    );

    let users = cat.get("users").unwrap();
    let promoted = make_user_row(2, "u-2");
    let promoted_len = encode_row_body_dense(&promoted, &users.schema).len() as u64;
    assert_eq!(users.row_count(), 3, "hot tier grew from 2 to 3");
    assert_eq!(users.hot_bytes(), hot_before + promoted_len);

    let locators = users.index_on(0).unwrap().lookup_eq(&IndexKey::Int(2));
    assert_eq!(locators.len(), 1, "exactly one locator per key");
    assert!(locators[0].is_hot(), "promote retired the Cold locator");

    // Both the promoted row and a still-cold neighbour resolve through the PK.
    let promoted_lookup = cat
        .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
        .unwrap();
    assert_eq!(promoted_lookup, promoted);
    let cold_lookup = cat
        .lookup_by_pk("users", "by_id", &IndexKey::Int(0))
        .unwrap();
    assert_eq!(cold_lookup, make_user_row(0, "u-0"));
}
7366
/// Promoting a key that is hot-only (7) or absent (99) yields `Ok(None)` and
/// changes neither tier.
#[test]
fn promote_cold_row_returns_none_when_key_is_not_cold() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.insert(make_user_row(7, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    for key in [7i64, 99] {
        let outcome = cat
            .promote_cold_row("users", "by_id", &IndexKey::Int(key))
            .unwrap();
        assert!(outcome.is_none());
    }

    assert_eq!(cat.get("users").unwrap().row_count(), 1);
    assert_eq!(cat.cold_segment_count(), 0);
}
7394
/// Shadowing a frozen PK retires its single cold locator so lookups miss,
/// while the other rows frozen alongside it still resolve.
#[test]
fn shadow_cold_row_removes_cold_locators_and_drops_lookup() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..5i64 {
        users
            .insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    let target = IndexKey::Int(1);
    assert!(
        cat.lookup_by_pk("users", "by_id", &target).is_some(),
        "frozen PK resolves before shadow"
    );
    let removed = cat.shadow_cold_row("users", "by_id", &target).unwrap();
    assert_eq!(removed, 1, "exactly one cold locator retired");
    assert!(
        cat.lookup_by_pk("users", "by_id", &target).is_none(),
        "shadowed key no longer resolves"
    );

    for id in [0i64, 2] {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap();
        assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
    }
}
7441
/// Shadowing a hot-only or absent key retires zero cold locators and leaves
/// the hot row in place.
#[test]
fn shadow_cold_row_returns_zero_when_key_is_not_cold() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    let cases = [
        (1i64, "hot-only key drops no cold locators"),
        (999, "absent key drops no cold locators"),
    ];
    for (key, why) in cases {
        let dropped = cat
            .shadow_cold_row("users", "by_id", &IndexKey::Int(key))
            .unwrap();
        assert_eq!(dropped, 0, "{why}");
    }

    assert_eq!(cat.get("users").unwrap().row_count(), 1);
}
7467
/// Both `promote_cold_row` and `shadow_cold_row` reject an unknown table and
/// an unknown index with `Corrupt`.
#[test]
fn promote_and_shadow_reject_invalid_inputs() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    let key = IndexKey::Int(1);
    for (table, index) in [("missing", "by_id"), ("users", "no_such_index")] {
        assert!(matches!(
            cat.promote_cold_row(table, index, &key),
            Err(StorageError::Corrupt(_))
        ));
        assert!(matches!(
            cat.shadow_cold_row(table, index, &key),
            Err(StorageError::Corrupt(_))
        ));
    }
}
7496
/// Preparing and committing one `0..6` slice must produce the same report and
/// the same PK lookups as `freeze_oldest_to_cold(.., 6)` on an identical catalog.
#[test]
fn commit_freeze_slices_single_slice_matches_freeze_oldest() {
    let mut via_freeze = Catalog::new();
    let mut via_slices = Catalog::new();
    for cat in [&mut via_freeze, &mut via_slices] {
        cat.create_table(bigint_pk_users_schema()).unwrap();
        let users = cat.get_mut("users").unwrap();
        for id in 0..10i64 {
            users
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }

    let single = via_freeze
        .freeze_oldest_to_cold("users", "by_id", 6)
        .unwrap();
    let slice = via_slices
        .prepare_freeze_slice("users", "by_id", 0..6)
        .expect("prepare");
    let parallel = via_slices
        .commit_freeze_slices("users", "by_id", alloc::vec![slice])
        .expect("commit");

    assert_eq!(single.segment_id, parallel.segment_id);
    assert_eq!(single.frozen_rows, parallel.frozen_rows);
    assert_eq!(single.bytes_freed, parallel.bytes_freed);
    assert_eq!(single.segment_bytes, parallel.segment_bytes);

    for id in 0..10i64 {
        let key = IndexKey::Int(id);
        assert_eq!(
            via_freeze.lookup_by_pk("users", "by_id", &key),
            via_slices.lookup_by_pk("users", "by_id", &key),
            "PK {id} differs after single vs slice freeze"
        );
    }
}
7536
/// Committing adjacent slices `0..4` and `4..8` must yield the same segment
/// bytes, row count and lookups as committing `0..8` in one slice.
#[test]
fn commit_freeze_slices_two_slices_match_single_slice() {
    // Shuffled insertion order so hot-slot order differs from PK order.
    const INSERT_ORDER: [i64; 10] = [3, 7, 1, 9, 5, 0, 8, 4, 2, 6];

    let mut whole = Catalog::new();
    let mut split = Catalog::new();
    for cat in [&mut whole, &mut split] {
        cat.create_table(bigint_pk_users_schema()).unwrap();
        let users = cat.get_mut("users").unwrap();
        for id in INSERT_ORDER {
            users
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }

    let everything = whole
        .prepare_freeze_slice("users", "by_id", 0..8)
        .expect("prepare");
    let one = whole
        .commit_freeze_slices("users", "by_id", alloc::vec![everything])
        .expect("commit one");

    let first_half = split
        .prepare_freeze_slice("users", "by_id", 0..4)
        .expect("prepare s1");
    let second_half = split
        .prepare_freeze_slice("users", "by_id", 4..8)
        .expect("prepare s2");
    let two = split
        .commit_freeze_slices("users", "by_id", alloc::vec![first_half, second_half])
        .expect("commit two");

    assert_eq!(one.segment_bytes, two.segment_bytes);
    assert_eq!(one.frozen_rows, two.frozen_rows);
    for id in 0..10i64 {
        let key = IndexKey::Int(id);
        assert_eq!(
            whole.lookup_by_pk("users", "by_id", &key),
            split.lookup_by_pk("users", "by_id", &key),
            "PK {id} differs after one-slice vs two-slice freeze"
        );
    }
}
7583
/// Slices `0..2` and `3..5` leave slot 2 uncovered, so the commit must fail with
/// `Corrupt` and leave both tiers unchanged.
#[test]
fn commit_freeze_slices_rejects_gap() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..6i64 {
        users
            .insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let head = cat.prepare_freeze_slice("users", "by_id", 0..2).unwrap();
    let tail = cat.prepare_freeze_slice("users", "by_id", 3..5).unwrap();
    let outcome = cat.commit_freeze_slices("users", "by_id", alloc::vec![head, tail]);
    assert!(matches!(outcome, Err(StorageError::Corrupt(_))));

    assert_eq!(cat.cold_segment_count(), 0);
    assert_eq!(cat.get("users").unwrap().row_count(), 6);
}
7605
/// Committing an empty slice list succeeds, reports zero frozen rows, and
/// touches neither tier.
#[test]
fn commit_freeze_slices_empty_is_noop() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..3i64 {
        users
            .insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let no_slices = Vec::new();
    let report = cat
        .commit_freeze_slices("users", "by_id", no_slices)
        .unwrap();

    assert_eq!(report.frozen_rows, 0);
    assert_eq!(cat.cold_segment_count(), 0);
    assert_eq!(cat.get("users").unwrap().row_count(), 3);
}
7624
/// Two small cold segments below the size target merge into one new segment.
/// The merged segment takes a fresh slot, and every PK still resolves.
#[test]
fn compact_merges_small_segments_storage_unit() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..8i64 {
        users
            .insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    for _ in 0..2 {
        cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    }
    assert_eq!(cat.cold_segment_count(), 2);
    assert_eq!(cat.cold_segment_slot_count(), 2);

    // A target one byte above the largest segment makes both eligible.
    let largest = cat
        .cold_segment_ids_global()
        .iter()
        .map(|&id| cat.cold_segment(id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();

    let report = cat
        .compact_cold_segments("users", "by_id", largest + 1)
        .expect("compact succeeds");
    assert_eq!(report.sources.len(), 2);
    let merged_id = report.merged_segment_id.expect("merge happened");
    assert_eq!(report.merged_rows, 6);
    assert_eq!(report.deleted_rows_pruned, 0);
    assert!(!report.merged_segment_bytes.is_empty());

    // One live segment; the slot count grows to 3 because the merge result
    // occupies a new slot.
    assert_eq!(cat.cold_segment_count(), 1);
    assert_eq!(cat.cold_segment_slot_count(), 3);
    assert_eq!(cat.cold_segment_ids_global(), alloc::vec![merged_id]);

    for id in 0..8i64 {
        let expected = make_user_row(id, &alloc::format!("u-{id}"));
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("PK {id} lost after compaction"));
        assert_eq!(got, expected);
    }
}
7681
/// Compaction prunes shadowed cold rows from the merged segment.
///
/// Shadowed PKs stay invisible afterwards. Surviving PKs must still resolve to
/// their exact original contents, not merely to some row, so a merge that
/// corrupts or mismatches rows is caught.
#[test]
fn compact_drops_shadowed_cold_rows() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let t = cat.get_mut("users").unwrap();
    for id in 0..6i64 {
        t.insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    t.add_index("by_id".into(), "id").unwrap();
    // Two freezes of three rows each produce two cold segments.
    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    for shadowed in [1i64, 4] {
        assert_eq!(
            cat.shadow_cold_row("users", "by_id", &IndexKey::Int(shadowed))
                .unwrap(),
            1
        );
    }

    let max_seg_bytes = cat
        .cold_segment_ids_global()
        .iter()
        .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let report = cat
        .compact_cold_segments("users", "by_id", max_seg_bytes + 1)
        .expect("compact succeeds");
    assert_eq!(report.sources.len(), 2);
    assert!(report.merged_segment_id.is_some(), "merge happened");
    assert_eq!(report.merged_rows, 4, "6 frozen − 2 shadowed = 4 live");
    assert_eq!(report.deleted_rows_pruned, 2);
    assert_eq!(cat.cold_segment_count(), 1, "both sources replaced by the merge");

    for shadowed in [1i64, 4i64] {
        assert!(
            cat.lookup_by_pk("users", "by_id", &IndexKey::Int(shadowed))
                .is_none(),
            "shadowed PK {shadowed} must remain invisible after compact"
        );
    }
    // Previously only resolvability was checked; compare full row contents so
    // a merge that rewrites or swaps rows is detected.
    for live in [0i64, 2, 3, 5] {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(live))
            .unwrap_or_else(|| panic!("live PK {live} lost after compact"));
        assert_eq!(
            got,
            make_user_row(live, &alloc::format!("u-{live}")),
            "live PK {live} content changed after compact"
        );
    }
}
7736
/// Compaction does nothing with zero cold segments, or with a single cold
/// segment whatever the size target.
#[test]
fn compact_is_noop_below_two_candidates() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..6i64 {
        users
            .insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    // No cold segments at all.
    let empty = cat
        .compact_cold_segments("users", "by_id", 1 << 30)
        .expect("noop ok");
    assert!(empty.merged_segment_id.is_none());
    assert!(empty.sources.is_empty());

    // Exactly one cold segment, with a huge target and then a tiny one.
    cat.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
    for target in [1 << 30, 1] {
        let report = cat
            .compact_cold_segments("users", "by_id", target)
            .expect("noop ok");
        assert!(report.merged_segment_id.is_none());
        assert_eq!(cat.cold_segment_count(), 1);
    }
}
7772
/// A compacted catalog survives serialize/deserialize: reinstalling the merged
/// segment's bytes at its slot id makes every PK resolve again.
#[test]
fn compact_swap_survives_catalog_roundtrip_via_load_at() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..6i64 {
        users
            .insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    for _ in 0..2 {
        cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    }

    let largest = cat
        .cold_segment_ids_global()
        .iter()
        .map(|&id| cat.cold_segment(id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let report = cat
        .compact_cold_segments("users", "by_id", largest + 1)
        .expect("compact ok");
    let merged_id = report.merged_segment_id.unwrap();

    // Rebuild from the catalog blob, then hand the merged bytes back at the
    // slot the compaction assigned.
    let snapshot = cat.serialize();
    let mut restored = Catalog::deserialize(&snapshot).expect("deserialize ok");
    restored
        .load_segment_bytes_at(merged_id, report.merged_segment_bytes.clone())
        .expect("reload merged ok");

    for id in 0..6i64 {
        let expected = make_user_row(id, &alloc::format!("u-{id}"));
        let got = restored
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("PK {id} lost across roundtrip"));
        assert_eq!(got, expected);
    }
    assert_eq!(restored.cold_segment_count(), 1);
}
7826
/// Loading a segment at slot 5 pads the slot table to six entries. Loading
/// again at an occupied slot (5, or the frozen slot 0) fails with `Corrupt`.
#[test]
fn load_segment_bytes_at_pads_and_rejects_collision() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for id in 0..4i64 {
        users
            .insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let report = cat.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
    let seg0 = report.segment_bytes.clone();

    cat.load_segment_bytes_at(5, seg0.clone())
        .expect("pad + load ok");
    assert_eq!(cat.cold_segment_slot_count(), 6);
    assert_eq!(cat.cold_segment_count(), 2);

    for occupied in [5, 0] {
        assert!(matches!(
            cat.load_segment_bytes_at(occupied, seg0.clone()),
            Err(StorageError::Corrupt(_))
        ));
    }
}
7861
/// Promote a cold row, then freeze it again, and check its PK index entry.
///
/// Promoting must retire the old Cold locator. Refreezing the promoted row must
/// leave exactly one locator for the key, and it must be Cold. Neither a stale
/// copy of the retired locator nor a leftover Hot entry may remain. Every PK
/// must still resolve to its original row.
#[test]
fn promote_then_refreeze_does_not_leave_orphan_locators() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let t = cat.get_mut("users").unwrap();
    for id in 0..4i64 {
        t.insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    t.add_index("by_id".into(), "id").unwrap();

    // Two rows go cold; two stay hot.
    cat.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
    let promoted = cat
        .promote_cold_row("users", "by_id", &IndexKey::Int(0))
        .unwrap();
    assert!(promoted.is_some());
    let entries_after_promote = cat
        .get("users")
        .unwrap()
        .index_on(0)
        .unwrap()
        .lookup_eq(&IndexKey::Int(0))
        .to_vec();
    assert_eq!(entries_after_promote.len(), 1);
    assert!(entries_after_promote[0].is_hot());

    // The refreeze this test is named for, which the original never did:
    // freeze every hot row, including the promoted PK 0.
    assert_eq!(
        cat.get("users").unwrap().row_count(),
        3,
        "two surviving hot rows plus the promoted one"
    );
    cat.freeze_oldest_to_cold("users", "by_id", 3)
        .expect("refreeze ok");
    assert_eq!(cat.get("users").unwrap().row_count(), 0);
    assert_eq!(cat.cold_segment_count(), 2);

    let entries_after_refreeze = cat
        .get("users")
        .unwrap()
        .index_on(0)
        .unwrap()
        .lookup_eq(&IndexKey::Int(0))
        .to_vec();
    assert_eq!(
        entries_after_refreeze.len(),
        1,
        "refreeze left exactly one locator for the promoted key"
    );
    assert!(
        !entries_after_refreeze[0].is_hot(),
        "promoted row is cold again after refreeze"
    );

    for id in 0..4i64 {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("PK {id} lost after promote + refreeze"));
        assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
    }
}
7906}